Chat
Search
Ithy Logo

Creating a Simple Kafka Producer/Consumer Mechanism in C#.NET Core

A Comprehensive Guide to Building Reliable Kafka Integrations with C# and .NET Core

server room data center

Key Takeaways

  • Kafka Setup: Establishing and configuring a Kafka broker for seamless communication.
  • Producer Implementation: Crafting a robust Kafka producer using the Confluent.Kafka library.
  • Consumer Design: Developing a scalable Kafka consumer with proper error handling and offset management.

Introduction

Apache Kafka is a distributed event streaming platform capable of handling high-volume data streams in real-time. Integrating Kafka with .NET Core applications using C# provides a powerful combination for building scalable and robust data processing pipelines. This guide offers a step-by-step approach to creating a simple Kafka producer and consumer mechanism in C#.NET Core, leveraging the Confluent.Kafka library.

Prerequisites

1. Kafka Environment

Ensure you have a running Kafka broker. This can be achieved by setting up Kafka locally using Docker or connecting to a cloud-based Kafka service like Confluent Cloud.

2. .NET Development Tools

Install the .NET Core SDK (version 6.0 or later is recommended) to create and manage .NET Core projects.

3. Confluent.Kafka NuGet Package

The Confluent.Kafka library provides comprehensive support for Kafka operations in .NET. Install it using the following command:

dotnet add package Confluent.Kafka

Setting Up the .NET Core Project

Creating the Console Application

Start by creating a new .NET Core console application. Open your terminal or command prompt and execute:


dotnet new console -n KafkaDemo
cd KafkaDemo
dotnet add package Confluent.Kafka
  

This sequence initializes a new console project named "KafkaDemo" and adds the necessary Kafka library.

Project Structure

Organize your project to accommodate both producer and consumer logic. A recommended structure is:

  • KafkaDemo/
    • Producer.cs
    • Consumer.cs
    • Program.cs
    • App.config (optional for configurations)

Creating the Kafka Producer

Understanding the Producer

A Kafka producer is responsible for sending messages to a specific Kafka topic. It requires configuration parameters such as the bootstrap servers that locate the Kafka brokers.

Producer Configuration

Define the producer settings, including the Kafka broker addresses:


using Confluent.Kafka;
using System;
using System.Threading.Tasks;

public class KafkaProducer : IDisposable
{
    private readonly IProducer<string, string> _producer;
    private readonly string _topic;

    public KafkaProducer(string bootstrapServers, string topic)
    {
        var config = new ProducerConfig
        {
            BootstrapServers = bootstrapServers
        };

        _producer = new ProducerBuilder<string, string>(config).Build();
        _topic = topic;
    }

    public async Task ProduceMessageAsync(string key, string message)
    {
        try
        {
            var deliveryResult = await _producer.ProduceAsync(_topic, new Message<string, string>
            {
                Key = key,
                Value = message
            });

            Console.WriteLine($"Message delivered to {deliveryResult.TopicPartitionOffset}");
        }
        catch (ProduceException<string, string> ex)
        {
            Console.WriteLine($"Delivery failed: {ex.Error.Reason}");
        }
    }

    public void Dispose()
    {
        _producer.Dispose();
    }
}
  

Producer Usage Example

Integrate the producer within your main program to send messages:


class Program
{
    static async Task Main(string[] args)
    {
        string bootstrapServers = "localhost:9092";
        string topic = "demo-topic";

        using var producer = new KafkaProducer(bootstrapServers, topic);
        await producer.ProduceMessageAsync("key1", "Hello, Kafka!");
    }
}
  

Explanation of Key Components

  • ProducerConfig: Contains configuration parameters for the producer, primarily the Kafka broker addresses.
  • IProducer<TKey, TValue>: The producer interface where TKey and TValue define the types of the key and value.
  • ProduceAsync: Sends a message asynchronously to the specified topic.
  • Dispose: Ensures the producer is properly disposed of, releasing any held resources.

Creating the Kafka Consumer

Understanding the Consumer

A Kafka consumer subscribes to one or more topics and processes incoming messages. It manages offsets to keep track of consumed messages, ensuring reliability and scalability.

Consumer Configuration

Set up the consumer configuration, specifying the bootstrap servers, group ID, and offset reset behavior:


using Confluent.Kafka;
using System;
using System.Threading;

public class KafkaConsumer : IDisposable
{
    private readonly IConsumer<string, string> _consumer;
    private readonly string _topic;

    public KafkaConsumer(string bootstrapServers, string groupId, string topic)
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = bootstrapServers,
            GroupId = groupId,
            AutoOffsetReset = AutoOffsetReset.Earliest, // Start from the beginning if no offset is present
            EnableAutoCommit = true
        };

        _consumer = new ConsumerBuilder<string, string>(config).Build();
        _topic = topic;
    }

    public void StartConsuming(CancellationToken cancellationToken)
    {
        _consumer.Subscribe(_topic);

        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                var consumeResult = _consumer.Consume(cancellationToken);
                Console.WriteLine($"Received message: '{consumeResult.Message.Value}' at {consumeResult.TopicPartitionOffset}");
                // Process the message here
            }
        }
        catch (OperationCanceledException)
        {
            // Handle cancellation gracefully
        }
        finally
        {
            _consumer.Close();
        }
    }

    public void Dispose()
    {
        _consumer.Dispose();
    }
}
  

Consumer Usage Example

Implement the consumer in your main program to start listening for messages:


class Program
{
    static void Main(string[] args)
    {
        string bootstrapServers = "localhost:9092";
        string groupId = "demo-consumer-group";
        string topic = "demo-topic";

        using var consumer = new KafkaConsumer(bootstrapServers, groupId, topic);
        var cancellationTokenSource = new CancellationTokenSource();

        Console.CancelKeyPress += (_, e) =>
        {
            e.Cancel = true; // Prevent immediate termination
            cancellationTokenSource.Cancel();
            Console.WriteLine("Cancellation requested...");
        };

        consumer.StartConsuming(cancellationTokenSource.Token);
    }
}
  

Explanation of Key Components

  • ConsumerConfig: Configuration settings for the consumer, including broker addresses, group ID, and offset management.
  • IConsumer<TKey, TValue>: The consumer interface where TKey and TValue define the types of the key and value.
  • Subscribe: Subscribes the consumer to a specified topic.
  • Consume: Reads messages from the topic. This method blocks until a message is available or the operation is canceled.
  • Close: Ensures the consumer saves its offsets and leaves the consumer group properly.

Running the Producer and Consumer

Starting the Kafka Broker

If you're running Kafka locally, ensure that your Kafka broker is active. Using Docker Compose simplifies the setup process. Here's an example docker-compose.yml:


version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper
  

Run the following command to start Kafka:

docker-compose up -d

Running the Producer

Navigate to the producer’s project directory and execute:

dotnet run

You should see output indicating that the message has been delivered, such as:

Message delivered to demo-topic-0@offset

Running the Consumer

In a separate terminal window, navigate to the consumer’s project directory and execute:

dotnet run

The consumer should display the received messages:

Received message: 'Hello, Kafka!' at demo-topic-0@offset

Verifying the Kafka Setup

Ensure that the Kafka topic exists and is receiving messages. You can list topics and check their details using Kafka’s CLI tools:


# List all topics
kafka-topics --list --bootstrap-server localhost:9092

# Describe a specific topic
kafka-topics --describe --topic demo-topic --bootstrap-server localhost:9092
  

If the topic does not exist, you can create it using:


kafka-topics --create --topic demo-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
  

Enhancing the Producer and Consumer

Error Handling and Logging

For production-ready applications, implement comprehensive error handling and logging to track message deliveries and consumptions. This can involve integrating logging frameworks like Serilog or NLog.

Serialization and Deserialization

To send complex objects, serialize them before producing and deserialize upon consumption. Using JSON is a common approach:


using Newtonsoft.Json;

// Serialization
string serializedMessage = JsonConvert.SerializeObject(yourObject);
await producer.ProduceMessageAsync("key1", serializedMessage);

// Deserialization
var deserializedObject = JsonConvert.DeserializeObject<YourObjectType>(consumeResult.Message.Value);
  

Configuration Management

Manage your configurations externally using appsettings.json or environment variables to accommodate different environments (development, staging, production).

Scaling Consumers

Kafka consumers can scale horizontally by assigning the same group ID to multiple consumer instances, allowing Kafka to distribute partitions among them for load balancing.

Security Considerations

Implement security measures such as SSL encryption and SASL authentication to secure data in transit between producers, consumers, and Kafka brokers.


Recap and Conclusion

Integrating Apache Kafka with C#.NET Core applications enables efficient and scalable real-time data processing. By following this guide, you have learned how to set up a Kafka producer and consumer using the Confluent.Kafka library, configure your environment, and run your applications effectively. For production environments, consider implementing advanced features such as error handling, logging, serialization, configuration management, and security to build a robust Kafka-based system.

References


Last updated January 20, 2025
Ask Ithy AI
Export Article
Delete Article