Gearman C语言如何高效实现任务分发?

99ANYc3cd6
预计阅读时长 43 分钟
位置: 首页 C语言 正文

Gearman 是一个通用的分布式任务队列系统,它允许你将工作(任务)分发到多个工作节点上并行处理,C 语言是 Gearman 的原生语言之一,拥有高性能和低开销的特点。

gearman c语言
(图片来源网络,侵删)

本指南将涵盖以下内容:

  1. 核心概念回顾
  2. 环境准备
  3. 编写 C 语言 Worker(工作进程)
  4. 编写 C 语言 Client(客户端)
  5. 编译与运行
  6. 高级话题

核心概念回顾

在开始编码前,我们先快速回顾 Gearman 的三个核心角色:

  • Client (客户端):提交任务的程序,它不关心任务是如何完成的,只关心结果,一个 Web 服务器,当用户上传图片时,它将“图片处理”这个任务提交给 Gearman。
  • Worker (工作进程):执行任务的程序,它会连接到 Gearman 服务器,注册自己能处理的任务类型,并等待接收任务,一个专门负责缩略图生成的程序。
  • Job Server (任务服务器):中央调度器,它接收来自客户端的任务,并将其分发给合适的空闲 Worker。

环境准备

你需要安装 Gearman 的服务器端和 C 语言的客户端库。

安装 Gearman 服务器

在基于 Debian/Ubuntu 的系统上:

gearman c语言
(图片来源网络,侵删)
sudo apt-get update
sudo apt-get install gearman-server gearman-tools

安装后,启动 Gearman 服务:

sudo systemctl start gearman-server
# 或者
sudo service gearman-server start

安装 C 语言客户端库 (libgearman)

这个库提供了 C 语言 API,在基于 Debian/Ubuntu 的系统上:

sudo apt-get install libgearman-dev

libgearman-dev 包含了编译所需的头文件(.h)和链接库(.a.so)。


编写 C 语言 Worker (工作进程)

Worker 的主要工作是:

  1. 创建一个 gearman_client_st 结构体来连接到 Gearman 服务器。
  2. 注册一个或多个它能处理的任务类型("reverse")。
  3. 进入一个循环,等待任务。
  4. 当收到任务时,执行任务逻辑(反转一个字符串)。
  5. 将结果(或错误)发送回 Gearman 服务器。
  6. 清理资源。

worker.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h> // for sleep()
#include <gearman.h>
// 这是我们的任务处理函数
gearman_return_t reverse_handler(gearman_job_st *job, void *context, size_t *result_size, size_t *result_size_max)
{
    (void)context; // 我们不使用 context,所以忽略它
    // 获取任务的负载
    const char *workload = gearman_job_workload(job);
    size_t workload_size = strlen(workload);
    // 分配内存来存储结果
    char *result = (char *)malloc(workload_size + 1);
    if (!result) {
        fprintf(stderr, "Error: Failed to allocate memory for result\n");
        return GEARMAN_MEMORY_ALLOCATION_FAILURE;
    }
    // 执行任务:反转字符串
    for (size_t i = 0; i < workload_size; i++) {
        result[i] = workload[workload_size - 1 - i];
    }
    result[workload_size] = '\0';
    // 将结果发送回服务器
    gearman_job_return_result(job, GEARMAN_SUCCESS, result, strlen(result));
    free(result); // 释放内存
    printf("Worker: Reversed '%s' to '%s'\n", workload, result);
    return GEARMAN_SUCCESS;
}
int main(void)
{
    gearman_client_st *client;
    gearman_return_t ret;
    // 1. 创建客户端连接
    client = gearman_client_create(NULL);
    if (!client) {
        fprintf(stderr, "Error: Failed to create client\n");
        exit(EXIT_FAILURE);
    }
    // 2. 连接到 Gearman 服务器
    ret = gearman_client_add_server(client, NULL, GEARMAN_DEFAULT_TCP_PORT);
    if (ret != GEARMAN_SUCCESS) {
        fprintf(stderr, "Error: %s\n", gearman_client_error(client));
        gearman_client_free(client);
        exit(EXIT_FAILURE);
    }
    printf("Worker is running and waiting for jobs...\n");
    // 3. 注册任务并等待
    // gearman_worker_add_function 是更常见的 Worker API,但这里我们用 client 来演示
    // 更标准的方式是使用 gearman_worker_st,但为了保持简单,我们先这样写。
    // Worker 应该使用 gearman_worker_st,下面我们会用正确的 API 重写。
    // 让我们先暂停一下,然后用正确的 gearman_worker_st API 来重写。
    // --- 正确的 Worker API 使用方式 ---
    gearman_worker_st *worker;
    worker = gearman_worker_create(client);
    if (!worker) {
        fprintf(stderr, "Error: Failed to create worker\n");
        gearman_client_free(client);
        exit(EXIT_FAILURE);
    }
    // 注册我们想要处理的任务 "reverse"
    gearman_worker_add_function(worker, "reverse", 0, reverse_handler, NULL);
    // 进入工作循环
    while (1) {
        // 等待任务
        ret = gearman_worker_work(worker);
        if (ret != GEARMAN_SUCCESS && ret != GEARMAN_TIMEOUT) {
            fprintf(stderr, "Error: %s\n", gearman_worker_error(worker));
            break;
        }
    }
    // 4. 清理资源
    gearman_worker_free(worker);
    gearman_client_free(client);
    return EXIT_SUCCESS;
}

修正后的 Worker (worker_correct.c): 上面的例子为了简化,混用了 client 和 worker 的概念,标准的 Worker 应该使用 gearman_worker_st,以下是更规范的写法:

// worker_correct.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <gearman.h>
gearman_return_t reverse_handler(gearman_job_st *job, void *context, size_t *result_size, size_t *result_size_max)
{
    const char *workload = gearman_job_workload(job);
    size_t workload_len = strlen(workload);
    char *result = malloc(workload_len + 1);
    if (!result) return GEARMAN_MEMORY_ALLOCATION_FAILURE;
    for (size_t i = 0; i < workload_len; i++) {
        result[i] = workload[workload_len - 1 - i];
    }
    result[workload_len] = '\0';
    gearman_job_return_result(job, GEARMAN_SUCCESS, result, workload_len);
    free(result);
    printf("Worker: Processed job. Reversed '%s' to '%s'\n", workload, result);
    return GEARMAN_SUCCESS;
}
int main(void)
{
    gearman_worker_st *worker = gearman_worker_create(NULL);
    if (!worker) {
        fprintf(stderr, "Error: Failed to create worker\n");
        return EXIT_FAILURE;
    }
    if (gearman_worker_add_server(worker, NULL, GEARMAN_DEFAULT_TCP_PORT) != GEARMAN_SUCCESS) {
        fprintf(stderr, "Error: %s\n", gearman_worker_error(worker));
        gearman_worker_free(worker);
        return EXIT_FAILURE;
    }
    printf("Worker is running and waiting for 'reverse' jobs...\n");
    gearman_worker_add_function(worker, "reverse", 0, reverse_handler, NULL);
    while (1) {
        gearman_return_t ret = gearman_worker_work(worker);
        if (ret != GEARMAN_SUCCESS && ret != GEARMAN_TIMEOUT) {
            fprintf(stderr, "Worker error: %s\n", gearman_worker_error(worker));
            break;
        }
    }
    gearman_worker_free(worker);
    return EXIT_SUCCESS;
}

编写 C 语言 Client (客户端)

Client 的主要工作是:

  1. 创建一个 gearman_client_st 结构体来连接到 Gearman 服务器。
  2. 提交一个任务(可以同步或异步)。
  3. (如果是同步)等待并获取任务结果。
  4. 清理资源。

client.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <gearman.h>
int main(int argc, char *argv[])
{
    if (argc < 2) {
        fprintf(stderr, "Usage: %s <string_to_reverse>\n", argv[0]);
        return EXIT_FAILURE;
    }
    gearman_client_st *client;
    gearman_return_t ret;
    const char *task_name = "reverse";
    const char *workload = argv[1];
    char *result = NULL;
    size_t result_size;
    // 1. 创建客户端
    client = gearman_client_create(NULL);
    if (!client) {
        fprintf(stderr, "Error: Failed to create client\n");
        return EXIT_FAILURE;
    }
    // 2. 连接到服务器
    ret = gearman_client_add_server(client, NULL, GEARMAN_DEFAULT_TCP_PORT);
    if (ret != GEARMAN_SUCCESS) {
        fprintf(stderr, "Error: %s\n", gearman_client_error(client));
        gearman_client_free(client);
        return EXIT_FAILURE;
    }
    // 3. 提交任务(同步方式)
    // GEARMAN_JOB_PRIORITY_NORMAL, GEARMAN_JOB_BACKGROUND, GEARMAN_JOB_HIGH
    // GEARMAN_JOB_BACKGROUND 是异步的,不会等待结果
    // GEARMAN_JOB_PRIORITY_NORMAL 是同步的,会阻塞直到完成
    result = gearman_client_do(client, GEARMAN_JOB_PRIORITY_NORMAL, task_name, workload, strlen(workload), &result_size, &ret);
    // 4. 检查结果
    if (ret != GEARMAN_SUCCESS) {
        fprintf(stderr, "Error: %s\n", gearman_client_error(client));
    } else {
        printf("Client: Job submitted successfully.\n");
        printf("Client: Result: %s\n", result);
    }
    // 5. 清理资源
    free(result); // gearman_client_do 会为结果分配内存,需要手动释放
    gearman_client_free(client);
    return EXIT_SUCCESS;
}

编译与运行

现在我们来编译这两个程序。gearman_client_creategearman_worker_create 等函数在 libgearman 库中。

编译 Worker

gcc worker_correct.c -o worker_correct -lgearman

编译 Client

gcc client.c -o client -lgearman

运行步骤

  1. 确保 Gearman 服务器正在运行

    gearman -d

    -d 表示以后台守护进程模式运行。

  2. 启动 Worker(在一个终端中):

    ./worker_correct

    你会看到输出:

    Worker is running and waiting for 'reverse' jobs...
  3. 运行 Client(在另一个终端中),并传递一个字符串:

    ./client "hello gearman"

    你会在 Client 的终端看到:

    Client: Job submitted successfully.
    Client: Result: namraeg olleh

    在 Worker 的终端,你会看到:

    Worker: Processed job. Reversed 'hello gearman' to 'namraeg olleh'

高级话题

异步任务提交

上面的 Client 是同步的,它会阻塞直到任务完成,Gearman 也支持异步提交,这对于不希望等待响应的场景(如日志记录)非常有用。

client_async.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <gearman.h>
int main(void)
{
    gearman_client_st *client = gearman_client_create(NULL);
    if (!client) { /* ... error handling ... */ }
    if (gearman_client_add_server(client, NULL, GEARMAN_DEFAULT_TCP_PORT) != GEARMAN_SUCCESS) {
        fprintf(stderr, "Error: %s\n", gearman_client_error(client));
        gearman_client_free(client);
        return EXIT_FAILURE;
    }
    const char *workload = "async task";
    gearman_job_handle_t job_handle;
    printf("Client: Submitting async job...\n");
    // 异步提交任务
    gearman_return_t ret = gearman_client_do_background(client, "reverse", NULL, workload, strlen(workload), &job_handle);
    if (ret != GEARMAN_SUCCESS) {
        fprintf(stderr, "Error: %s\n", gearman_client_error(client));
        gearman_client_free(client);
        return EXIT_FAILURE;
    }
    printf("Client: Job submitted with handle: %s\n", job_handle);
    // 注意:对于异步任务,我们通常不在这里等待结果。
    // 结果可能通过其他方式(如另一个 Worker 或回调)处理。
    // 或者,我们可以轮询任务状态。
    // 轮询任务状态直到完成
    while (1) {
        gearman_job_state_t state = gearman_client_job_status(client, job_handle, NULL, NULL);
        if (state == GEARMAN_JOB_STATE_UNKNOWN) {
            // 任务可能已经完成并被服务器移除
            break;
        } else if (state == GEARMAN_JOB_STATE_FAILED) {
            printf("Client: Job failed.\n");
            break;
        } else if (state == GEARMAN_JOB_STATE_COMPLETE) {
            printf("Client: Job is complete.\n");
            break;
        }
        printf("Client: Job status: %d. Waiting...\n", state);
        sleep(1);
    }
    gearman_client_free(client);
    return EXIT_SUCCESS;
}

回调式异步处理

对于更复杂的异步场景,可以使用回调函数来处理任务完成事件,这避免了手动轮询。

// 回调函数原型
typedef gearman_return_t (*gearman_worker_fn)(gearman_job_st *, void *, size_t *, size_t *);
// 客户端回调函数
gearman_return_t complete_callback(gearman_task_st *task, void *context, size_t result_size, const void *result)
{
    (void)context;
    if (gearman_task_success(task)) {
        printf("Client (Callback): Job successful. Result: %.*s\n", (int)result_size, (const char *)result);
    } else {
        printf("Client (Callback): Job failed. Error: %s\n", gearman_task_error(task));
    }
    return GEARMAN_SUCCESS;
}
int main_callback(void)
{
    // ... (client and server setup) ...
    gearman_task_st *task = gearman_client_do_task(client, "reverse", NULL, workload, strlen(workload), complete_callback, NULL, &ret);
    if (!task || ret != GEARMAN_SUCCESS) { /* ... error ... */ }
    // 主循环,等待回调被触发
    while (gearman_client_run_tasks(client) == GEARMAN_IO_WAIT) {
        // 处理 I/O 事件
    }
    // ... (cleanup) ...
}
  • Worker:使用 gearman_worker_st,通过 gearman_worker_add_function 注册任务处理函数,然后在循环中调用 gearman_worker_work
  • Client:使用 gearman_client_st,通过 gearman_client_do (同步) 或 gearman_client_do_background (异步) 提交任务。
  • 编译:始终链接 -lgearman 库。
  • 调试gearman_client_error()gearman_worker_error() 是你最好的朋友,可以帮你定位连接或任务执行中的问题。

通过 C 语言使用 Gearman 可以构建高性能、可扩展的后台处理系统,非常适合 C/C++ 项目。

-- 展开阅读全文 --
头像
dede主页标题如何正确设置?
« 上一篇 昨天
C语言hex转str,如何实现?
下一篇 » 昨天

相关文章

取消
微信二维码
支付宝二维码

目录[+]