对于 C 语言开发者,主要有以下几种与 Kafka 交互的方式:

(图片来源网络,侵删)
librdkafka:这是最流行、功能最全、性能最高的 C/C++ 客户端库,由 Confluent(一家专注于 Kafka 的公司)开发和维护,是事实上的标准。- Kafka C/C++ 客户端:这是由 Apache 官方维护的客户端库,但近年来更新和维护频率远低于
librdkafka,社区和功能也相对较少。 - 通过其他语言的绑定:使用
swig为librdkafka生成其他语言的绑定。
强烈推荐使用 librdkafka,本教程将主要围绕 librdkafka 展开。
为什么选择 librdkafka?
- 高性能:异步、非阻塞 I/O,充分利用了现代操作系统的能力。
- 功能完整:支持生产者、消费者、事务、 Exactly-Once 语义、消费者组、偏移量管理等所有 Kafka 特性。
- 稳定可靠:被广泛用于生产环境,经过大规模验证。
- 易于使用:提供了清晰的 API 设计和丰富的配置选项。
- 跨平台:支持 Linux, macOS, Windows。
环境准备与安装
1 安装 Kafka
你需要一个正在运行的 Kafka 集群,如果你没有,可以按照 Kafka 官方快速入门指南 快速搭建一个本地环境。
2 安装 librdkafka
Linux (Ubuntu/Debian) 使用包管理器安装最简单:
sudo apt-get update sudo apt-get install -y librdkafka-dev
macOS (Homebrew)

(图片来源网络,侵删)
brew install librdkafka
从源码编译 如果需要最新版本或特定功能,可以从源码编译:
git clone https://github.com/confluentinc/librdkafka.git cd librdkafka ./configure make sudo make install
3 项目配置
在你的 C 项目中,需要链接 librdkafka 库。
使用 gcc/g++ 编译时:
gcc -o my_producer my_producer.c -lrdkafka
-lrdkafka 告诉链接器链接 librdkafka 库。
使用 CMake (推荐)
CMakeLists.txt 示例:
cmake_minimum_required(VERSION 3.10)
project(MyKafkaProject)
# 查找 librdkafka 包
find_package(PkgConfig REQUIRED)
pkg_check_modules(RDKAFKA REQUIRED librdkafka)
# 添加可执行文件
add_executable(my_producer my_producer.c)
# 链接库
target_link_libraries(my_producer ${RDKAFKA_LIBRARIES})
核心概念
librdkafka 的 API 主要围绕以下几个核心对象:
rd_kafka_t:代表一个 Kafka 客户端实例(生产者或消费者)。rd_kafka_topic_t:代表一个 Kafka Topic,通常由rd_kafka_t对象通过rd_kafka_topic_new()创建。rd_kafka_conf_t:客户端的配置对象,用于设置各种参数(如 Broker 地址、客户端 ID、序列化器等)。rd_kafka_message_t:代表一条消息,用于生产者发送和消费者接收。rd_kafka_queue_t:消息队列,用于异步接收消息或事件。rd_kafka_error_t:错误对象,用于处理操作中发生的错误。
实战:编写一个简单的生产者
这个生产器会向指定的 Topic 发送一条消息。
producer.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <librdkafka/rdkafka.h>
int main(int argc, char **argv) {
if (argc != 4) {
fprintf(stderr, "Usage: %s <broker> <topic> <message>\n", argv[0]);
return 1;
}
const char *brokers = argv[1];
const char *topic = argv[2];
const char *message = argv[3];
rd_kafka_conf_t *conf = rd_kafka_conf_new();
char errstr[512];
// 设置客户端 ID,用于在 Broker 端标识客户端
if (rd_kafka_conf_set(conf, "client.id", "my-c-producer", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "Error setting client.id: %s\n", errstr);
rd_kafka_conf_destroy(conf);
return 1;
}
// 创建生产者实例
rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (!producer) {
fprintf(stderr, "Failed to create producer: %s\n", errstr);
return 1;
}
printf("Producer created successfully.\n");
printf("Brokers: %s\n", brokers);
printf("Topic: %s\n", topic);
printf("Message: %s\n", message);
// 发送消息
// rd_kafka_producev 是一个方便的函数,可以一次性指定所有参数
// RD_KAFKA_V_FLAG 是标志位,RD_KAFKA_V_TOPIC 是主题,RD_KAFKA_V_VALUE 是消息内容
rd_kafka_producev(
producer,
RD_KAFKA_V_TOPIC(topic),
RD_KAFKA_V_VALUE(message, strlen(message)),
RD_KAFKA_V_KEY("my-key-1", 8), // 可选的消息键
RD_KAFKA_V_OPAQUE(NULL), // 可选的用户数据,会在回调中收到
RD_KAFKA_V_END
);
// 检查发送是否被接受到队列
if (rd_kafka_last_error() != RD_KAFKA_RESP_ERR_NO_ERROR) {
fprintf(stderr, "Failed to produce message: %s\n", rd_kafka_err2str(rd_kafka_last_error()));
rd_kafka_destroy(producer);
return 1;
}
printf("Message queued for delivery.\n");
// 等待消息发送完成
// 在实际应用中,通常会使用 poll() 或回调来处理发送结果
// 这里为了简单,我们只是等待一小段时间
rd_kafka_flush(producer, 10000); // 等待最多10秒,直到所有消息被发送
if (rd_kafka_outq_len(producer) > 0) {
fprintf(stderr, "%% %d message(s) still in queue\n", rd_kafka_outq_len(producer));
}
// 销毁生产者实例
rd_kafka_destroy(producer);
printf("Producer destroyed.\n");
return 0;
}
编译和运行:
假设你的 Kafka Broker 运行在 localhost:9092,Topic 是 test-topic。
# 编译 gcc -o producer producer.c -lrdkafka # 运行 ./producer localhost:9092 test-topic "Hello from C Producer!"
实战:编写一个简单的消费者
这个消费者会订阅一个 Topic 并持续打印接收到的消息。
consumer.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <librdkafka/rdkafka.h>
#include <signal.h>
volatile sig_atomic_t run = 1;
// 信号处理函数,用于优雅地退出
static void sigint_handler(int sig) {
run = 0;
}
// 消费者消息回调函数
static void msg_consume_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) {
if (rkmessage->err) {
fprintf(stderr, "%% Consumer message error: %s\n", rd_kafka_message_errstr(rkmessage));
return;
}
printf("Received message from topic %s [partition %d] at offset %d:\n",
rd_kafka_topic_name(rkmessage->rkt),
rkmessage->partition,
rkmessage->offset);
// 消息内容
fwrite(rkmessage->payload, 1, rkmessage->len, stdout);
printf("\n");
}
int main(int argc, char **argv) {
if (argc != 3) {
fprintf(stderr, "Usage: %s <broker> <topic>\n", argv[0]);
return 1;
}
const char *brokers = argv[1];
const char *topic = argv[2];
signal(SIGINT, sigint_handler);
rd_kafka_conf_t *conf = rd_kafka_conf_new();
char errstr[512];
// 设置 group.id,用于消费者组
if (rd_kafka_conf_set(conf, "group.id", "my-c-consumer-group", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "Error setting group.id: %s\n", errstr);
rd_kafka_conf_destroy(conf);
return 1;
}
// 设置自动提交偏移量为 false,以便更好地控制偏移量
if (rd_kafka_conf_set(conf, "enable.auto.commit", "false", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "Error setting enable.auto.commit: %s\n", errstr);
rd_kafka_conf_destroy(conf);
return 1;
}
// 创建消费者实例
rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
if (!consumer) {
fprintf(stderr, "Failed to create consumer: %s\n", errstr);
return 1;
}
printf("Consumer created successfully.\n");
// 订阅 Topic
rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(topics, topic, RD_KAFKA_PARTITION_UA); // RD_KAFKA_PARTITION_UA 表示使用分区分配策略
if (rd_kafka_subscribe(consumer, topics) != RD_KAFKA_RESP_ERR_NO_ERROR) {
fprintf(stderr, "Failed to subscribe to %s: %s\n", topic, rd_kafka_err2str(rd_kafka_last_error()));
rd_kafka_topic_partition_list_destroy(topics);
rd_kafka_destroy(consumer);
return 1;
}
printf("Subscribed to topic %s\n", topic);
rd_kafka_topic_partition_list_destroy(topics);
// 开始消费消息
while (run) {
rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(consumer, 1000); // 等待最多1秒
if (rkmessage) {
msg_consume_cb(consumer, rkmessage, NULL);
rd_kafka_message_destroy(rkmessage);
}
}
printf("\nUnsubscribing and closing consumer...\n");
rd_kafka_unsubscribe(consumer);
rd_kafka_consumer_close(consumer);
// 销毁消费者实例
rd_kafka_destroy(consumer);
printf("Consumer destroyed.\n");
return 0;
}
编译和运行:
确保你的生产者已经向 test-topic 发送了消息。
# 编译 gcc -o consumer consumer.c -lrdkafka # 运行 ./consumer localhost:9092 test-topic
你可以在终端看到消费者打印出的消息,按下 Ctrl+C 可以优雅地停止消费者。
高级特性简介
librdkafka 的强大之处在于其对高级特性的支持。
- 消费者组:通过设置
group.id,消费者可以自动地加入一个组,Kafka 会为你进行分区再平衡,实现负载均衡和高可用。 - 偏移量管理:
enable.auto.commit:控制是否自动提交偏移量。rd_kafka_consumer_commit():手动提交偏移量,这是实现精确一次消费的关键。
- Exactly-Once 语义:通过
rd_kafka_init_transaction(),rd_kafka_begin_transaction(),rd_kafka_send_offsets_to_transaction()等事务 API,可以在 Kafka 和外部数据库(如 PostgreSQL)之间实现端到端的精确一次处理。 - 消息拦截器:可以注册生产者或消费者的回调函数,在消息发送前/后、消息接收后执行自定义逻辑(如修改消息头、记录日志等)。
- Stats 和 Metrics:通过
stats_cb回调,可以获取客户端的详细统计信息(如消息速率、延迟、错误率等),用于监控和调试。
学习资源
librdkafka官方文档:这是最重要的资源,包含了所有 API 的详细说明和配置项列表。- 示例代码:官方库中提供了大量高质量的示例代码,是学习的最佳起点。
- Confluent Developer Blog:有很多关于 Kafka 和
librdkafka使用技巧的深入文章。
希望这份指南能帮助你顺利地在 C 语言项目中使用 Kafka!
