Commit 0574680c by 仲光辉

style: just test

parent 07ecc332
......@@ -2,15 +2,17 @@ package cn.dankal.test.first;
import cn.dankal.test.common.Const;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.RandomUtil;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Objects;
......@@ -29,21 +31,23 @@ public class KafkaFirstConsumer {
/**
* KafkaConsumer
*/
private static KafkaConsumer<String, String> kafkaConsumer;
private static KafkaConsumer<byte[], byte[]> kafkaConsumer;
@BeforeAll
public static void before() {
Properties properties = new Properties();
final String bootstrapServers = "192.168.128.3:9092,192.168.128.3:9093,192.168.128.3:9094";
final String bootstrapServers = "39.100.35.15:9092";
// broker(Kafka服务器) 地址清单
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// message key 反序列化器
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getCanonicalName());
// message value 反序列化器
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getCanonicalName());
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, RandomUtil.randomString(12));
properties.put(ConsumerConfig.GROUP_ID_CONFIG, Const.Kafka.ConsumerGroupId.HELLO_CONSUMER_GROUP_ID);
// 通过 Properties 配置 实例化 KafkaConsumer
kafkaConsumer = new KafkaConsumer<>(properties);
......@@ -55,18 +59,22 @@ public class KafkaFirstConsumer {
@Test
public void testKafkaFirstConsumer() {
// consumer 订阅 主题
kafkaConsumer.subscribe(CollectionUtil.newArrayList(Const.Kafka.Topic.HELLO_TOPIC));
kafkaConsumer.subscribe(CollectionUtil.newArrayList("cdc-test"));
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
ConsumerRecords<byte[], byte[]> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<byte[], byte[]> consumerRecord : consumerRecords) {
String key = new String(consumerRecord.key(), StandardCharsets.UTF_8);
String value = new String(consumerRecord.value(), StandardCharsets.UTF_8);
if (key.contains("test_master") || value.contains("test_master")) {
System.out.printf(
"%s 消息消息 ==> 主题: %s 分区: %s 偏移量: %s key: %s value: %s %n",
Const.FULL_DATE_TIME_FORMATTER.format(LocalDateTime.now()),
consumerRecord.topic(),
consumerRecord.partition(),
consumerRecord.offset(),
consumerRecord.key(),
consumerRecord.value());
key,
value);
}
}
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment