发布于 

Java 操作 Kafka 动态扩展主题分区

引入Maven依赖

一定要注意 spring-kafka 与 kafka-clients 版本的兼容性。

1
2
3
4
5
6
7
8
9
10
11
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.0</version>
</dependency>

Kafka主题枚举类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.baidu.citool.common.kafka.bean;

import java.util.ArrayList;
import java.util.List;

/**
 * Enumeration of the Kafka topics this service owns; used by KafkaManager to
 * create missing topics and to decide which topics may have partitions expanded.
 *
 * @author xiaoxuxuy
 * @date 2021/6/4 10:57 上午
 */
public enum KafkaTopicEnum {
    pipeline_scan("pipeline_scan"),
    pipeline_build("pipeline_build");

    /** Physical topic name as it appears on the broker. */
    private String name;

    KafkaTopicEnum(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    /**
     * Collects every declared topic name into a fresh, mutable list.
     * Callers rely on mutability (e.g. filtering with {@code removeIf}).
     *
     * @return a new mutable list containing all topic names in declaration order
     */
    public static List<String> toList() {
        KafkaTopicEnum[] topics = values();
        List<String> names = new ArrayList<>(topics.length);
        for (KafkaTopicEnum topic : topics) {
            names.add(topic.getName());
        }
        return names;
    }
}

Kafka配置属性类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Externalized Kafka settings bound from application properties.
 * Lombok {@code @Data} generates the getters/setters used by KafkaManager.
 *
 * @author xiaoxuxuy
 * @date 2021/6/4 10:39 上午
 */
@Data
@Component
public class KafkaConfigProperties {

/**
 * Kafka cluster bootstrap servers (comma-separated host:port list).
 */
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;

/**
 * Target number of partitions per topic. Original note says it should not
 * exceed the number of cluster connections — presumably the broker count;
 * TODO confirm against the cluster setup.
 */
@Value("${kafka.topic.numPartitions}")
private int numPartitions;

/**
 * Replication factor used when creating new topics (cast to short by the caller).
 */
@Value("${kafka.topic.replicationFactor}")
private int replicationFactor;

/**
 * Producer retry count when an ack times out.
 */
@Value("${kafka.producer.retries}")
private int retries;

/**
 * Producer ack mode (e.g. "0", "1", "all").
 */
@Value("${kafka.producer.acks}")
private String acks;
}

创建单例对象管理生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
import com.baidu.citool.common.kafka.bean.KafkaTopicEnum;
import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONObject;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
* @author xiaoxuxuy
* @date 2021/6/4 10:55 上午
*/
@Slf4j
@Component
public class KafkaManager {

@Autowired
private KafkaConfigProperties kafkaConfigProperties;

private static KafkaProducer<String, String> producer;

public static AdminClient adminClient;

@PostConstruct
private void initKafkaManger() {
try {
// 创建kafka生产者的配置信息
Properties properties = new Properties();
// 指定连接的kafka集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfigProperties.getBootstrapServers());
// key和value序列化类
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 指定ack副本机制
properties.put(ProducerConfig.ACKS_CONFIG, kafkaConfigProperties.getAcks());
// 如果接受ack超时,重试的次数
properties.put(ProducerConfig.RETRIES_CONFIG, kafkaConfigProperties.getRetries());
// 创建或者更新topics
refreshTopics(properties);
// 创建生产者对象
producer = new KafkaProducer<>(properties);
} catch (Exception e) {
log.error("创建KafkaProducer失败", e);
}
}

/**
* 创建或者更新topics
*
* @param properties
*/
private void refreshTopics(Properties properties) {
// 创建操作客户端
adminClient = KafkaAdminClient.create(properties);
// 检查心主题和分区
adminClient.listTopics().names()
.whenComplete((topics, throwable) -> {
List<String> newTopics = KafkaTopicEnum.toList();
log.info("当前已有主题:{}", topics);
newTopics.removeIf(str -> !topics.isEmpty() && topics.contains(str));
if (!newTopics.isEmpty()) {
log.info("创建新的主题:{}", newTopics);
List<NewTopic> newTopicList = new ArrayList<>();
for (String ntp : newTopics) {
NewTopic newTopic = new NewTopic(ntp, kafkaConfigProperties.getNumPartitions(), (short) kafkaConfigProperties.getReplicationFactor());
newTopicList.add(newTopic);
}
adminClient.createTopics(newTopicList);
}
// 已存在的topic检查分区
adminClient.describeTopics(topics).all().whenComplete((descriptionMap, throwable1) -> {
for (Map.Entry<String, TopicDescription> entry : descriptionMap.entrySet()) {
List<TopicPartitionInfo> tps = entry.getValue().partitions();
log.info("主题:{},描述:{}", entry.getKey(), tps);
log.info("当前主题分区数:{},即将修改的分区数:{}", tps.size(), kafkaConfigProperties.getNumPartitions());
List<String> allTopics = KafkaTopicEnum.toList();
log.info("所有主题:{}", allTopics);
if (tps.size() < kafkaConfigProperties.getNumPartitions() && allTopics.contains(entry.getKey())) {
log.info("主题扩展分区:{}", entry.getKey());
// 注意创建方式
NewPartitions newPartitions = NewPartitions.increaseTo(kafkaConfigProperties.getNumPartitions());
Map<String, NewPartitions> partitionsMap = new HashMap<>();
partitionsMap.put(entry.getKey(), newPartitions);
adminClient.createPartitions(partitionsMap);
}
}
});
});
}

/**
* 传入kafka约定的topicName,json格式字符串,发送给kafka集群
*
* @param topicName
* @param jsonMessage
*/
public static void sendMessage(String topicName, String jsonMessage) {
producer.send(new ProducerRecord<>(topicName, jsonMessage));
}

/**
* 传入kafka约定的topicName,json格式字符串数组,发送给kafka集群
* 用于批量发送消息,性能较高。
*
* @param topicName
* @param jsonMessages
* @throws InterruptedException
*/
public static void sendMessage(String topicName, String... jsonMessages) throws InterruptedException {
for (String jsonMessage : jsonMessages) {
producer.send(new ProducerRecord<>(topicName, jsonMessage));
}
}

/**
* 传入kafka约定的topicName,Map集合,内部转为json发送给kafka集群
* 用于批量发送消息,性能较高。
*
* @param topicName
* @param mapMessageToJSONForArray
*/
public static void sendMessage(String topicName, List<Map<Object, Object>> mapMessageToJSONForArray) {
for (Map<Object, Object> mapMessageToJSON : mapMessageToJSONForArray) {
String array = JSONObject.fromObject(mapMessageToJSON).toString();
producer.send(new ProducerRecord<>(topicName, array));
}
}

/**
* 传入kafka约定的topicName,Map,内部转为json发送给kafka集群
*
* @param topicName
* @param mapMessageToJSON
*/
public static void sendMessage(String topicName, Map<Object, Object> mapMessageToJSON) {
String array = JSONObject.fromObject(mapMessageToJSON).toString();
producer.send(new ProducerRecord<>(topicName, array));
}
}