Commit 7cc9b3ed by 仲光辉

feat: add basic documentation and test cases for the native kafka-clients API

parent 6229230e
...@@ -24,6 +24,11 @@
hs_err_pid*
.gradle
build
**/build
**/**/build
.idea
*.iml
...@@ -24,80 +24,75 @@ buildscript {
    }
}

description 'dankal-test-kafka'
version '1.0.0.RELEASE'

allprojects {
    apply plugin: 'java'
    apply plugin: 'idea'
    apply plugin: 'io.spring.dependency-management'
    apply plugin: 'org.springframework.boot'

    // JDK configuration
    sourceCompatibility = '1.8'
    targetCompatibility = '1.8'

    group 'cn.dankal.test'

    // similar to Maven properties
    ext {
        set('springCloudAlibabaVersion', "2.1.3.RELEASE")
        set('springCloudVersion', "Greenwich.SR5")
        set('hutoolCoreVersion', "5.4.6")
        set('kafkaClientVersion', '2.7.0')
    }

    // dependency repository configuration
    repositories {
        // GRADLE_USER_HOME : local Maven repository location
        mavenLocal()
        maven {
            url 'https://maven.aliyun.com/repository/public/'
        }
        maven {
            url 'https://maven.aliyun.com/repository/spring/'
        }
        maven {
            url 'https://maven.aliyun.com/repository/gradle-plugin'
        }
        maven {
            url 'https://maven.aliyun.com/repository/spring-plugin'
        }
        mavenCentral()
    }

    dependencies {
        // third party dependencies
        implementation "cn.hutool:hutool-core:${hutoolCoreVersion}"
        compile 'org.projectlombok:lombok'
        annotationProcessor 'org.projectlombok:lombok'
        testCompileOnly 'org.projectlombok:lombok'
    }

    tasks.withType(JavaCompile) {
        options.encoding = "UTF-8"
    }

    // similar to Maven dependencyManagement
    dependencyManagement {
        imports {
            // spring-cloud
            mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
            // spring-cloud-alibaba
            mavenBom "com.alibaba.cloud:spring-cloud-alibaba-dependencies:${springCloudAlibabaVersion}"
        }
    }

    // unit test configuration
    test {
        useJUnitPlatform()
    }
}

// packaging configuration
bootJar.enabled = true
jar.enabled = true
description 'dankal-test-kafka-protogenesis'
version '1.0.0.RELEASE'
dependencies {
    implementation "org.apache.kafka:kafka-clients:${kafkaClientVersion}"
    testImplementation 'org.junit.jupiter:junit-jupiter-api'
}
bootJar.enabled = false
jar.enabled = true
\ No newline at end of file
package cn.dankal.test.common;
import org.apache.kafka.common.TopicPartition;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-25
*/
public interface Const {
/**
* format pattern: yyyy-MM-dd HH:mm:ss.SSS
*/
DateTimeFormatter FULL_DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
/**
* TODO Since this is only a test, offsets are not persisted anywhere; a local Map is used to store each partition and its corresponding offset
* <pre>
* key: TopicPartition - partition information
* value: Long - partition offset
* </pre>
*/
ConcurrentHashMap<TopicPartition, Long> PARTITION_OFFSET_INFO_MAP = new ConcurrentHashMap<>();
/**
* Kafka const
*/
interface Kafka {
/**
* kafka topic
*/
interface Topic {
/**
* kafka topic: dankal-test-hello-kafka
*/
String HELLO_TOPIC = "dankal-test-hello-kafka";
}
/**
* kafka consumer group id
*/
interface ConsumerGroupId {
/**
* hello-consumer-group-id
*/
String HELLO_CONSUMER_GROUP_ID = "hello-consumer-group-id";
/**
* test-group-a
*/
String TEST_GROUP_A = "test-group-a";
/**
* test-group-b
*/
String TEST_GROUP_B = "test-group-b";
/**
* test-group-c
*/
String TEST_GROUP_C = "test-group-c";
/**
* test-kafka-rebalance
*/
String TEST_KAFKA_REBALANCE = "test-kafka-rebalance";
}
}
}
package cn.dankal.test.factory;
import cn.hutool.core.util.StrUtil;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* ThreadExecutorFactory
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2020-12-29
*/
public class ThreadExecutorFactory {
/**
* Build a ThreadPoolExecutor
*
* @param corePoolSize core pool size
* @param maximumPoolSize maximum pool size
* @param keepAliveTime keep-alive time for threads beyond the core pool size
* @param timeUnit time unit of the keep-alive time
* @param threadNamePrefix name prefix for created threads
* @param blockQueueCapacity capacity of the pool's blocking queue
* @return ThreadPoolExecutor
* @see ThreadPoolExecutor
*/
public static ThreadPoolExecutor getThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit timeUnit,
String threadNamePrefix,
int blockQueueCapacity) {
// parameter validation
if (corePoolSize <= 0) {
throw new IllegalArgumentException("corePoolSize must be greater than 0");
}
if (maximumPoolSize <= 0) {
throw new IllegalArgumentException("maximumPoolSize must be greater than 0");
}
if (keepAliveTime < 0) {
throw new IllegalArgumentException("keepAliveTime must not be negative");
}
if (null == timeUnit) {
throw new IllegalArgumentException("timeUnit must not be null");
}
if (corePoolSize > maximumPoolSize) {
throw new IllegalArgumentException("corePoolSize must be less than or equal to maximumPoolSize");
}
if (StrUtil.isBlank(threadNamePrefix)) {
throw new IllegalArgumentException("threadNamePrefix must not be blank");
}
if (blockQueueCapacity <= 0) {
throw new IllegalArgumentException("blockQueueCapacity must be greater than 0");
}
return new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
timeUnit,
new ArrayBlockingQueue<>(blockQueueCapacity),
threadFactory(threadNamePrefix)
);
}
/**
* Initialize a ThreadFactory that names the threads it creates
*
* @param namePrefix thread name prefix
* @return ThreadFactory
*/
private static ThreadFactory threadFactory(String namePrefix) {
return (runnable -> {
Thread thread = new Thread(runnable);
long id = thread.getId();
thread.setName(namePrefix + "-" + id);
return thread;
});
}
/**
* Obtain a <code>ThreadPoolExecutor</code> with default settings
* <p>
* coreSize: 6
* <br/>
* maximumPoolSize: 10
* <br/>
* KeepAliveTime: 10
* <br/>
* unit: TimeUnit.MILLISECONDS
* <br/>
* blockQueueCapacity: 100
* <br/>
* </p>
*
* @param threadNamePrefix <code>threadNamePrefix</code>
* @return ThreadPoolExecutor
*/
public static ThreadPoolExecutor getThreadPoolExecutor(final String threadNamePrefix) {
return ThreadExecutorFactory.getThreadPoolExecutor(6,
10,
10,
TimeUnit.MILLISECONDS,
threadNamePrefix,
100);
}
}
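A minimal usage sketch of this factory, assuming the default sizing documented above (core 6, max 10, keep-alive 10 ms, queue capacity 100); the class name, thread-name prefix and task below are illustrative only and not part of the commit:

import cn.dankal.test.factory.ThreadExecutorFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadExecutorFactoryUsageSketch {
    public static void main(String[] args) throws InterruptedException {
        // obtain a pool with the default sizing (hypothetical prefix "demo-worker")
        ThreadPoolExecutor pool = ThreadExecutorFactory.getThreadPoolExecutor("demo-worker");
        pool.submit(() -> System.out.println(Thread.currentThread().getName() + " is running"));
        // release the pool's threads once the submitted work is done
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}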
package cn.dankal.test.concurrent;
import cn.dankal.test.common.Const;
import cn.dankal.test.factory.ThreadExecutorFactory;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Kafka test: KafkaProducer in a multi-threaded environment
*
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-19
*/
public class KafkaProducerOnConcurrentTest {
/**
* ThreadPoolExecutor
*/
private static ThreadPoolExecutor threadPoolExecutor;
/**
* KafkaProducer
*/
private static KafkaProducer<String, String> kafkaProducer;
@BeforeAll
public static void before() {
threadPoolExecutor = ThreadExecutorFactory.getThreadPoolExecutor("dankal-test-kafka");
Properties properties = new Properties();
final String bootstrapServers = "192.168.128.3:9092,192.168.128.3:9093,192.168.128.3:9094";
// list of broker (Kafka server) addresses
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// message key serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());
// message value serializer
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());
// instantiate the KafkaProducer from the Properties configuration
kafkaProducer = new KafkaProducer<>(properties);
}
/**
* Test the KafkaProducer in a multi-threaded environment
*/
@Test
public void test() throws InterruptedException {
final int threadCount = 5;
// make sure every thread finishes sending its message
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
// make sure all threads start from the same starting line
final CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount);
for (int i = 0; i < threadCount; i++) {
final int index = i + 1;
Runnable kafkaProducerRunnable = () -> {
final String key = "dankal-test-" + index;
final String message = "dankal-test-kafka-producer-on-concurrent";
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
Const.Kafka.Topic.HELLO_TOPIC,
key,
message
);
try {
cyclicBarrier.await();
} catch (Exception e) {
// ignored
}
kafkaProducer.send(
producerRecord,
(recordMetadata, exception) -> {
countDownLatch.countDown();
if (null != exception) {
// send failed
exception.printStackTrace();
return;
}
// send succeeded
System.out.printf(
"Message sent at %d ==> topic: %s partition: %s offset: %d%n",
recordMetadata.timestamp(),
recordMetadata.topic(),
recordMetadata.partition(),
recordMetadata.offset());
});
};
threadPoolExecutor.submit(kafkaProducerRunnable);
}
// wait for all sends to complete
countDownLatch.await();
}
@AfterAll
public static void after() {
if (Objects.nonNull(kafkaProducer)) {
kafkaProducer.close();
}
if (null != threadPoolExecutor) {
threadPoolExecutor.shutdown();
}
}
}
package cn.dankal.test.consumergroup;
import cn.dankal.test.common.Const;
import cn.dankal.test.factory.ThreadExecutorFactory;
import cn.hutool.core.collection.CollectionUtil;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Kafka test: consumer groups
*
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-26
*/
public class KafkaConsumerGroupTest {
/**
* Properties
*/
private Properties properties;
/**
* ThreadPoolExecutor
*/
private final ThreadPoolExecutor threadPoolExecutor = ThreadExecutorFactory.getThreadPoolExecutor("dankal-test-kafka-consumer-group");
@BeforeEach
public void before() {
this.properties = new Properties();
final String bootstrapServers = "192.168.128.3:9092,192.168.128.3:9093,192.168.128.3:9094";
// list of broker (Kafka server) addresses
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// message key deserializer
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
// message value deserializer
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
}
/**
* Test Kafka consumer group A
*/
@Test
public void testKafkaConsumerGroupA() throws InterruptedException {
properties.put(ConsumerConfig.GROUP_ID_CONFIG, Const.Kafka.ConsumerGroupId.TEST_GROUP_A);
threadPoolExecutor.submit(() -> {
KafkaConsumer<String, String> groupAConsumer01 = new KafkaConsumer<>(properties);
groupAConsumer01.subscribe(CollectionUtil.newArrayList(Const.Kafka.Topic.HELLO_TOPIC));
this.doConsumer("groupAConsumer01", groupAConsumer01);
});
threadPoolExecutor.submit(() -> {
KafkaConsumer<String, String> groupAConsumer02 = new KafkaConsumer<>(properties);
groupAConsumer02.subscribe(CollectionUtil.newArrayList(Const.Kafka.Topic.HELLO_TOPIC));
this.doConsumer("groupAConsumer02", groupAConsumer02);
});
threadPoolExecutor.submit(() -> {
KafkaConsumer<String, String> groupAConsumer03 = new KafkaConsumer<>(properties);
groupAConsumer03.subscribe(CollectionUtil.newArrayList(Const.Kafka.Topic.HELLO_TOPIC));
this.doConsumer("groupAConsumer03", groupAConsumer03);
});
// keep the test process from terminating
new CountDownLatch(1).await();
}
/**
* Test Kafka consumer group B
*/
@Test
public void testKafkaConsumerGroupB() throws InterruptedException {
properties.put(ConsumerConfig.GROUP_ID_CONFIG, Const.Kafka.ConsumerGroupId.TEST_GROUP_B);
threadPoolExecutor.submit(() -> {
KafkaConsumer<String, String> groupBConsumer01 = new KafkaConsumer<>(properties);
groupBConsumer01.subscribe(CollectionUtil.newArrayList(Const.Kafka.Topic.HELLO_TOPIC));
this.doConsumer("groupBConsumer01", groupBConsumer01);
});
threadPoolExecutor.submit(() -> {
KafkaConsumer<String, String> groupBConsumer02 = new KafkaConsumer<>(properties);
groupBConsumer02.subscribe(CollectionUtil.newArrayList(Const.Kafka.Topic.HELLO_TOPIC));
this.doConsumer("groupBConsumer02", groupBConsumer02);
});
new CountDownLatch(1).await();
}
/**
* Test Kafka consumer group C
*/
@Test
public void testKafkaConsumerGroupC() {
properties.put(ConsumerConfig.GROUP_ID_CONFIG, Const.Kafka.ConsumerGroupId.TEST_GROUP_C);
KafkaConsumer<String, String> groupCConsumer = new KafkaConsumer<>(properties);
groupCConsumer.subscribe(CollectionUtil.newArrayList(Const.Kafka.Topic.HELLO_TOPIC));
this.doConsumer("groupCConsumer", groupCConsumer);
}
/**
* Perform Kafka message consumption
*
* @param consumerName the consumer name
* @param kafkaConsumer the KafkaConsumer
*/
private void doConsumer(String consumerName, KafkaConsumer<String, String> kafkaConsumer) {
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.printf(
"%s : %s consumed message ==> topic: %s partition: %s offset: %s key: %s value: %s %n",
Const.FULL_DATE_TIME_FORMATTER.format(LocalDateTime.now()),
consumerName,
consumerRecord.topic(),
consumerRecord.partition(),
consumerRecord.offset(),
consumerRecord.key(),
consumerRecord.value());
}
}
}
}
package cn.dankal.test.first;
import cn.dankal.test.common.Const;
import cn.hutool.core.collection.CollectionUtil;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Objects;
import java.util.Properties;
/**
* Kafka clients test: the first consumer
*
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-25
*/
public class KafkaFirstConsumer {
/**
* KafkaConsumer
*/
private static KafkaConsumer<String, String> kafkaConsumer;
@BeforeAll
public static void before() {
Properties properties = new Properties();
final String bootstrapServers = "192.168.128.3:9092,192.168.128.3:9093,192.168.128.3:9094";
// list of broker (Kafka server) addresses
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// message key deserializer
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
// message value deserializer
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, Const.Kafka.ConsumerGroupId.HELLO_CONSUMER_GROUP_ID);
// instantiate the KafkaConsumer from the Properties configuration
kafkaConsumer = new KafkaConsumer<>(properties);
}
/**
* The first Kafka consumer
*/
@Test
public void testKafkaFirstConsumer() {
// the consumer subscribes to the topic
kafkaConsumer.subscribe(CollectionUtil.newArrayList(Const.Kafka.Topic.HELLO_TOPIC));
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.printf(
"%s consumed message ==> topic: %s partition: %s offset: %s key: %s value: %s %n",
Const.FULL_DATE_TIME_FORMATTER.format(LocalDateTime.now()),
consumerRecord.topic(),
consumerRecord.partition(),
consumerRecord.offset(),
consumerRecord.key(),
consumerRecord.value());
}
}
}
@AfterAll
public static void after() {
if (Objects.nonNull(kafkaConsumer)) {
kafkaConsumer.close();
}
}
}
package cn.dankal.test.first;
import cn.dankal.test.common.Const;
import cn.hutool.core.util.RandomUtil;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
* Kafka clients test: the first producer
*
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-25
*/
public class KafkaFirstProducer {
/**
* KafkaProducer
*/
private static KafkaProducer<String, String> kafkaProducer;
@BeforeAll
public static void before() {
Properties properties = new Properties();
final String bootstrapServers = "192.168.128.3:9092,192.168.128.3:9093,192.168.128.3:9094";
// list of broker (Kafka server) addresses
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// message key serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());
// message value serializer
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());
// instantiate the KafkaProducer from the Properties configuration
kafkaProducer = new KafkaProducer<>(properties);
}
/**
* The first Kafka producer
*/
@Test
public void testKafkaFirstProducer() throws ExecutionException, InterruptedException {
final int sendMessageCount = 6;
for (int i = 0; i < sendMessageCount; i++) {
// message key (may be null)
final String key = "dankal-hello-" + RandomUtil.randomInt(10, 5000);
// message payload
final String message = RandomUtil.randomString(64);
ProducerRecord<String, String> stringProducerRecord =
new ProducerRecord<String, String>(
Const.Kafka.Topic.HELLO_TOPIC,
key,
message
);
kafkaProducer.send(stringProducerRecord).get();
}
}
@AfterAll
public static void after() {
if (Objects.nonNull(kafkaProducer)) {
kafkaProducer.close();
}
}
}
package cn.dankal.test.independentconsumer;
import cn.dankal.test.common.Const;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;
/**
* Kafka test: standalone (independent) consumer
*
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-27
*/
public class IndependentConsumerTest {
/**
* KafkaConsumer
*/
private static KafkaConsumer<String, String> kafkaConsumer;
@BeforeAll
public static void before() {
Properties properties = new Properties();
final String bootstrapServers = "192.168.128.3:9092,192.168.128.3:9093,192.168.128.3:9094";
// list of broker (Kafka server) addresses
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// message key deserializer
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
// message value deserializer
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
// instantiate the KafkaConsumer from the Properties configuration
kafkaConsumer = new KafkaConsumer<>(properties);
}
/**
* Kafka standalone consumer test
*/
@Test
public void testKafkaIndependentConsumer() {
// fetch the partition metadata for the topic
List<PartitionInfo> partitionInfoList = kafkaConsumer.partitionsFor(Const.Kafka.Topic.HELLO_TOPIC);
// build the TopicPartition list from the PartitionInfo list
List<TopicPartition> topicPartitionList = partitionInfoList.stream()
.map(
partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition())
)
.collect(Collectors.toList());
// manually assign the partitions to this standalone consumer
kafkaConsumer.assign(topicPartitionList);
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.printf(
"%s consumed message ==> topic: %s partition: %s offset: %s key: %s value: %s %n",
Const.FULL_DATE_TIME_FORMATTER.format(LocalDateTime.now()),
consumerRecord.topic(),
consumerRecord.partition(),
consumerRecord.offset(),
consumerRecord.key(),
consumerRecord.value());
// TODO with a standalone (assigned) consumer, offset management has to be implemented in our own code (see the sketch after this class)
}
}
}
@AfterAll
public static void after() {
if (Objects.nonNull(kafkaConsumer)) {
kafkaConsumer.close();
}
}
}
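The TODO above notes that a manually assigned consumer has to manage its own position. Below is a minimal, hypothetical sketch (not part of this commit) of seeking such a consumer back to the offsets kept in Const.PARTITION_OFFSET_INFO_MAP; the class and method names are made up, and it assumes an earlier run has populated that map.

package cn.dankal.test.independentconsumer;

import cn.dankal.test.common.Const;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.List;

/**
 * Hypothetical helper (not part of the commit): seek an already-assigned
 * standalone consumer back to the offsets stored in Const.PARTITION_OFFSET_INFO_MAP.
 */
public class IndependentConsumerOffsetSeeker {

    public static void seekToStoredOffsets(KafkaConsumer<String, String> kafkaConsumer,
                                            List<TopicPartition> topicPartitionList) {
        for (TopicPartition topicPartition : topicPartitionList) {
            // an offset is only present if a previous run stored it
            Long storedOffset = Const.PARTITION_OFFSET_INFO_MAP.get(topicPartition);
            if (null != storedOffset) {
                // resume consumption right after the last processed record
                kafkaConsumer.seek(topicPartition, storedOffset);
            }
        }
    }
}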
package cn.dankal.test.producersendmode;
import cn.dankal.test.common.Const;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
* Kafka producer: the three ways of sending messages
*
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-26
*/
public class KafkaProducerSendModeTest {
/**
* KafkaProducer
*/
private static KafkaProducer<String, String> kafkaProducer;
@BeforeAll
public static void before() {
Properties properties = new Properties();
final String bootstrapServers = "192.168.128.3:9092,192.168.128.3:9093,192.168.128.3:9094";
// list of broker (Kafka server) addresses
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// message key serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());
// message value serializer
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());
// instantiate the KafkaProducer from the Properties configuration
kafkaProducer = new KafkaProducer<>(properties);
}
/**
* Test: producer send mode: fire-and-forget
*/
@Test
public void testSendAndForget() {
final String topic = Const.Kafka.Topic.HELLO_TOPIC;
final String key = "dankal-test-send-mode";
final String message = "dankal";
ProducerRecord<String, String> producerRecord =
new ProducerRecord<>(
topic,
key,
message
);
// mode: fire-and-forget
kafkaProducer.send(producerRecord);
}
/**
* Test: producer send mode: synchronous send
*/
@Test
public void testSyncSendMessage() throws ExecutionException, InterruptedException {
final String topic = Const.Kafka.Topic.HELLO_TOPIC;
final String key = "dankal-test-send-mode";
final String message = "dankal";
ProducerRecord<String, String> producerRecord =
new ProducerRecord<>(
topic,
key,
message
);
// mode: synchronous send
RecordMetadata recordMetadata = kafkaProducer.send(producerRecord)
// calling Future#get() blocks the current thread
.get();
System.out.printf(
"Message sent at %d ==> topic: %s partition: %s offset: %d%n",
recordMetadata.timestamp(),
recordMetadata.topic(),
recordMetadata.partition(),
recordMetadata.offset()
);
}
/**
* Test: producer send mode: asynchronous send
*/
@Test
public void testAsyncSendMessage() {
final String topic = Const.Kafka.Topic.HELLO_TOPIC;
final String key = "dankal-test-send-mode";
final String message = "dankal";
ProducerRecord<String, String> producerRecord =
new ProducerRecord<>(
topic,
key,
message
);
// mode: asynchronous send
kafkaProducer.send(
producerRecord,
(recordMetadata, exception) -> {
// send failed: exception message
if (null != exception) {
exception.printStackTrace();
return;
}
// send succeeded
System.out.printf(
"Asynchronous send succeeded at %d ==> topic: %s partition: %s offset: %d%n",
recordMetadata.timestamp(),
recordMetadata.topic(),
recordMetadata.partition(),
recordMetadata.offset());
}
);
System.out.println("The Kafka message has been sent asynchronously");
}
@AfterAll
public static void after() {
if (Objects.nonNull(kafkaProducer)) {
kafkaProducer.close();
}
}
}
package cn.dankal.test.rebalance;
import cn.dankal.test.common.Const;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.map.MapUtil;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
/**
* Kafka rebalance test: ConsumerWorker
*
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-26
*/
public class ConsumerWorker {
/**
* Map<TopicPartition, OffsetAndMetadata> partition and offset information for the current consumer
*/
private final Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap;
/**
* current ConsumerWorker KafkaConsumer instance
*/
private final KafkaConsumer<String, String> kafkaConsumer;
/**
* Whether this is a temporary worker: a temporary worker leaves the consumer group after consuming five messages
*/
private final boolean isTempWorker;
/**
* Number of messages a temporary worker consumes: 5
*/
private final static int TEMP_WORK_COUNT = 5;
/**
* KafkaConsumer config properties
*/
private static final Properties PROPERTIES;
//TODO maybe a transaction class to provide transactional support
//private final Transactional transactional;
static {
PROPERTIES = new Properties();
final String bootstrapServers = "192.168.128.3:9092,192.168.128.3:9093,192.168.128.3:9094";
// list of broker (Kafka server) addresses
PROPERTIES.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// message key deserializer
PROPERTIES.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
// message value deserializer
PROPERTIES.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
PROPERTIES.put(ConsumerConfig.GROUP_ID_CONFIG, Const.Kafka.ConsumerGroupId.TEST_KAFKA_REBALANCE);
}
public ConsumerWorker(boolean isTempWorker) {
this.offsetAndMetadataMap = new ConcurrentHashMap<>();
this.isTempWorker = isTempWorker;
// KafkaConsumer is not thread-safe; one consumer instance per thread is recommended
this.kafkaConsumer = new KafkaConsumer<>(PROPERTIES);
// register the partition rebalance listener
this.kafkaConsumer.subscribe(
CollectionUtil.newArrayList(Const.Kafka.Topic.HELLO_TOPIC),
new RebalanceListener(offsetAndMetadataMap, kafkaConsumer)
);
}
/**
* Perform the Kafka message consumption work
*/
public void doWork() {
// processing counter, used together with isTempWorker
int execCount = 0;
try {
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
try {
// TODO Transaction: transaction begin
// transaction.begin();
// TODO consume the message and handle the business logic
long currentMessageOffset = consumerRecord.offset();
System.out.printf(
"%s consumed message ==> topic: %s partition: %s offset: %s key: %s value: %s %n",
Const.FULL_DATE_TIME_FORMATTER.format(LocalDateTime.now()),
consumerRecord.topic(),
consumerRecord.partition(),
currentMessageOffset,
consumerRecord.key(),
consumerRecord.value());
// TODO record the partition and offset information
TopicPartition topicPartition = new TopicPartition(
consumerRecord.topic(),
consumerRecord.partition()
);
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(currentMessageOffset + 1, "no metadata");
offsetAndMetadataMap.put(topicPartition, offsetAndMetadata);
//TODO persist the offset and partition information
if (MapUtil.isNotEmpty(offsetAndMetadataMap)) {
for (TopicPartition partition : offsetAndMetadataMap.keySet()) {
Const.PARTITION_OFFSET_INFO_MAP.put(
partition,
offsetAndMetadataMap.get(partition).offset()
);
}
}
// TODO Transaction: transaction commit
// transaction.commit();
kafkaConsumer.commitAsync();
//TODO increment the processing counter
execCount++;
// TODO just test kafka Rebalance
if (isTempWorker) {
if (execCount == TEMP_WORK_COUNT) {
// trigger a partition rebalance
// the temporary worker has finished its work and is about to leave the consumer group
System.out.printf(
"%s finished its temporary work and is about to leave the consumer group. offsetAndMetadataMap: %s%n",
Thread.currentThread().getName(),
offsetAndMetadataMap);
//commit the offsets of the messages already processed
kafkaConsumer.commitSync();
return;
}
}
} catch (Exception e) {
// TODO Transaction: transaction rollback
// transaction.rollback();
// TODO handle the exception
e.printStackTrace();
}
}
}
} finally {
kafkaConsumer.commitSync();
kafkaConsumer.close();
}
}
}
package cn.dankal.test.rebalance;
import cn.dankal.test.factory.ThreadExecutorFactory;
import org.junit.jupiter.api.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Kafka partition rebalance test
*
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-26
*/
public class KafkaRebalanceTest {
/**
* Test Kafka partition rebalancing
*/
@Test
public void testKafkaRebalance() throws InterruptedException {
final int workerCount = 2;
ThreadPoolExecutor threadPoolExecutor = ThreadExecutorFactory.getThreadPoolExecutor("dankal-test-kafka-rebalance");
for (int i = 0; i < workerCount; i++) {
ConsumerWorker consumerWorker = new ConsumerWorker(false);
threadPoolExecutor.execute(consumerWorker::doWork);
}
TimeUnit.SECONDS.sleep(30);
//start a temporary consumer to trigger a partition rebalance
ConsumerWorker tempConsumerWorker = new ConsumerWorker(true);
threadPoolExecutor.execute(tempConsumerWorker::doWork);
new CountDownLatch(1).await();
}
}
package cn.dankal.test.rebalance;
import cn.dankal.test.common.Const;
import cn.hutool.core.util.RandomUtil;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-26
*/
public class ProducerWorkerTest {
/**
* KafkaProducer
*/
private static KafkaProducer<String, String> kafkaProducer;
@BeforeAll
public static void before() {
Properties properties = new Properties();
final String bootstrapServers = "192.168.128.3:9092,192.168.128.3:9093,192.168.128.3:9094";
// list of broker (Kafka server) addresses
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// message key serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());
// message value serializer
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());
// instantiate the KafkaProducer from the Properties configuration
kafkaProducer = new KafkaProducer<>(properties);
}
/**
* Kafka producer used for the partition rebalance test
*/
@Test
public void testKafkaFirstProducer() throws ExecutionException, InterruptedException {
for (; ; ) {
// message key (may be null)
final String key = "dankal-hello-" + RandomUtil.randomInt(100, 50000000);
// message payload
final String message = RandomUtil.randomString(64);
ProducerRecord<String, String> stringProducerRecord =
new ProducerRecord<String, String>(
Const.Kafka.Topic.HELLO_TOPIC,
key,
message
);
TimeUnit.MILLISECONDS.sleep(300);
kafkaProducer.send(stringProducerRecord).get();
}
}
@AfterAll
public static void after() {
if (Objects.nonNull(kafkaProducer)) {
kafkaProducer.close();
}
}
}
package cn.dankal.test.rebalance;
import cn.dankal.test.common.Const;
import cn.hutool.core.collection.CollectionUtil;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* Kafka partition rebalance test: RebalanceListener
*
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-26
*/
public class RebalanceListener implements ConsumerRebalanceListener {
/**
* Map<TopicPartition, OffsetAndMetadata> partition and offset information for the current consumer
*/
private final Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap;
/**
* current ConsumerWorker KafkaConsumer instance
*/
private final KafkaConsumer<String, String> kafkaConsumer;
public RebalanceListener(Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap, KafkaConsumer<String, String> kafkaConsumer) {
this.offsetAndMetadataMap = offsetAndMetadataMap;
this.kafkaConsumer = kafkaConsumer;
}
/**
* Called back before the partition rebalance is performed
* <p>
* Collection<TopicPartition> ==> the partitions this consumer currently has the right to consume
* <p>
* This is usually where the partition offsets are stored, before the rebalance completes
*
* @param partitions Collection<TopicPartition>
*/
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// before the partition rebalance
System.out.printf(
"%s:onPartitionsRevoked partitions: %s%n",
Thread.currentThread().getName(),
partitions
);
System.out.printf(
"%s is about to take part in a partition rebalance and will commit its working offsets. Offset information: %s%n",
Thread.currentThread().getName(),
this.offsetAndMetadataMap
);
System.out.printf("Partition offset information: %s%n", Const.PARTITION_OFFSET_INFO_MAP);
// TODO transaction begin
try {
for (TopicPartition partition : partitions) {
//TODO write the offsets within a transaction
Const.PARTITION_OFFSET_INFO_MAP.put(
partition,
offsetAndMetadataMap.get(partition).offset());
}
//TODO transaction commit
//optional: also commit the offset information to the Kafka brokers
//because during a rebalance the partition offsets are maintained by ourselves in Const.PARTITION_OFFSET_INFO_MAP
kafkaConsumer.commitSync(offsetAndMetadataMap);
} catch (Exception e) {
// TODO transaction rollback
// TODO handle the exception
e.printStackTrace();
}
}
/**
* Called back after the partition rebalance has completed
* <p>
* Collection<TopicPartition> ==> the partitions this consumer now has the right to consume
* <p>
* This is usually where the partition offsets are corrected, using the offsets stored in {@link #onPartitionsRevoked(Collection)}
*
* @param partitions Collection<TopicPartition>
*/
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.printf(
"%s:onPartitionsAssigned partitions: %s%n",
Thread.currentThread().getName(),
partitions
);
System.out.printf(
"%s has finished taking part in the partition rebalance. Offset information: %s%n",
Thread.currentThread().getName(),
Const.PARTITION_OFFSET_INFO_MAP
);
//correct the partition offsets
for (TopicPartition partition : partitions) {
System.out.printf("%s:topic %n", partition);
// look up the partition offset that we maintain ourselves
Long offset = Const.PARTITION_OFFSET_INFO_MAP.get(partition);
if (null == offset) {
continue;
}
// adjust the partition offset
// resume recording the partition offset from the stored position
// so that offsets do not get mixed up after the rebalance
kafkaConsumer.seek(partition, offset);
}
resetConsumerOffsetAndMetadataMap(partitions);
}
/**
* Reset the current Kafka consumer's <code>Map<TopicPartition, OffsetAndMetadata></code>
*
* @param partitions Collection<TopicPartition>
*/
private void resetConsumerOffsetAndMetadataMap(Collection<TopicPartition> partitions) {
if (CollectionUtil.isEmpty(partitions)) {
return;
}
List<Integer> ownPartitionIdList = partitions.stream()
.map(TopicPartition::partition)
.collect(Collectors.toList());
System.out.printf("%s before: resetConsumerOffsetAndMetadataMap %s %n", Thread.currentThread().getName(), this.offsetAndMetadataMap);
for (TopicPartition topicPartition : this.offsetAndMetadataMap.keySet()) {
if (!ownPartitionIdList.contains(topicPartition.partition())) {
// after the rebalance this consumer no longer has the right to consume this partition
// to avoid overwriting, the entry for this partition must be removed from this consumer's offsetAndMetadataMap
// for the scenario where this occurs, see test-kafka-rebalance-log-error.html in the project directory
this.offsetAndMetadataMap.remove(topicPartition);
}
}
System.out.printf("%s after: resetConsumerOffsetAndMetadataMap %s %n", Thread.currentThread().getName(), this.offsetAndMetadataMap);
}
}
description 'dankal-test-kafka-spring'
version '1.0.0.RELEASE'
dependencies {
    // spring dependencies
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.kafka:spring-kafka'
    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
}
// packaging configuration
bootJar.enabled = true
jar.enabled = true
\ No newline at end of file
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://res.mercymodest.com/gradle-6.7.1-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
...@@ -11,4 +11,6 @@ pluginManagement {
}
}
rootProject.name = 'dankal-test-kafka'
include 'dankal-test-kafka-protogenesis'
include 'dankal-test-kafka-spring'
++ "b/蛋壳创意科技-技术测试-.md"
This source diff could not be displayed because it is too large. You can view the blob instead.