spring-kafka是spring-boot内置的依赖之一,如果操作kafka的话,还是比较推荐的。更上层的抽象,比如stream或者integration,写的质量都不太好,不推荐使用。
简单应用
直接在配置文件里面配好所有必须项,kv都是string。
在代码里使用@KafkaListener注解method,参数是Consumer<?,?> records,然后自己拿到string再反序列化。
批量消费的话,设置batchListener为true,然后使用List<String>)接受批量参数即可。
生产者需要注意的参数:
1
2
3
4
5
6
7
8
9
10
11
12
| //0,1或者all,为了消息可靠性,一般选all;如果消息丢失影响不大,可以选1提高吞吐量
props.put(ProducerConfig.ACKS_CONFIG, producerAckMode);
//打开批量写入,128k一批,默认16k,提高8倍,所以buffer那边也要提高8倍
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 131072);
//最多延迟xms,注意这里可能导致消息丢失
props.put(ProducerConfig.LINGER_MS_CONFIG, 100);
//缓存大,默认32M,提高8倍到256M
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 256 * 1024 * 1024);
//写入超时,120s
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);
//重试次数:无上限,为了保证消息不丢
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
|
消费者需要注意的参数:
1
2
3
4
5
6
| //自动提交最好关了,不过对重复消费要做一些处理
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
//每次批量拉取时避免拉太多,内存
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
//避免频繁重平衡
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
|
为了提高并发消费速度,一般Listener线程数量应该>=topic分区数,可以使用factory.setConcurrency设置每个Listener的默认并发线程数,这个数值可以用@KafkaListener的concurrency参数覆盖。
当然,批量消费时,可以使用线程池并发消费,等全部完成后再统一ack即可。
由于kafka的分区只能增加不能减少,所以不要一下子把分区数开的太多,不然缩容很麻烦。kafka的topic数量太多也对其性能有很大影响。
上面的简单方案中,kv都是string,通过topic来进行业务路由。但是topic本身又不能建的太多,所以更好的办法是在topic中传递多种类型的数据结构。
如果使用json作为序列化方式,很容易想到的是在json最外层加一个type字段,然后使用该字段进行手动路由。实际上Kafka已经内置了header机制,可以将类型放在header里面,在push/poll时自动根据添加/根据type进行路由。
生产者配置:
1
2
3
| props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, true);
props.put(JsonSerializer.TYPE_MAPPINGS, KafkaUtils.genTypeMappingString(produceTypes));
|
消费者配置:
1
2
3
| props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
props.put(JsonDeserializer.TYPE_MAPPINGS, KafkaUtils.genTypeMappingString(consumeTypes));
|
其中genTypeMappingString用来将header中的类型字符串与实际类的名字关联起来:
1
2
3
4
5
6
7
8
| private String genTypeMappingString(Map<String, Class<?>> map) {
List<String> parts = new ArrayList<>();
for (Map.Entry<String, Class<?>> entry : map.entrySet()) {
parts.add(String.format("%s:%s", entry.getKey(),
entry.getValue().getCanonicalName()));
}
return Joiner.on(",").join(parts);
}
|
将@KafkaListener放在类上面,然后在method上面使用@KafkaHandler,method的参数使用具体的类就行,会自动路由的。
如果害怕路由写错,可以加一个@KafkaHandler(isDefault=true),配置一个默认路由,参数写Object类型就可以。
Json序列化/反序列化的自定义配置
有个问题是,JsonSerializer和JsonDeserializer默认使用的ObjectMapper不太行(不能正确序列化LocalDateTime到字符串),一般需要注入自定义的objectMapper。
这里没有什么特别方便的办法,要么改为使用代码配置:
1
2
| DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(producerConfigs());
factory.setValueSerializer(new JsonSerializer(objectMapper));
|
此时所有直接通过props配置的json相关属性都要改为通过代码配置,而不能只修改这个。
要么自己继承JsonSerializer/JsonDeserializer,在ctor中注入ObjectMapper,此时这需要修改VALUE_DESERIALIZER_CLASS_CONFIG后面对应的类就可以。推荐使用后者,放在一个公共库里一劳永逸。
批量消费注意事项
上文说过,如果打开batchListener,监听参数必须变成List。
但是批量消费和自动路由是冲突的,这还是Java泛型类型擦除的锅,如果在批量场景多个KafkaHandler时,参数都是List<xxx>,由于类型擦除,所以实际上都变成了List参数,Runtime就无法判断如何进行分发。
此时只能退回将@KafkaListener加在method上的使用方式,并进行手动分发:
1
2
3
4
5
6
7
8
9
| @KafkaListener(topics = Constant.TOPIC_KAFKA_XXX)
public void onEvent(List<Object> records, Acknowledgment ack) {
for(Object record: records){
if(record instanceof TypeA){
//...
}
}
ack.acknowledge();
}
|
如果部分topic需要打开batchListener,部分又不需要。可以建立多个返回ConcurrentKafkaListenerContainerFactory<String, Object>的bean,然后再@KafkaListener里面手动指定containerFactory对应的bean名字。
一般情况下,只有吞吐量特别大、且不在意处理顺序的topic才需要打开batchListener,普通的topic只要消费结点数匹配分区数就行。如果消息顺序很重要的话,即使打开批量,也是要逐个处理消息,跟逐条poll没啥区别。
密码
kafka默认是没有密码的,需要在安装的时候指定密码,参考docker compose配置如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
| services:
kafka:
image: apache/kafka:latest
container_name: kafka
hostname: kafka
restart: always
ports:
- "9092:9092"
environment:
# --- KRaft 基本配置 ---
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
# --- 性能参数 ---
KAFKA_NUM_PARTITIONS: 3
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_DEFAULT_REPLICATION_FACTOR: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
# --- 监听器配置 ---
# CONTROLLER: 集群内部控制器通信 (9093)
# SASL_PLAINTEXT: 外部客户端连接,带认证 (9092)
KAFKA_LISTENERS: CONTROLLER://:9093,SASL_PLAINTEXT://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_INTER_BROKER_LISTENER_NAME: SASL_PLAINTEXT
# --- 认证配置 (SASL/PLAIN) ---
KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
# --- 关键:指向 JAAS 配置文件 ---
# 通过 JVM 参数告诉 Kafka 使用哪个文件进行认证
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/secrets/kafka_server_jaas.conf"
volumes:
# 将本地的认证文件挂载到容器内
- ./kafka_server_jaas.conf:/etc/kafka/secrets/kafka_server_jaas.conf
# 持久化数据(可选)
- kafka_data:/var/lib/kafka/data
volumes:
kafka_data:
|
kafka_server_jaas.conf内容如下:
1
2
3
4
5
6
| KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="123456"
user_admin="123456";
}
|
在java配置中,需要包含以下部分才能连接:
1
2
3
4
5
6
7
8
9
10
| spring:
kafka:
bootstrap-servers: localhost:9092
properties:
security.protocol: SASL_PLAINTEXT
sasl.mechanism: SCRAM-SHA-256
sasl.jaas.config: >-
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="123456";
|
设置了密码之后,所有相关工具都需要密码才能连接了,建议使用kafkactl。
其他
- debug模式下,kafka每次拉取都会生成日志,所以kafka模块的日志级别最好调到INFO以上,不然日志量太大;
- kafka的topic删除非常麻烦,即使打开了
delete.topic.enable,有时候也删除不了(原因不明)。此时需要到节点对应的zk中,rmr /brokers/topics/{topicName},并rmr /admin/deleted_topics/{topicName},后者不删的话,以后重建的同名topic还是会被自动标记为待删除。 - 如果配置
auto.create.topics.enable=true,切记配置好自动生成topic的分区和副本数,默认配置不适合集群环境。规模较小的情况下,一般N个节点配置2N个分区+N个副本。