Kafka与.net core(三)kafka操纵_玖富娱乐主管发布


玖富娱乐是一家为代理招商,直属主管信息发布为主的资讯网站,同时也兼顾玖富娱乐代理注册登录地址。

1.Kafka相干学问

  • Broker:即Kafka的效劳器,用户存储音讯,Kafa集群中的一台或多台效劳器统称为broker。
  • Message音讯:是通讯的基本单位,每一个 producer 能够向一个 topic(主题)宣布一些音讯。
    • Kafka中的Message是以topic为基本单位构造的,分歧的topic之间是相互自力的。每一个topic又能够分红几个分歧的partition(每一个topic有几个partition是在竖立topic时指定的),每一个partition存储一部分Message。
    • partition中的每条Message包含了以下三个属性:Kafka基于文件存储.经由历程分区,能够将日记内容疏散到多个server上,来制止文件尺寸到达单机磁盘的上限,每一个partiton都会被以后server(kafka实例)生存能够将一个topic切分多恣意多个partitions,来音讯生存/消耗的效力。
      • offset:音讯独一标识:对应范例:long
      • MessageSize 对应范例:int32
      • data 是message的具体内容。
    • 越多的partitions意味着能够包容更多的consumer,有用提拔并发消耗的才能。
  • Message:在Broker中通Log追加的体式格局举行耐久化存储。并举行分区(patitions)。
    • 一个Topic能够以为是一类音讯,每一个topic将被分红多partition(区),每一个partition在存储层面是append log文件。任何宣布到此partition的音讯都会被直接追加到log文件的尾部,每条音讯在文件中的地位称为offset(偏移量),partition是以文件的情势存储在文件体系中。
    • Logs文件依据broker中的设置装备摆设请求,保存肯定时候后删除来开释磁盘空间。

      

    • Topic物理上的分组,一个 topic能够分为多个 partition,每一个 partition 是一个有序的行列。partition中的每条音讯都会被分派一个有序的 id(offset)。
    • 为完成希罕存储,我们经由历程给文件建索引,每隔肯定字节的数据竖立一条索引

       

-玖富娱乐是一家为代理招商,直属主管信息发布为主的资讯网站,同时也兼顾玖富娱乐代理注册登录地址。-
  • 为了削减磁盘写入的次数,broker会将音讯临时buffer起来,当音讯的个数(或尺寸)到达肯定阀值时,再flush到磁盘,如许削减了磁盘IO挪用的次数。
  • Broker没有副本机制,一旦broker宕机,该broker的音讯将都不可用。Message音讯是有多份的。
  • consumer:音讯和数据消耗者,定阅topics并处置惩罚其宣布的音讯的历程叫做consumers。
    • 在 kafka中,我们能够以为一个group是一个定阅者,一个Topic中的每一个partions,只会被一个定阅者中的一个consumer消耗,不外一个 consumer能够消耗多个partitions中的音讯(消耗者数据小于Partions  的数目时)。注重:kafka的设想道理决议,关于一个topic,同一个group中不克不及有多于partitions个数的consumer同时消耗,否则将意味着某些consumer将没法获得音讯。
    • 一个partition中的音讯只会被group中的一个consumer音讯。每一个group中consumer音讯消耗相互自力。
  • 无状况致使音讯的删除成为困难(能够删除的音讯正在被定阅),kafka接纳基于时候的SLA(效劳水平包管),音讯生存肯定时候(一般为7天)后会被删除。
  • 音讯定阅者能够rewind back到恣意地位从新举行消耗,当定阅者毛病时,能够挑选最小的offset(id)举行从新读取消耗音讯。

2.kafka操纵

2.1.检察有哪些主题:

kafka-topics.sh --list --zookeeper 192.168.0.201:12181

2.2.检察topic的详细信息

kafka-topics.sh -zookeeper 127.0.0.1:2181 -describe -topic testKJ1

2.3.为topic增添副本

kafka-reassign-partitions.sh -zookeeper 127.0.0.1:2181 -reassignment-json-file json/partitions-to-move.json -execute

2.4.竖立topic

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testKJ1

2.5为topic增添partition

bin/kafka-topics.sh –zookeeper 127.0.0.1:2181 –alter –partitions 20 –topic testKJ1

2.6kafka生产者客户端敕令

kafka-console-producer.sh --broker-list localhost:9092 --topic testKJ1

2.7kafka消耗者客户端敕令

kafka-console-consumer.sh -zookeeper localhost:2181 --from-beginning --topic testKJ1

2.8kafka效劳启动

kafka-server-start.sh -daemon ../config/server.properties

3..net core操纵

producer端,引入Confluent.Kafka

Install-Package Confluent.Kafka -Version 1.0-beta2
using Confluent.Kafka;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace KafkaTest
{
    class Program
    {
        static void Main(string[] args)
        {
            Test().Wait();
        }
        static async Task Test()
        {
           var conf = new ProducerConfig { BootstrapServers = "39.**.**.**:9092" };

            Action<DeliveryReportResult<Null, string>> handler = r =>
                Console.WriteLine(!r.Error.IsError
                    ? $"Delivered message to {r.TopicPartitionOffset}"
                    : $"Delivery Error: {r.Error.Reason}");

            using (var p = new Producer<Null, string>(conf))
            {
                for (int i = 0; i < 100000;   i)
                {
                    p.BeginProduce("my-topic", new Message<Null, string> { Value = i.ToString() }, handler);
                }

                // wait for up to 10 seconds for any inflight messages to be delivered.
                p.Flush(TimeSpan.FromSeconds(10));
            }
        }
    }
}

consumer端,引入Confluent.Kafka

Install-Package Confluent.Kafka -Version 1.0-beta2
using Confluent.Kafka;
using System;
using System.Linq;
using System.Text;

namespace KafkaClient
{
    class Program
    {
        static void Main(string[] args)
        {
            

            var conf = new ConsumerConfig
            {
                GroupId = "test-consumer-group4",
                BootstrapServers = "39.**.**.**:9092",
                // Note: The AutoOffsetReset property determines the start offset in the event
                // there are not yet any committed offsets for the consumer group for the
                // topic/partitions of interest. By default, offsets are committed
                // automatically, so in this example, consumption will only start from the
                // earliest message in the topic 'my-topic' the first time you run the program.
                AutoOffsetReset = AutoOffsetResetType.Earliest
            };

            using (var c = new Consumer<Ignore, string>(conf))
            {
                c.Subscribe("my-topic");

                bool consuming = true;
                // The client will automatically recover from non-fatal errors. You typically
                // don't need to take any action unless an error is marked as fatal.
                c.OnError  = (_, e) => consuming = !e.IsFatal;

                while (consuming)
                {
                    try
                    {
                        var cr = c.Consume();
                        Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Error occured: {e.Error.Reason}");
                    }
                }

                // Ensure the consumer leaves the group cleanly and final offsets are committed.
                c.Close();
            }
        }
    }
}

 

-玖富娱乐是一家为代理招商,直属主管信息发布为主的资讯网站,同时也兼顾玖富娱乐代理注册登录地址。