Gearman 是一个通用的分布式任务队列系统,它允许你将工作(任务)分发到多个工作节点上并行处理,C 语言是 Gearman 的原生语言之一,拥有高性能和低开销的特点。

本指南将涵盖以下内容:
- 核心概念回顾
- 环境准备
- 编写 C 语言 Worker(工作进程)
- 编写 C 语言 Client(客户端)
- 编译与运行
- 高级话题
核心概念回顾
在开始编码前,我们先快速回顾 Gearman 的三个核心角色:
- Client (客户端):提交任务的程序,它不关心任务是如何完成的,只关心结果,一个 Web 服务器,当用户上传图片时,它将“图片处理”这个任务提交给 Gearman。
- Worker (工作进程):执行任务的程序,它会连接到 Gearman 服务器,注册自己能处理的任务类型,并等待接收任务,一个专门负责缩略图生成的程序。
- Job Server (任务服务器):中央调度器,它接收来自客户端的任务,并将其分发给合适的空闲 Worker。
环境准备
你需要安装 Gearman 的服务器端和 C 语言的客户端库。
安装 Gearman 服务器
在基于 Debian/Ubuntu 的系统上:

sudo apt-get update sudo apt-get install gearman-server gearman-tools
安装后,启动 Gearman 服务:
sudo systemctl start gearman-server # 或者 sudo service gearman-server start
安装 C 语言客户端库 (libgearman)
这个库提供了 C 语言 API,在基于 Debian/Ubuntu 的系统上:
sudo apt-get install libgearman-dev
libgearman-dev 包含了编译所需的头文件(.h)和链接库(.a 或 .so)。
编写 C 语言 Worker (工作进程)
Worker 的主要工作是:
- 创建一个
gearman_client_st结构体来连接到 Gearman 服务器。 - 注册一个或多个它能处理的任务类型("reverse")。
- 进入一个循环,等待任务。
- 当收到任务时,执行任务逻辑(反转一个字符串)。
- 将结果(或错误)发送回 Gearman 服务器。
- 清理资源。
worker.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h> // for sleep()
#include <gearman.h>
// 这是我们的任务处理函数
gearman_return_t reverse_handler(gearman_job_st *job, void *context, size_t *result_size, size_t *result_size_max)
{
(void)context; // 我们不使用 context,所以忽略它
// 获取任务的负载
const char *workload = gearman_job_workload(job);
size_t workload_size = strlen(workload);
// 分配内存来存储结果
char *result = (char *)malloc(workload_size + 1);
if (!result) {
fprintf(stderr, "Error: Failed to allocate memory for result\n");
return GEARMAN_MEMORY_ALLOCATION_FAILURE;
}
// 执行任务:反转字符串
for (size_t i = 0; i < workload_size; i++) {
result[i] = workload[workload_size - 1 - i];
}
result[workload_size] = '\0';
// 将结果发送回服务器
gearman_job_return_result(job, GEARMAN_SUCCESS, result, strlen(result));
free(result); // 释放内存
printf("Worker: Reversed '%s' to '%s'\n", workload, result);
return GEARMAN_SUCCESS;
}
int main(void)
{
gearman_client_st *client;
gearman_return_t ret;
// 1. 创建客户端连接
client = gearman_client_create(NULL);
if (!client) {
fprintf(stderr, "Error: Failed to create client\n");
exit(EXIT_FAILURE);
}
// 2. 连接到 Gearman 服务器
ret = gearman_client_add_server(client, NULL, GEARMAN_DEFAULT_TCP_PORT);
if (ret != GEARMAN_SUCCESS) {
fprintf(stderr, "Error: %s\n", gearman_client_error(client));
gearman_client_free(client);
exit(EXIT_FAILURE);
}
printf("Worker is running and waiting for jobs...\n");
// 3. 注册任务并等待
// gearman_worker_add_function 是更常见的 Worker API,但这里我们用 client 来演示
// 更标准的方式是使用 gearman_worker_st,但为了保持简单,我们先这样写。
// Worker 应该使用 gearman_worker_st,下面我们会用正确的 API 重写。
// 让我们先暂停一下,然后用正确的 gearman_worker_st API 来重写。
// --- 正确的 Worker API 使用方式 ---
gearman_worker_st *worker;
worker = gearman_worker_create(client);
if (!worker) {
fprintf(stderr, "Error: Failed to create worker\n");
gearman_client_free(client);
exit(EXIT_FAILURE);
}
// 注册我们想要处理的任务 "reverse"
gearman_worker_add_function(worker, "reverse", 0, reverse_handler, NULL);
// 进入工作循环
while (1) {
// 等待任务
ret = gearman_worker_work(worker);
if (ret != GEARMAN_SUCCESS && ret != GEARMAN_TIMEOUT) {
fprintf(stderr, "Error: %s\n", gearman_worker_error(worker));
break;
}
}
// 4. 清理资源
gearman_worker_free(worker);
gearman_client_free(client);
return EXIT_SUCCESS;
}
修正后的 Worker (worker_correct.c):
上面的例子为了简化,混用了 client 和 worker 的概念,标准的 Worker 应该使用 gearman_worker_st,以下是更规范的写法:
// worker_correct.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <gearman.h>
gearman_return_t reverse_handler(gearman_job_st *job, void *context, size_t *result_size, size_t *result_size_max)
{
const char *workload = gearman_job_workload(job);
size_t workload_len = strlen(workload);
char *result = malloc(workload_len + 1);
if (!result) return GEARMAN_MEMORY_ALLOCATION_FAILURE;
for (size_t i = 0; i < workload_len; i++) {
result[i] = workload[workload_len - 1 - i];
}
result[workload_len] = '\0';
gearman_job_return_result(job, GEARMAN_SUCCESS, result, workload_len);
free(result);
printf("Worker: Processed job. Reversed '%s' to '%s'\n", workload, result);
return GEARMAN_SUCCESS;
}
int main(void)
{
gearman_worker_st *worker = gearman_worker_create(NULL);
if (!worker) {
fprintf(stderr, "Error: Failed to create worker\n");
return EXIT_FAILURE;
}
if (gearman_worker_add_server(worker, NULL, GEARMAN_DEFAULT_TCP_PORT) != GEARMAN_SUCCESS) {
fprintf(stderr, "Error: %s\n", gearman_worker_error(worker));
gearman_worker_free(worker);
return EXIT_FAILURE;
}
printf("Worker is running and waiting for 'reverse' jobs...\n");
gearman_worker_add_function(worker, "reverse", 0, reverse_handler, NULL);
while (1) {
gearman_return_t ret = gearman_worker_work(worker);
if (ret != GEARMAN_SUCCESS && ret != GEARMAN_TIMEOUT) {
fprintf(stderr, "Worker error: %s\n", gearman_worker_error(worker));
break;
}
}
gearman_worker_free(worker);
return EXIT_SUCCESS;
}
编写 C 语言 Client (客户端)
Client 的主要工作是:
- 创建一个
gearman_client_st结构体来连接到 Gearman 服务器。 - 提交一个任务(可以同步或异步)。
- (如果是同步)等待并获取任务结果。
- 清理资源。
client.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <gearman.h>
int main(int argc, char *argv[])
{
if (argc < 2) {
fprintf(stderr, "Usage: %s <string_to_reverse>\n", argv[0]);
return EXIT_FAILURE;
}
gearman_client_st *client;
gearman_return_t ret;
const char *task_name = "reverse";
const char *workload = argv[1];
char *result = NULL;
size_t result_size;
// 1. 创建客户端
client = gearman_client_create(NULL);
if (!client) {
fprintf(stderr, "Error: Failed to create client\n");
return EXIT_FAILURE;
}
// 2. 连接到服务器
ret = gearman_client_add_server(client, NULL, GEARMAN_DEFAULT_TCP_PORT);
if (ret != GEARMAN_SUCCESS) {
fprintf(stderr, "Error: %s\n", gearman_client_error(client));
gearman_client_free(client);
return EXIT_FAILURE;
}
// 3. 提交任务(同步方式)
// GEARMAN_JOB_PRIORITY_NORMAL, GEARMAN_JOB_BACKGROUND, GEARMAN_JOB_HIGH
// GEARMAN_JOB_BACKGROUND 是异步的,不会等待结果
// GEARMAN_JOB_PRIORITY_NORMAL 是同步的,会阻塞直到完成
result = gearman_client_do(client, GEARMAN_JOB_PRIORITY_NORMAL, task_name, workload, strlen(workload), &result_size, &ret);
// 4. 检查结果
if (ret != GEARMAN_SUCCESS) {
fprintf(stderr, "Error: %s\n", gearman_client_error(client));
} else {
printf("Client: Job submitted successfully.\n");
printf("Client: Result: %s\n", result);
}
// 5. 清理资源
free(result); // gearman_client_do 会为结果分配内存,需要手动释放
gearman_client_free(client);
return EXIT_SUCCESS;
}
编译与运行
现在我们来编译这两个程序。gearman_client_create 和 gearman_worker_create 等函数在 libgearman 库中。
编译 Worker
gcc worker_correct.c -o worker_correct -lgearman
编译 Client
gcc client.c -o client -lgearman
运行步骤
-
确保 Gearman 服务器正在运行:
gearman -d
-d表示以后台守护进程模式运行。 -
启动 Worker(在一个终端中):
./worker_correct
你会看到输出:
Worker is running and waiting for 'reverse' jobs... -
运行 Client(在另一个终端中),并传递一个字符串:
./client "hello gearman"
你会在 Client 的终端看到:
Client: Job submitted successfully. Client: Result: namraeg olleh在 Worker 的终端,你会看到:
Worker: Processed job. Reversed 'hello gearman' to 'namraeg olleh'
高级话题
异步任务提交
上面的 Client 是同步的,它会阻塞直到任务完成,Gearman 也支持异步提交,这对于不希望等待响应的场景(如日志记录)非常有用。
client_async.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <gearman.h>
int main(void)
{
gearman_client_st *client = gearman_client_create(NULL);
if (!client) { /* ... error handling ... */ }
if (gearman_client_add_server(client, NULL, GEARMAN_DEFAULT_TCP_PORT) != GEARMAN_SUCCESS) {
fprintf(stderr, "Error: %s\n", gearman_client_error(client));
gearman_client_free(client);
return EXIT_FAILURE;
}
const char *workload = "async task";
gearman_job_handle_t job_handle;
printf("Client: Submitting async job...\n");
// 异步提交任务
gearman_return_t ret = gearman_client_do_background(client, "reverse", NULL, workload, strlen(workload), &job_handle);
if (ret != GEARMAN_SUCCESS) {
fprintf(stderr, "Error: %s\n", gearman_client_error(client));
gearman_client_free(client);
return EXIT_FAILURE;
}
printf("Client: Job submitted with handle: %s\n", job_handle);
// 注意:对于异步任务,我们通常不在这里等待结果。
// 结果可能通过其他方式(如另一个 Worker 或回调)处理。
// 或者,我们可以轮询任务状态。
// 轮询任务状态直到完成
while (1) {
gearman_job_state_t state = gearman_client_job_status(client, job_handle, NULL, NULL);
if (state == GEARMAN_JOB_STATE_UNKNOWN) {
// 任务可能已经完成并被服务器移除
break;
} else if (state == GEARMAN_JOB_STATE_FAILED) {
printf("Client: Job failed.\n");
break;
} else if (state == GEARMAN_JOB_STATE_COMPLETE) {
printf("Client: Job is complete.\n");
break;
}
printf("Client: Job status: %d. Waiting...\n", state);
sleep(1);
}
gearman_client_free(client);
return EXIT_SUCCESS;
}
回调式异步处理
对于更复杂的异步场景,可以使用回调函数来处理任务完成事件,这避免了手动轮询。
// 回调函数原型
typedef gearman_return_t (*gearman_worker_fn)(gearman_job_st *, void *, size_t *, size_t *);
// 客户端回调函数
gearman_return_t complete_callback(gearman_task_st *task, void *context, size_t result_size, const void *result)
{
(void)context;
if (gearman_task_success(task)) {
printf("Client (Callback): Job successful. Result: %.*s\n", (int)result_size, (const char *)result);
} else {
printf("Client (Callback): Job failed. Error: %s\n", gearman_task_error(task));
}
return GEARMAN_SUCCESS;
}
int main_callback(void)
{
// ... (client and server setup) ...
gearman_task_st *task = gearman_client_do_task(client, "reverse", NULL, workload, strlen(workload), complete_callback, NULL, &ret);
if (!task || ret != GEARMAN_SUCCESS) { /* ... error ... */ }
// 主循环,等待回调被触发
while (gearman_client_run_tasks(client) == GEARMAN_IO_WAIT) {
// 处理 I/O 事件
}
// ... (cleanup) ...
}
- Worker:使用
gearman_worker_st,通过gearman_worker_add_function注册任务处理函数,然后在循环中调用gearman_worker_work。 - Client:使用
gearman_client_st,通过gearman_client_do(同步) 或gearman_client_do_background(异步) 提交任务。 - 编译:始终链接
-lgearman库。 - 调试:
gearman_client_error()和gearman_worker_error()是你最好的朋友,可以帮你定位连接或任务执行中的问题。
通过 C 语言使用 Gearman 可以构建高性能、可扩展的后台处理系统,非常适合 C/C++ 项目。
