
I need to take the input as a struct instead of a string in the producer, and print it in the consumer, using the librdkafka library. I have changed the producer code and I am able to send the struct, but in the consumer it is not displayed. I need some help with what further changes are required.

Here is the producer code:

#include <stdio.h>
#include <signal.h>
#include <string.h>

/* Typical include path would be <librdkafka/rdkafka.h>, but this program
 * is built from within the librdkafka source tree and thus differs. */
#include "rdkafka.h"

static volatile sig_atomic_t run = 1;

/**
 * @brief Signal termination of program
 */
static void stop(int sig) {
    run = 0;
}

/**
 * @brief Message delivery report callback, set via
 * rd_kafka_conf_set_dr_msg_cb() below. Called once per produced
 * message to report success or permanent failure.
 */
static void dr_msg_cb(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage,
    void* opaque) {
    if (rkmessage->err)
        fprintf(stderr, "%% Message delivery failed: %s\n",
            rd_kafka_err2str(rkmessage->err));
    else
        fprintf(stderr, "%% Message delivered (%zu bytes)\n",
            rkmessage->len);
}

struct date {
    int d, m, y;
};


int main(int argc, char** argv) {
rd_kafka_t* rk;         /* Producer instance handle */
rd_kafka_conf_t* conf;  /* Temporary configuration object */
char errstr[512];       /* librdkafka API error reporting buffer */
const char* brokers;    /* Broker list */
const char* topic;      /* Topic to produce to */
struct date d1;         /* Message value: parsed from stdin */
struct date* ptr = &d1;

brokers = "localhost:9092";
topic = "test";

/*
 * Create Kafka client configuration place-holder
 */
conf = rd_kafka_conf_new();

/* Set bootstrap broker(s) as a comma-separated list of
 * host or host:port (default port 9092).
 * librdkafka will use the bootstrap brokers to acquire the full
 * set of brokers from the cluster. */
if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,
    errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
    fprintf(stderr, "%s\n", errstr);
    return 1;
}

/* Set the delivery report callback.
 * This callback will be called once per message to inform
 * the application if delivery succeeded or failed.
 * See dr_msg_cb() above.
 * The callback is only triggered from rd_kafka_poll() and
 * rd_kafka_flush(). */
rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);

/*
 * Create producer instance.
 *
 * NOTE: rd_kafka_new() takes ownership of the conf object
 *       and the application must not reference it again after
 *       this call.
 */
rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (!rk) {
    fprintf(stderr,
        "%% Failed to create new producer: %s\n", errstr);
    return 1;
}

/* Signal handler for clean shutdown */
signal(SIGINT, stop);

fprintf(stderr, "%% enter \n");
/*fprintf(stderr,
    "%% Type some text and hit enter to produce message\n"
    "%% Or just hit enter to only serve delivery reports\n"
    "%% Press Ctrl-C or Ctrl-D to exit\n");*/



while (run && scanf("%d/%d/%d", &d1.d, &d1.m, &d1.y) == 3) {
    size_t len = sizeof(d1);
    rd_kafka_resp_err_t err;

    fprintf(stdout, "%d/%d/%d\n", d1.d, d1.m, d1.y);

retry:
    err = rd_kafka_producev(
        /* Producer handle */
        rk,
        /* Topic name */
        RD_KAFKA_V_TOPIC(topic),
        /* Make a copy of the payload. */
        RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
        /* Message value and length */
        RD_KAFKA_V_VALUE(ptr, len),
        /* Per-Message opaque, provided in
         * delivery report callback as
         * msg_opaque. */
        RD_KAFKA_V_OPAQUE(NULL),
        /* End sentinel */
        RD_KAFKA_V_END);

    if (err) {
        /*
         * Failed to *enqueue* message for producing.
         */
        fprintf(stderr,
            "%% Failed to produce to topic %s: %s\n",
            topic, rd_kafka_err2str(err));

        if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
            /* If the internal queue is full, wait for
             * messages to be delivered and then retry.
             * The internal queue represents both
             * messages to be sent and messages that have
             * been sent or failed, awaiting their
             * delivery report callback to be called.
             *
             * The internal queue is limited by the
             * configuration property
             * queue.buffering.max.messages */
            rd_kafka_poll(rk, 1000/*block for max 1000ms*/);
            goto retry;
        }
    }
    else {
        fprintf(stderr, "%% Enqueued message (%zd bytes) "
            "for topic %s\n",
            len, topic);
    }


    /* A producer application should continually serve
     * the delivery report queue by calling rd_kafka_poll()
     * at frequent intervals.
     * Either put the poll call in your main loop, or in a
     * dedicated thread, or call it after every
     * rd_kafka_produce() call.
     * Just make sure that rd_kafka_poll() is still called
     * during periods where you are not producing any messages
     * to make sure previously produced messages have their
     * delivery report callback served (and any other callbacks
     * you register). */
    rd_kafka_poll(rk, 0/*non-blocking*/);
}

/* Wait for final messages to be delivered or fail.
 * rd_kafka_flush() is an abstraction over rd_kafka_poll() which
 * waits for all messages to be delivered. */
fprintf(stderr, "%% Flushing final messages..\n");
rd_kafka_flush(rk, 10 * 1000 /* wait for max 10 seconds */);

/* If the output queue is still not empty there is an issue
 * with producing messages to the clusters. */
if (rd_kafka_outq_len(rk) > 0)
    fprintf(stderr, "%% %d message(s) were not delivered\n",
        rd_kafka_outq_len(rk));

/* Destroy the producer instance */
rd_kafka_destroy(rk);

return 0;
}

Consumer code:

#include <stdio.h>
#include <signal.h>
#include <string.h>
#include <ctype.h>


/* Typical include path would be <librdkafka/rdkafka.h>, but this program
 * is built from within the librdkafka source tree and thus differs. */
#include "rdkafka.h"


static volatile sig_atomic_t run = 1;

/**
* @brief Signal termination of program
*/
static void stop(int sig) {
run = 0;
}

struct date
{
int d, m, y;
};



/**
 * @returns 1 if all bytes are printable, else 0.
 */
static int is_printable(const char* buf, size_t len)
{
size_t i;

for (i = 0; i < len; i++)
    if (!isprint((unsigned char)buf[i]))
        return 0;

return 1;
}




int main(int argc, char **argv) {
rd_kafka_t* rk;          /* Consumer instance handle */
rd_kafka_conf_t* conf;   /* Temporary configuration object */
rd_kafka_resp_err_t err; /* librdkafka API error code */
char errstr[512];        /* librdkafka API error reporting buffer */
const char* brokers;     /* Broker list */
const char* groupid;     /* Consumer group id */
char** topics;           /* List of topics to subscribe to */
int topic_cnt;           /* Number of topics to subscribe to */
rd_kafka_topic_partition_list_t* subscription; /* Subscribed topics */
int i;


/*
 * Argument validation
 */
 /*if (argc < 4) {
     fprintf(stderr,
         "%% Usage: "
         "%s <broker> <group.id> <topic1> <topic2>..\n",
         argv[0]);
     return 1;
 }*/

brokers = "localhost:9092";
groupid = "test-consumer-group"; 
char* topic[] = {"test"};
topics = &topic;    
topic_cnt = 1;


/*
 * Create Kafka client configuration place-holder
 */
conf = rd_kafka_conf_new();

/* Set bootstrap broker(s) as a comma-separated list of
 * host or host:port (default port 9092).
 * librdkafka will use the bootstrap brokers to acquire the full
 * set of brokers from the cluster. */
if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,
    errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
    fprintf(stderr, "%s\n", errstr);
    rd_kafka_conf_destroy(conf);
    return 1;
}

/* Set the consumer group id.
 * All consumers sharing the same group id will join the same
 * group, and the subscribed topic' partitions will be assigned
 * according to the partition.assignment.strategy
 * (consumer config property) to the consumers in the group. */
if (rd_kafka_conf_set(conf, "group.id", groupid,
    errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
    fprintf(stderr, "%s\n", errstr);
    rd_kafka_conf_destroy(conf);
    return 1;
}

/* If there is no previously committed offset for a partition
 * the auto.offset.reset strategy will be used to decide where
 * in the partition to start fetching messages.
 * By setting this to earliest the consumer will read all messages
 * in the partition if there was no previously committed offset. */
if (rd_kafka_conf_set(conf, "auto.offset.reset", "earliest",
    errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
    fprintf(stderr, "%s\n", errstr);
    rd_kafka_conf_destroy(conf);
    return 1;
}

/*
 * Create consumer instance.
 *
 * NOTE: rd_kafka_new() takes ownership of the conf object
 *       and the application must not reference it again after
 *       this call.
 */
rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
if (!rk) {
    fprintf(stderr,
        "%% Failed to create new consumer: %s\n", errstr);
    return 1;
}

conf = NULL; /* Configuration object is now owned, and freed,
              * by the rd_kafka_t instance. */


/* Redirect all messages from per-partition queues to
 * the main queue so that messages can be consumed with one
 * call from all assigned partitions.
 *
 * The alternative is to poll the main queue (for events)
 * and each partition queue separately, which requires setting
 * up a rebalance callback and keeping track of the assignment:
 * but that is more complex and typically not recommended. */
rd_kafka_poll_set_consumer(rk);


/* Convert the list of topics to a format suitable for librdkafka */
subscription = rd_kafka_topic_partition_list_new(topic_cnt);
for (i = 0; i < topic_cnt; i++)
    rd_kafka_topic_partition_list_add(subscription,
        topics[i],
        /* the partition is ignored
         * by subscribe() */
        RD_KAFKA_PARTITION_UA);

/* Subscribe to the list of topics */
err = rd_kafka_subscribe(rk, subscription);
if (err) {
    fprintf(stderr,
        "%% Failed to subscribe to %d topics: %s\n",
        subscription->cnt, rd_kafka_err2str(err));
    rd_kafka_topic_partition_list_destroy(subscription);
    rd_kafka_destroy(rk);
    return 1;
}

fprintf(stderr,
    "%% Subscribed to %d topic(s), "
    "waiting for rebalance and messages...\n",
    subscription->cnt);


rd_kafka_topic_partition_list_destroy(subscription);


/* Signal handler for clean shutdown */
signal(SIGINT, stop);

/* Subscribing to topics will trigger a group rebalance
 * which may take some time to finish, but there is no need
 * for the application to handle this idle period in a special way
 * since a rebalance may happen at any time.
 * Start polling for messages. */

while (run) {
    rd_kafka_message_t* rkm;

    rkm = rd_kafka_consumer_poll(rk, 100);
    if (!rkm)
        continue; /* Timeout: no message within 100ms,
                   * try again. This short timeout allows
                   * checking for `run` at frequent intervals. */

    /* consumer_poll() will return either a proper message
     * or a consumer error (rkm->err is set). */
    if (rkm->err) {
        /* Consumer errors are generally to be considered
         * informational as the consumer will automatically
         * try to recover from all types of errors. */
        fprintf(stderr,
            "%% Consumer error: %s\n",
            rd_kafka_message_errstr(rkm));
        rd_kafka_message_destroy(rkm);
        continue;
    }

    /* Proper message. */
    printf("Message on %s [%" PRId32 "] at offset %" PRId64 ":\n",
        rd_kafka_topic_name(rkm->rkt), rkm->partition,
        rkm->offset);

    /* Print the message key. */
    if (rkm->key && is_printable((const char*)rkm->key, rkm->key_len))
        printf(" Key: %.*s\n",
            (int)rkm->key_len, (const char*)rkm->key);
    else if (rkm->key)
        printf(" Key: (%d bytes)\n", (int)rkm->key_len);

    /* Print the message value/payload. */
    if (rkm->payload && is_printable((const char*)rkm->payload, rkm->len))
        printf(" Value:%.*s\n",
            (int)rkm->len, (const char*)rkm->payload); 
    else if (rkm->payload)
        printf(" Value: (%d bytes)\n", (int)rkm->len); 

    rd_kafka_message_destroy(rkm);
}


/* Close the consumer: commit final offsets and leave the group. */
fprintf(stderr, "%% Closing consumer\n");
rd_kafka_consumer_close(rk);


/* Destroy the consumer */
rd_kafka_destroy(rk);

return 0;
}

1 Answer


You need to serialize your data before sending it to Kafka. Protobuf, JSON, or Avro are recommended.
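The answer gives no code, so here is a minimal sketch of the idea without pulling in Protobuf or Avro: encode the struct as the same d/m/y text the producer already reads, and parse it back in the consumer. The date_serialize/date_deserialize helpers below are illustrative names invented for this sketch, not librdkafka APIs.

#include <stdio.h>
#include <string.h>

struct date {
    int d, m, y;
};

/* Sketch: serialize a struct date into a portable text payload.
 * Returns the payload length, or -1 on error. */
static int date_serialize(const struct date* dt, char* buf, size_t size)
{
    int n = snprintf(buf, size, "%d/%d/%d", dt->d, dt->m, dt->y);
    return (n > 0 && (size_t)n < size) ? n : -1;
}

/* Sketch: parse a text payload (not necessarily NUL-terminated) back
 * into a struct date. Returns 0 on success, -1 on malformed input. */
static int date_deserialize(const char* payload, size_t len, struct date* dt)
{
    char buf[64];
    if (len >= sizeof(buf))
        return -1;
    memcpy(buf, payload, len);
    buf[len] = '\0';
    return sscanf(buf, "%d/%d/%d", &dt->d, &dt->m, &dt->y) == 3 ? 0 : -1;
}

/* Producer side (sketch): replace RD_KAFKA_V_VALUE(ptr, len) with
 *
 *   char buf[64];
 *   int n = date_serialize(&d1, buf, sizeof(buf));
 *   ... RD_KAFKA_V_VALUE(buf, (size_t)n) ...
 *
 * Consumer side (sketch): instead of printing the payload as a raw string,
 *
 *   struct date dt;
 *   if (date_deserialize(rkm->payload, rkm->len, &dt) == 0)
 *       printf(" Value: %d/%d/%d\n", dt.d, dt.m, dt.y);
 */

Sending the raw struct bytes, as the question's producer does with RD_KAFKA_V_VALUE(ptr, sizeof(d1)), does reach the broker, but those bytes are only meaningful to a consumer built with the same compiler, struct padding, int size, and endianness, and they are generally not printable text, which is why the consumer's string-printing path shows nothing useful.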

Answered 2021-11-04T09:54:38.030