Apache Kafka is a distributed event streaming platform capable of handling high-volume data streams in real time. Integrating Kafka with .NET Core applications using C# provides a powerful combination for building scalable and robust data processing pipelines. This guide offers a step-by-step approach to creating a simple Kafka producer and consumer in C# on .NET Core, leveraging the Confluent.Kafka library.
Ensure you have a running Kafka broker. This can be achieved by setting up Kafka locally using Docker or connecting to a cloud-based Kafka service like Confluent Cloud.
Install the .NET Core SDK (version 6.0 or later is recommended) to create and manage .NET Core projects.
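You can confirm the SDK is available from your terminal:
dotnet --version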
The Confluent.Kafka library provides comprehensive support for Kafka operations in .NET. Install it using the following command:
dotnet add package Confluent.Kafka
Start by creating a new .NET Core console application. Open your terminal or command prompt and execute:
dotnet new console -n KafkaDemo
cd KafkaDemo
dotnet add package Confluent.Kafka
This sequence initializes a new console project named "KafkaDemo" and adds the necessary Kafka library.
Organize your project to accommodate both producer and consumer logic; a simple approach is to keep the producer and consumer in separate classes (and, if you want to run them side by side, in separate console projects).
A Kafka producer is responsible for sending messages to a specific Kafka topic. It requires configuration parameters such as the bootstrap servers that locate the Kafka brokers.
Define the producer settings, including the Kafka broker addresses:
using Confluent.Kafka;
using System;
using System.Threading.Tasks;

public class KafkaProducer : IDisposable
{
    private readonly IProducer<string, string> _producer;
    private readonly string _topic;

    public KafkaProducer(string bootstrapServers, string topic)
    {
        // Minimal configuration: only the broker addresses are required.
        var config = new ProducerConfig
        {
            BootstrapServers = bootstrapServers
        };
        _producer = new ProducerBuilder<string, string>(config).Build();
        _topic = topic;
    }

    public async Task ProduceMessageAsync(string key, string message)
    {
        try
        {
            // Await delivery confirmation from the broker.
            var deliveryResult = await _producer.ProduceAsync(_topic, new Message<string, string>
            {
                Key = key,
                Value = message
            });
            Console.WriteLine($"Message delivered to {deliveryResult.TopicPartitionOffset}");
        }
        catch (ProduceException<string, string> ex)
        {
            Console.WriteLine($"Delivery failed: {ex.Error.Reason}");
        }
    }

    public void Dispose()
    {
        _producer.Dispose();
    }
}
Integrate the producer within your main program to send messages:
class Program
{
    static async Task Main(string[] args)
    {
        string bootstrapServers = "localhost:9092";
        string topic = "demo-topic";

        using var producer = new KafkaProducer(bootstrapServers, topic);
        await producer.ProduceMessageAsync("key1", "Hello, Kafka!");
    }
}
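Awaiting each ProduceAsync call waits for a broker round trip per message. When sending many messages, a common alternative is the fire-and-forget Produce overload with a delivery handler, flushing once at the end. A minimal sketch of a method you could add to the KafkaProducer class (the loop bounds and message contents are illustrative):
public void ProduceMany(int count)
{
    for (int i = 0; i < count; i++)
    {
        // Produce() queues the message and returns immediately;
        // the handler runs when delivery is confirmed or fails.
        _producer.Produce(_topic, new Message<string, string>
        {
            Key = $"key{i}",
            Value = $"message {i}"
        },
        report =>
        {
            if (report.Error.IsError)
                Console.WriteLine($"Delivery failed: {report.Error.Reason}");
        });
    }
    // Block until all queued messages are delivered (or the timeout elapses).
    _producer.Flush(TimeSpan.FromSeconds(10));
}
Flush blocks until every queued message has been acknowledged or the timeout expires, so it belongs at batch boundaries or shutdown rather than inside the loop.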
Key elements of the producer:

- ProducerConfig: contains configuration parameters for the producer, primarily the Kafka broker addresses.
- IProducer<TKey, TValue>: the producer interface, where TKey and TValue define the types of the message key and value.
- ProduceAsync: sends a message asynchronously to the specified topic.
- Dispose: ensures the producer is properly disposed of, releasing any held resources.

A Kafka consumer subscribes to one or more topics and processes incoming messages. It manages offsets to keep track of consumed messages, ensuring reliability and scalability.
Set up the consumer configuration, specifying the bootstrap servers, group ID, and offset reset behavior:
using Confluent.Kafka;
using System;
using System.Threading;

public class KafkaConsumer : IDisposable
{
    private readonly IConsumer<string, string> _consumer;
    private readonly string _topic;

    public KafkaConsumer(string bootstrapServers, string groupId, string topic)
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = bootstrapServers,
            GroupId = groupId,
            AutoOffsetReset = AutoOffsetReset.Earliest, // Start from the beginning if no committed offset exists
            EnableAutoCommit = true // Offsets are committed automatically in the background
        };
        _consumer = new ConsumerBuilder<string, string>(config).Build();
        _topic = topic;
    }

    public void StartConsuming(CancellationToken cancellationToken)
    {
        _consumer.Subscribe(_topic);
        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                // Consume blocks until a message arrives or the token is cancelled.
                var consumeResult = _consumer.Consume(cancellationToken);
                Console.WriteLine($"Received message: '{consumeResult.Message.Value}' at {consumeResult.TopicPartitionOffset}");
                // Process the message here
            }
        }
        catch (OperationCanceledException)
        {
            // Expected on shutdown; exit the loop gracefully.
        }
        finally
        {
            // Commit final offsets and leave the consumer group cleanly.
            _consumer.Close();
        }
    }

    public void Dispose()
    {
        _consumer.Dispose();
    }
}
Implement the consumer in your main program to start listening for messages:
class Program
{
    static void Main(string[] args)
    {
        string bootstrapServers = "localhost:9092";
        string groupId = "demo-consumer-group";
        string topic = "demo-topic";

        using var consumer = new KafkaConsumer(bootstrapServers, groupId, topic);
        var cancellationTokenSource = new CancellationTokenSource();

        Console.CancelKeyPress += (_, e) =>
        {
            e.Cancel = true; // Prevent immediate termination
            cancellationTokenSource.Cancel();
            Console.WriteLine("Cancellation requested...");
        };

        consumer.StartConsuming(cancellationTokenSource.Token);
    }
}
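With EnableAutoCommit = true, offsets are committed in the background on a timer, so a crash mid-processing can skip a message. For at-least-once processing, a common variation is to disable auto-commit and commit explicitly after each message is handled. A minimal sketch of that consume loop (variable names match the earlier snippets):
var config = new ConsumerConfig
{
    BootstrapServers = bootstrapServers,
    GroupId = groupId,
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnableAutoCommit = false // Take over offset management
};

using var consumer = new ConsumerBuilder<string, string>(config).Build();
consumer.Subscribe(topic);

while (!cancellationToken.IsCancellationRequested)
{
    var result = consumer.Consume(cancellationToken);
    // ... process the message ...
    consumer.Commit(result); // Commit only after processing succeeds
}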
Key elements of the consumer:

- ConsumerConfig: configuration settings for the consumer, including broker addresses, group ID, and offset management.
- IConsumer<TKey, TValue>: the consumer interface, where TKey and TValue define the types of the message key and value.
- Subscribe: subscribes the consumer to a specified topic.
- Consume: reads messages from the topic; this method blocks until a message is available or the operation is canceled.
- Close: ensures the consumer commits its offsets and leaves the consumer group properly.

If you're running Kafka locally, ensure that your Kafka broker is active. Using Docker Compose simplifies the setup process. Here's an example docker-compose.yml:
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper
Run the following command to start Kafka:
docker-compose up -d
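To confirm both containers are running before starting the applications:
docker-compose ps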
Navigate to the producer’s project directory and execute:
dotnet run
You should see output indicating that the message has been delivered, along the lines of:
Message delivered to demo-topic [0] @0
In a separate terminal window, navigate to the consumer’s project directory and execute:
dotnet run
The consumer should display the received message:
Received message: 'Hello, Kafka!' at demo-topic [0] @0
Ensure that the Kafka topic exists and is receiving messages. You can list topics and check their details using Kafka’s CLI tools:
# List all topics
kafka-topics --list --bootstrap-server localhost:9092
# Describe a specific topic
kafka-topics --describe --topic demo-topic --bootstrap-server localhost:9092
If the topic does not exist, you can create it using:
kafka-topics --create --topic demo-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
For production-ready applications, implement comprehensive error handling and logging to track message deliveries and consumptions. This can involve integrating logging frameworks like Serilog or NLog.
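For example, with Serilog (assuming the Serilog and Serilog.Sinks.Console NuGet packages are installed), the Console.WriteLine calls in the producer could be replaced with structured log events; a minimal sketch:
using Serilog;

// One-time logger setup, e.g. at application startup.
Log.Logger = new LoggerConfiguration()
    .MinimumLevel.Information()
    .WriteTo.Console()
    .CreateLogger();

// Inside ProduceMessageAsync, structured logging replaces Console.WriteLine:
try
{
    var deliveryResult = await _producer.ProduceAsync(_topic, new Message<string, string> { Key = key, Value = message });
    Log.Information("Message delivered to {TopicPartitionOffset}", deliveryResult.TopicPartitionOffset);
}
catch (ProduceException<string, string> ex)
{
    Log.Error(ex, "Delivery failed: {Reason}", ex.Error.Reason);
}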
To send complex objects, serialize them before producing and deserialize upon consumption. Using JSON (here via the Newtonsoft.Json package) is a common approach:
using Newtonsoft.Json;

// Serialization on the producer side
string serializedMessage = JsonConvert.SerializeObject(yourObject);
await producer.ProduceMessageAsync("key1", serializedMessage);

// Deserialization on the consumer side
var deserializedObject = JsonConvert.DeserializeObject<YourObjectType>(consumeResult.Message.Value);
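As a concrete illustration, with a hypothetical OrderEvent record the round trip looks like this:
// Hypothetical message type for illustration.
public record OrderEvent(string OrderId, decimal Amount, DateTime CreatedAt);

// Producer side
var order = new OrderEvent("order-42", 19.99m, DateTime.UtcNow);
await producer.ProduceMessageAsync(order.OrderId, JsonConvert.SerializeObject(order));

// Consumer side
var received = JsonConvert.DeserializeObject<OrderEvent>(consumeResult.Message.Value);
Console.WriteLine($"Order {received.OrderId} for {received.Amount:C}");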
Manage your configurations externally using appsettings.json or environment variables to accommodate different environments (development, staging, production).
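A minimal sketch using the Microsoft.Extensions.Configuration and Microsoft.Extensions.Configuration.Json packages (the section and key names below are illustrative). First, an appsettings.json:
{
  "Kafka": {
    "BootstrapServers": "localhost:9092",
    "Topic": "demo-topic",
    "GroupId": "demo-consumer-group"
  }
}
Then read it at startup (AddEnvironmentVariables additionally requires the Microsoft.Extensions.Configuration.EnvironmentVariables package):
using Microsoft.Extensions.Configuration;

var configuration = new ConfigurationBuilder()
    .AddJsonFile("appsettings.json", optional: false)
    .AddEnvironmentVariables() // Environment variables override values from the file
    .Build();

string bootstrapServers = configuration["Kafka:BootstrapServers"];
string topic = configuration["Kafka:Topic"];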
Kafka consumers can scale horizontally by assigning the same group ID to multiple consumer instances, allowing Kafka to distribute partitions among them for load balancing.
Implement security measures such as SSL encryption and SASL authentication to secure data in transit between producers, consumers, and Kafka brokers.
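The exact settings depend on how your brokers are configured; as one common pattern, SASL/PLAIN over TLS can be set directly on ProducerConfig or ConsumerConfig (the endpoint, credentials, and certificate path below are placeholders):
var config = new ProducerConfig
{
    BootstrapServers = "broker.example.com:9093", // Placeholder TLS endpoint
    SecurityProtocol = SecurityProtocol.SaslSsl,
    SaslMechanism = SaslMechanism.Plain,
    SaslUsername = "your-username",   // Placeholder credentials
    SaslPassword = "your-password",
    SslCaLocation = "/path/to/ca.pem" // CA certificate, if not in the system store
};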
Integrating Apache Kafka with C# .NET Core applications enables efficient and scalable real-time data processing. By following this guide, you have learned how to set up a Kafka producer and consumer using the Confluent.Kafka library, configure your environment, and run your applications effectively. For production environments, consider implementing advanced features such as error handling, logging, serialization, configuration management, and security to build a robust Kafka-based system.