我正在编写使用confluent_kafka api(内部使用librdkafka)来计算kafka吞吐量的python应用程序,但是在这个过程中我观察到,即使我传递了虚拟代理名称,应用程序也不会抛出任何错误,所以我缺少一些东西或者这是api中的错误。还有什么可以更好地处理这个问题?
import confluent_kafka
import json
import argparse
import time
def cmd_line_parse():
parser = argparse.ArgumentParser("Arg Parser")
parser.add_argument('-j','--json_config',help='command line argument for kafka configuration',required=True)
parser.add_argument('-t','--topic_name',help='kafka topic name',required=True)
parser.add_argument('-c','--msg_count',help='messages that you want to send to kafka topic for calculating throughput',default=500000,required=False)
parser.add_argument('-s','--msg_size',help='message size(in bytes) against which you want to test',default=1024,required=False)
args = parser.parse_args()
return args
def print_throughput_stats(time_taken,msg_count,msg_size):
print("Processed {0} messsages in {1:.2f} seconds".format(msg_count, time_taken))
print("Throughput: {0:.2f} MB/s".format((msg_size*msg_count)/time_taken/(1024*1024)))
def main():
args = cmd_line_parse()
kafka_config_dict = json.loads(args.json_config)
kafka_topic_name = args.topic_name
producer = confluent_kafka.Producer(**kafka_config_dict)
msg_payload = 'K'*args.msg_size
start_time = time.time()
for i in xrange(args.msg_count):
producer.produce(kafka_topic_name,value=msg_payload)
producer.poll(0)
producer.flush()
end_time = time.time()
print_throughput_stats(end_time-start_time,args.msg_count,args.msg_size)
if __name__ == "__main__":
try:
main()
except Exception as e:
print("Exception Occured: {}".format(str(e)))
当我通过传递不正确的代理来测试此应用程序时,没有错误/异常:
python check_throughput.py -t perf_test -j '{"bootstrap.servers":"dummy.com"}'