Kafka C API如何高效实现消息收发?

99ANYc3cd6
预计阅读时长 32 分钟
位置: 首页 C语言 正文

对于 C 语言开发者,主要有以下几种与 Kafka 交互的方式:

kafka api c语言
(图片来源网络,侵删)
  1. librdkafka:这是最流行、功能最全、性能最高的 C/C++ 客户端库,由 Confluent(一家专注于 Kafka 的公司)开发和维护,是事实上的标准。
  2. Kafka C/C++ 客户端:这是由 Apache 官方维护的客户端库,但近年来更新和维护频率远低于 librdkafka,社区和功能也相对较少。
  3. 通过其他语言的绑定:使用 swiglibrdkafka 生成其他语言的绑定。

强烈推荐使用 librdkafka,本教程将主要围绕 librdkafka 展开。


为什么选择 librdkafka

  • 高性能:异步、非阻塞 I/O,充分利用了现代操作系统的能力。
  • 功能完整:支持生产者、消费者、事务、 Exactly-Once 语义、消费者组、偏移量管理等所有 Kafka 特性。
  • 稳定可靠:被广泛用于生产环境,经过大规模验证。
  • 易于使用:提供了清晰的 API 设计和丰富的配置选项。
  • 跨平台:支持 Linux, macOS, Windows。

环境准备与安装

1 安装 Kafka

你需要一个正在运行的 Kafka 集群,如果你没有,可以按照 Kafka 官方快速入门指南 快速搭建一个本地环境。

2 安装 librdkafka

Linux (Ubuntu/Debian) 使用包管理器安装最简单:

sudo apt-get update
sudo apt-get install -y librdkafka-dev

macOS (Homebrew)

kafka api c语言
(图片来源网络,侵删)
brew install librdkafka

从源码编译 如果需要最新版本或特定功能,可以从源码编译:

git clone https://github.com/confluentinc/librdkafka.git
cd librdkafka
./configure
make
sudo make install

3 项目配置

在你的 C 项目中,需要链接 librdkafka 库。

使用 gcc/g++ 编译时:

gcc -o my_producer my_producer.c -lrdkafka

-lrdkafka 告诉链接器链接 librdkafka 库。

使用 CMake (推荐) CMakeLists.txt 示例:

cmake_minimum_required(VERSION 3.10)
project(MyKafkaProject)
# 查找 librdkafka 包
find_package(PkgConfig REQUIRED)
pkg_check_modules(RDKAFKA REQUIRED librdkafka)
# 添加可执行文件
add_executable(my_producer my_producer.c)
# 链接库
target_link_libraries(my_producer ${RDKAFKA_LIBRARIES})

核心概念

librdkafka 的 API 主要围绕以下几个核心对象:

  • rd_kafka_t:代表一个 Kafka 客户端实例(生产者或消费者)。
  • rd_kafka_topic_t:代表一个 Kafka Topic,通常由 rd_kafka_t 对象通过 rd_kafka_topic_new() 创建。
  • rd_kafka_conf_t:客户端的配置对象,用于设置各种参数(如 Broker 地址、客户端 ID、序列化器等)。
  • rd_kafka_message_t:代表一条消息,用于生产者发送和消费者接收。
  • rd_kafka_queue_t:消息队列,用于异步接收消息或事件。
  • rd_kafka_error_t:错误对象,用于处理操作中发生的错误。

实战:编写一个简单的生产者

这个生产器会向指定的 Topic 发送一条消息。

producer.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <librdkafka/rdkafka.h>
int main(int argc, char **argv) {
    if (argc != 4) {
        fprintf(stderr, "Usage: %s <broker> <topic> <message>\n", argv[0]);
        return 1;
    }
    const char *brokers = argv[1];
    const char *topic = argv[2];
    const char *message = argv[3];
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    char errstr[512];
    // 设置客户端 ID,用于在 Broker 端标识客户端
    if (rd_kafka_conf_set(conf, "client.id", "my-c-producer", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "Error setting client.id: %s\n", errstr);
        rd_kafka_conf_destroy(conf);
        return 1;
    }
    // 创建生产者实例
    rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (!producer) {
        fprintf(stderr, "Failed to create producer: %s\n", errstr);
        return 1;
    }
    printf("Producer created successfully.\n");
    printf("Brokers: %s\n", brokers);
    printf("Topic: %s\n", topic);
    printf("Message: %s\n", message);
    // 发送消息
    // rd_kafka_producev 是一个方便的函数,可以一次性指定所有参数
    // RD_KAFKA_V_FLAG 是标志位,RD_KAFKA_V_TOPIC 是主题,RD_KAFKA_V_VALUE 是消息内容
    rd_kafka_producev(
        producer,
        RD_KAFKA_V_TOPIC(topic),
        RD_KAFKA_V_VALUE(message, strlen(message)),
        RD_KAFKA_V_KEY("my-key-1", 8), // 可选的消息键
        RD_KAFKA_V_OPAQUE(NULL), // 可选的用户数据,会在回调中收到
        RD_KAFKA_V_END
    );
    // 检查发送是否被接受到队列
    if (rd_kafka_last_error() != RD_KAFKA_RESP_ERR_NO_ERROR) {
        fprintf(stderr, "Failed to produce message: %s\n", rd_kafka_err2str(rd_kafka_last_error()));
        rd_kafka_destroy(producer);
        return 1;
    }
    printf("Message queued for delivery.\n");
    // 等待消息发送完成
    // 在实际应用中,通常会使用 poll() 或回调来处理发送结果
    // 这里为了简单,我们只是等待一小段时间
    rd_kafka_flush(producer, 10000); // 等待最多10秒,直到所有消息被发送
    if (rd_kafka_outq_len(producer) > 0) {
        fprintf(stderr, "%% %d message(s) still in queue\n", rd_kafka_outq_len(producer));
    }
    // 销毁生产者实例
    rd_kafka_destroy(producer);
    printf("Producer destroyed.\n");
    return 0;
}

编译和运行: 假设你的 Kafka Broker 运行在 localhost:9092,Topic 是 test-topic

# 编译
gcc -o producer producer.c -lrdkafka
# 运行
./producer localhost:9092 test-topic "Hello from C Producer!"

实战:编写一个简单的消费者

这个消费者会订阅一个 Topic 并持续打印接收到的消息。

consumer.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <librdkafka/rdkafka.h>
#include <signal.h>
volatile sig_atomic_t run = 1;
// 信号处理函数,用于优雅地退出
static void sigint_handler(int sig) {
    run = 0;
}
// 消费者消息回调函数
static void msg_consume_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) {
    if (rkmessage->err) {
        fprintf(stderr, "%% Consumer message error: %s\n", rd_kafka_message_errstr(rkmessage));
        return;
    }
    printf("Received message from topic %s [partition %d] at offset %d:\n",
           rd_kafka_topic_name(rkmessage->rkt),
           rkmessage->partition,
           rkmessage->offset);
    // 消息内容
    fwrite(rkmessage->payload, 1, rkmessage->len, stdout);
    printf("\n");
}
int main(int argc, char **argv) {
    if (argc != 3) {
        fprintf(stderr, "Usage: %s <broker> <topic>\n", argv[0]);
        return 1;
    }
    const char *brokers = argv[1];
    const char *topic = argv[2];
    signal(SIGINT, sigint_handler);
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    char errstr[512];
    // 设置 group.id,用于消费者组
    if (rd_kafka_conf_set(conf, "group.id", "my-c-consumer-group", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "Error setting group.id: %s\n", errstr);
        rd_kafka_conf_destroy(conf);
        return 1;
    }
    // 设置自动提交偏移量为 false,以便更好地控制偏移量
    if (rd_kafka_conf_set(conf, "enable.auto.commit", "false", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "Error setting enable.auto.commit: %s\n", errstr);
        rd_kafka_conf_destroy(conf);
        return 1;
    }
    // 创建消费者实例
    rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
    if (!consumer) {
        fprintf(stderr, "Failed to create consumer: %s\n", errstr);
        return 1;
    }
    printf("Consumer created successfully.\n");
    // 订阅 Topic
    rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(topics, topic, RD_KAFKA_PARTITION_UA); // RD_KAFKA_PARTITION_UA 表示使用分区分配策略
    if (rd_kafka_subscribe(consumer, topics) != RD_KAFKA_RESP_ERR_NO_ERROR) {
        fprintf(stderr, "Failed to subscribe to %s: %s\n", topic, rd_kafka_err2str(rd_kafka_last_error()));
        rd_kafka_topic_partition_list_destroy(topics);
        rd_kafka_destroy(consumer);
        return 1;
    }
    printf("Subscribed to topic %s\n", topic);
    rd_kafka_topic_partition_list_destroy(topics);
    // 开始消费消息
    while (run) {
        rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(consumer, 1000); // 等待最多1秒
        if (rkmessage) {
            msg_consume_cb(consumer, rkmessage, NULL);
            rd_kafka_message_destroy(rkmessage);
        }
    }
    printf("\nUnsubscribing and closing consumer...\n");
    rd_kafka_unsubscribe(consumer);
    rd_kafka_consumer_close(consumer);
    // 销毁消费者实例
    rd_kafka_destroy(consumer);
    printf("Consumer destroyed.\n");
    return 0;
}

编译和运行: 确保你的生产者已经向 test-topic 发送了消息。

# 编译
gcc -o consumer consumer.c -lrdkafka
# 运行
./consumer localhost:9092 test-topic

你可以在终端看到消费者打印出的消息,按下 Ctrl+C 可以优雅地停止消费者。


高级特性简介

librdkafka 的强大之处在于其对高级特性的支持。

  • 消费者组:通过设置 group.id,消费者可以自动地加入一个组,Kafka 会为你进行分区再平衡,实现负载均衡和高可用。
  • 偏移量管理
    • enable.auto.commit:控制是否自动提交偏移量。
    • rd_kafka_consumer_commit():手动提交偏移量,这是实现精确一次消费的关键。
  • Exactly-Once 语义:通过 rd_kafka_init_transaction(), rd_kafka_begin_transaction(), rd_kafka_send_offsets_to_transaction() 等事务 API,可以在 Kafka 和外部数据库(如 PostgreSQL)之间实现端到端的精确一次处理。
  • 消息拦截器:可以注册生产者或消费者的回调函数,在消息发送前/后、消息接收后执行自定义逻辑(如修改消息头、记录日志等)。
  • Stats 和 Metrics:通过 stats_cb 回调,可以获取客户端的详细统计信息(如消息速率、延迟、错误率等),用于监控和调试。

学习资源

希望这份指南能帮助你顺利地在 C 语言项目中使用 Kafka!

-- 展开阅读全文 --
头像
dede搭建网站,新手如何快速上手?
« 上一篇 02-26
dede5.7如何开启并配置二级域名?
下一篇 » 02-26

相关文章

取消
微信二维码
支付宝二维码

目录[+]