本文共 3491 字,大约阅读时间需要 11 分钟。
Kafka是一个高度可扩展的分布式流处理平台,由LinkedIn开发并归属于Apache开源项目。其主要功能包括消息发布、消息订阅、流数据存储以及高效的流处理。Kafka不仅可以作为传统的消息队列使用,也适合构建实时数据管道或流式应用程序。
Kafka具有以下三个关键特性:
流式数据处理
主要应用场景
Kafka的核心组件
利用Docker安装Kafka非常简单。以下是完整的Docker Compose配置文件:
version: '3'services: zookeeper: image: wurstmeister/zookeeper ports: - "2181:2181" kafka: image: wurstmeister/kafka depends_on: - zookeeper ports: - "9092:9092" environment: - KAFKA_ADVERTISED_HOST_NAME=192.168.0.117 - KAFKA_CREATE_TOPICS=test:3:1 - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
运行以下命令启动Kafka环境:
sudo docker-compose up -d
using Confluent.Kafka;using System;using System.Threading;using System.Collections.Generic;public class Program{ public static async Task Main(string[] args) { Console.WriteLine("Hello World Producer!"); var config = new ProducerConfig { BootstrapServers = "192.168.0.117:9092", ClientId = Dns.GetHostName() }; using (var producer = new ProducerBuilder(config).Build()) { string topic = "test"; for (int i = 0; i < 100; i++) { var msg = $"message {i}"; Console.WriteLine($"Send message: {msg}"); var result = await producer.ProduceAsync(topic, new Message { Value = msg }); Console.WriteLine($"Result: key {result.Key}, value {result.Value}, partition {result.TopicPartition}"); Thread.Sleep(500); } } Console.ReadLine(); }}
using Confluent.Kafka;using System;using System.Collections.Generic;public class ConsumerProgram{ public static void Main(string[] args) { Console.WriteLine("Hello World Kafka Consumer!"); var config = new ConsumerConfig { BootstrapServers = "192.168.0.117:9092", GroupId = "foo", AutoOffsetReset = AutoOffsetReset.Earliest }; var cancelToken = new CancellationToken(); using (var consumer = new ConsumerBuilder(config).Build()) { string topic = "test"; consumer.Subscribe(topic); while (!cancelToken.IsCanceled) { try { var consumeResult = consumer.Consume(CancellationToken.None); Console.WriteLine( $"Consumer message: {consumeResult.Message.Value}, " + $"Topic: {consumeResult.Topic}, Partition: {consumeResult.Partition}"); } catch (KafkaException e) { Consolehetto.WriteLine(e.ToString()); } } consumer.Close(); } }}
Kafka适用于需要处理大量实时数据的场景,尤其在金融、社交网络和物流等领域表现出色。通过合理配置和使用适当的Partition数,开发者可以根据应用需求调整吞吐量和可靠性。
转载地址:http://oetyk.baihongyu.com/