1

当我向消费者查询分配的主题分区列表时,结果中的所有分区的偏移量为 -1001。如果我打印出接收到的消息的偏移量,则偏移量设置为正确的值。

这是我用来消费消息的代码:

/*
 * Write every entry of `partitions` to `fp` on one line, each formatted as
 * "<topic> [<partition>] offset <offset>" and separated by commas, then end
 * the line with a newline.
 *
 * The offset is printed exactly as stored in the list element; nothing is
 * looked up from the consumer.
 *
 * fp          Stream to write to.
 * partitions  List to print; must not be NULL.
 */
static void print_partition_list(FILE* fp,
    const rd_kafka_topic_partition_list_t
    * partitions) {
    int i;
    for (i = 0; i < partitions->cnt; i++) {
        /* librdkafka declares these fields with fixed-width integer types;
         * cast them so the arguments match the %d / %lld conversions on
         * every platform (int64_t is `long`, not `long long`, on LP64). */
        fprintf(fp, "%s %s [%d] offset %lld",
            i > 0 ? "," : "",
            partitions->elems[i].topic,
            (int)partitions->elems[i].partition,
            (long long)partitions->elems[i].offset);
    }
    fprintf(fp, "\n");

}

/*
 * Consumer-group rebalance callback.
 *
 * Logs the event to stderr and then applies it to the consumer:
 *   - partitions assigned: print the list and assign it;
 *   - partitions revoked:  print the list and clear the assignment;
 *   - anything else:       print the error text and clear the assignment.
 *
 * rk          Consumer handle the event belongs to.
 * err         Event type, or an error code.
 * partitions  Partitions being assigned or revoked.
 * opaque      Unused.
 */
static void rebalance_cb(rd_kafka_t* rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t* partitions, void* opaque) {
    (void)opaque;

    fprintf(stderr, "%% Consumer group rebalanced: ");

    if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
        fprintf(stderr, "assigned:\n");
        print_partition_list(stderr, partitions);
        rd_kafka_assign(rk, partitions);
    } else if (err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS) {
        fprintf(stderr, "revoked:\n");
        print_partition_list(stderr, partitions);
        rd_kafka_assign(rk, NULL);
    } else {
        /* Unexpected code: report it and drop whatever was assigned. */
        fprintf(stderr, "failed: %s\n", rd_kafka_err2str(err));
        rd_kafka_assign(rk, NULL);
    }
}

/*
 * Demo consumer for the "OffsetTesting" topic.
 *
 * Configures a consumer in group "OffsetTest" with auto-commit disabled,
 * subscribes to the topic and then runs ten loop iterations. For every
 * message received it prints the current assignment (as returned by
 * rd_kafka_assignment()), prints the message, and commits it synchronously.
 *
 * Returns 0 after a normal shutdown, 1 if configuration, consumer creation,
 * subscription or the assignment query fails.
 */
int main()
{

    rd_kafka_t* rk;
    rd_kafka_conf_t* conf;
    rd_kafka_resp_err_t err;

    char errstr[512];
    const char* brokers = "localhost:9092";
    const char* groupid = "OffsetTest";
    const char* topics[] = { "OffsetTesting" };

    /* Configuration properties as { name, value }, applied in this order. */
    const char* settings[][2] = {
        { "bootstrap.servers", brokers },
        { "group.id", groupid },
        { "enable.auto.commit", "false" },
        { "auto.offset.reset", "earliest" },
        { "offset.store.method", "broker" },
    };
    const size_t setting_cnt = sizeof(settings) / sizeof(settings[0]);
    size_t s;

    rd_kafka_topic_partition_list_t* subscription;

    conf = rd_kafka_conf_new();

    for (s = 0; s < setting_cnt; s++) {
        if (rd_kafka_conf_set(conf, settings[s][0], settings[s][1],
            errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
            fprintf(stderr, "%s\n", errstr);
            rd_kafka_conf_destroy(conf);
            return 1;
        }
    }

    rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);

    rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
    if (!rk) {
        fprintf(stderr, "%% Failed to create new consumer: %s\n", errstr);
        /* rd_kafka_new() only takes ownership of conf on success. */
        rd_kafka_conf_destroy(conf);
        return 1;
    }

    /* conf now belongs to rk; make sure it is not touched again. */
    conf = NULL;

    /* Route all events through rd_kafka_consumer_poll(). */
    rd_kafka_poll_set_consumer(rk);

    subscription = rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(subscription, topics[0], RD_KAFKA_PARTITION_UA);

    err = rd_kafka_subscribe(rk, subscription);
    if (err) {
        fprintf(stderr,
            "%% Failed to subscribe to %d topics: %s\n",
            subscription->cnt, rd_kafka_err2str(err));
        rd_kafka_topic_partition_list_destroy(subscription);
        rd_kafka_destroy(rk);
        return 1;
    }

    fprintf(stderr,
        "%% Subscribed to %d topic(s), "
        "waiting for rebalance and messages...\n",
        subscription->cnt);

    /* The list is not needed after subscribing; it must not be used below. */
    rd_kafka_topic_partition_list_destroy(subscription);
    subscription = NULL;

    int runningCounter = 0;

    /* An empty poll and a handled message each count as one iteration;
     * error events are reported but not counted. */
    while (runningCounter != 10) {
        rd_kafka_message_t* rkm;

        rkm = rd_kafka_consumer_poll(rk, 100);
        if (!rkm) {
            Sleep(2000);
            runningCounter++;
            continue;
        }
        if (rkm->err) {
            fprintf(stderr,
                "%% Consumer error: %s\n",
                rd_kafka_message_errstr(rkm));
            rd_kafka_message_destroy(rkm);
            continue;
        }

        /* rd_kafka_assignment() reports which partitions are assigned. The
         * offset fields of the returned list are not consume positions and
         * print as -1001 here. */
        rd_kafka_topic_partition_list_t* list;
        err = rd_kafka_assignment(rk, &list);

        if (err) {
            fprintf(stderr,
                "%% Failed to query assignment: %s\n",
                rd_kafka_err2str(err));
            rd_kafka_message_destroy(rkm);
            rd_kafka_consumer_close(rk);
            rd_kafka_destroy(rk);
            return 1;
        }

        print_partition_list(stderr, list);

        rd_kafka_topic_partition_list_destroy(list);

        /* Casts keep the arguments in step with the %d / %lld conversions. */
        printf("Message on %s [%d] at offset %lld:\n",
            rd_kafka_topic_name(rkm->rkt), (int)rkm->partition,
            (long long)rkm->offset);

        if (rkm->key) {
            printf(" Key: %.*s\n",
                (int)rkm->key_len, (const char*)rkm->key);
        }

        if (rkm->payload) {
            printf(" Value: %.*s\n",
                (int)rkm->len, (const char*)rkm->payload);
        } else if (rkm->key) {
            /* Keyed message without a payload: report only the length. */
            printf(" Value: (%d bytes)\n", (int)rkm->len);
        }

        /* Synchronous commit (async = 0); report a failure but keep going. */
        err = rd_kafka_commit_message(rk, rkm, 0);
        if (err) {
            fprintf(stderr,
                "%% Failed to commit offset: %s\n",
                rd_kafka_err2str(err));
        }

        rd_kafka_message_destroy(rkm);

        runningCounter++;
    }

    fprintf(stderr, "%% Closing consumer\n");
    rd_kafka_consumer_close(rk);

    rd_kafka_destroy(rk);

    return 0;

}

我知道这里有一个类似问题的回答(LibRdKafka: commited_offset always at -1001),但它对我没有帮助。我已经在 rebalance_cb 中分配了主题分区列表。

更新:

下面是消费 2 条消息时的示例输出:

> %4|1580198390.566|CONFWARN|rdkafka#consumer-1| [thrd:app]: Configuration property offset.store.method is deprecated: Offset commit store method: 'file' - DEPRECATED: local file store (offset.store.path, et.al), 'broker' - broker commit store (requires Apache Kafka 0.8.2 or later on the broker).

> % Subscribed to 1 topic(s), waiting for rebalance and messages...

> % Consumer group rebalanced: assigned:
> NewTestingTopic [0] offset -1001, NewTestingTopic [1] offset -1001, 
> NewTestingTopic [2] offset -1001, NewTestingTopic [3] offset -1001
> NewTestingTopic [0] offset -1001, NewTestingTopic [1] offset -1001, 
> NewTestingTopic [2] offset -1001, NewTestingTopic [3] offset -1001
> 
> Message on NewTestingTopic [0] at offset 25:
> Key: 0
> Value: ExampleMessage 0
> 
> NewTestingTopic [0] offset -1001, NewTestingTopic [1] offset -1001, 
> NewTestingTopic [2] offset -1001, NewTestingTopic [3] offset -1001
> 
> Message on NewTestingTopic [3] at offset 41:
> Key: 1
> Value: ExampleMessage 1
4

1 回答 1

0

我相信这可能是设计使然。

rd_kafka_assignment() 方法返回的是通过 rd_kafka_assign() 提供的分配。当一个消费者在组内被分配分区时,分配的内容只是一个分区列表,不包含偏移量。

类似地,在 Java 库中,assignment() 返回 Set<TopicPartition>,其中同样没有偏移量。在 librdkafka 中,rd_kafka_assignment() 给出一个 rd_kafka_topic_partition_list_t,它类似于 Set<TopicPartition>。主要区别在于它复用了 rd_kafka_topic_partition_t 类型,而该类型带有一些额外的字段,例如 offset。

rd_kafka_topic_partition_t 类型在许多地方使用,但并非它的所有字段在每种上下文中都有意义。分配上下文就是这种情况,因此某些字段被设置为“空”值,对偏移量而言就是 -1001。

如果要获取分配的当前偏移量,则需要使用rd_kafka_position(). 同样,在 Java 中,您将使用position().

于 2020-02-01T12:43:01.753 回答