We have been testing a cloud-based Kafka deployment for a data-transfer system with many producers (around 27,000 Linux machines), one consumer (a Spring Kafka listener), and a single topic with 10 partitions. The problem: when about 9,500 producers transmit at the same time, CPU on every broker node spikes to 100%, and the cluster fails and stops responding. Is Kafka designed for this kind of architecture, or should we be looking at alternatives?
Here is my setup:
Kafka cluster: 4 cloud-based Kafka nodes (4 GB RAM + 900 GB disk each)
Producer: kafka-clients-1.1.1.jar on JDK 1.7
Properties config = new Properties();
config.put("bootstrap.servers", "***");
config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
config.put("security.protocol", "SASL_SSL");
config.put("sasl.mechanism", "SCRAM-SHA-256");
config.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";");
config.put("delivery.timeout.ms", 0);
config.put("transactional.id", UUID.randomUUID().toString());
config.put("enable.idempotence", true);
config.put("compression.type", "gzip");
try (KafkaProducer<String, String> producer = new KafkaProducer<>(config)) {
    String dataJson = "700 bytes json String";
    ProducerRecord<String, String> data = new ProducerRecord<>("test-topic", dataJson);
    producer.initTransactions();
    try {
        log.info("begin " + data);
        producer.beginTransaction();
        producer.send(data);
        producer.commitTransaction();
        log.info("commit " + data);
    } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
        // Fatal producer errors; try-with-resources closes the producer.
        log.error("kafka exception " + e);
    } catch (KafkaException e) {
        log.error("kafka exception " + e);
        log.info("ABORT");
        producer.abortTransaction();
    }
} finally {
    log.info("finally");
}
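For context on the load: every invocation above opens a fresh TLS/SASL connection, registers a new `transactional.id` with the transaction coordinator, and runs a full transaction for a single ~700-byte record, so 9,500 hosts doing this concurrently multiplies handshake and coordinator work on the brokers. Below is a minimal sketch of the alternative I am considering: a producer config built once per JVM and shared for the life of the process. The `linger.ms`, `batch.size`, and `acks` values are illustrative assumptions, not tested recommendations.

```java
import java.util.Properties;

public class ReusableProducerConfig {
    // Built once per JVM; the resulting KafkaProducer would be shared across
    // sends and closed only on shutdown, instead of created per message.
    public static Properties build() {
        Properties config = new Properties();
        config.put("bootstrap.servers", "***");
        config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        config.put("security.protocol", "SASL_SSL");
        config.put("sasl.mechanism", "SCRAM-SHA-256");
        config.put("compression.type", "gzip");
        // Illustrative assumptions: let the client batch records instead of
        // running one transaction per 700-byte record.
        config.put("acks", "all");         // durability without transactions
        config.put("linger.ms", "50");     // wait up to 50 ms to fill a batch
        config.put("batch.size", "65536"); // 64 KB batches
        return config;
    }

    public static void main(String[] args) {
        Properties p = build();
        System.out.println(p.getProperty("linger.ms") + " " + p.getProperty("batch.size"));
    }
}
```

Since each record is sent independently and exactly-once semantics were not a stated requirement, this sketch drops `transactional.id`; whether that trade-off is acceptable here is part of the question.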
Spring Boot consumer configuration:
spring:
  datasource:
    type: com.zaxxer.hikari.HikariDataSource
    driver-class-name: oracle.jdbc.OracleDriver
    url: jdbc:oracle:thin:@x.x.x.251:1521/xe
    username: username
    password: password
    hikari:
      maximum-pool-size: 12
      pool-name: test-pool
  jpa:
    database-platform: org.hibernate.dialect.Oracle10gDialect
    show-sql: true
    hibernate:
      ddl-auto: none
    properties:
      hibernate:
        default_schema: hr
        type: trace
        format_sql: trace
  kafka:
    bootstrap-servers: ***********
    consumer:
      group-id: group-id
      topic: test-topic
      # consumers need deserializers, not serializers; the listener below
      # receives a plain String
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        security:
          protocol: SASL_SSL
        sasl:
          mechanism: SCRAM-SHA-256
          jaas:
            config: org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";
@KafkaListener(topics = "${spring.kafka.consumer.topic}")
public void processMessage(String dataJson,
                           @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                           @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                           @Header(KafkaHeaders.OFFSET) long offset) {
    // Single-record listeners receive scalar headers, not Lists
    // (List-typed headers are for batch listeners).
    try {
        log.info("json: " + dataJson);
        service.save(dataJson);
    } catch (SQLException | JsonProcessingException ex) {
        service.trackError(dataJson, ex.getLocalizedMessage());
    }
}
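The topic has 10 partitions, but a listener container uses a single consumer thread by default, so one consumer instance processes everything serially. A hedged config sketch (`spring.kafka.listener.concurrency` is a standard Spring Boot property; the value 10 is an assumption chosen to match the partition count, and it should stay within the Hikari pool's capacity):

```yaml
spring:
  kafka:
    listener:
      concurrency: 10  # up to one consumer thread per partition (assumption)
```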
Any help would be much appreciated. Thanks.