Commit d2f61582 by 仲光辉

feat: finished spring-boot-kafka and spring-cloud-stream-kafka

parent 7cc9b3ed
description 'dankal-test-kafka-spring'
description 'dankal-test-kafka-spring-boot'
version '1.0.0.RELEASE'
dependencies {
......@@ -8,6 +8,7 @@ dependencies {
testImplementation('org.springframework.boot:spring-boot-starter-test') {
exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
}
//implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.11.2'
}
// package 配置
......
package cn.dankal.test;
package cn.dankal.test.boot;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
......
package cn.dankal.test.boot.common;
import cn.hutool.core.util.StrUtil;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-28
*/
public class Common {
/**
* 获取 Spring Kafka Consumer 线程名称
*
* @param rowThreadName 未处理的 thread name
* @return 处理后的线程名称
*/
public static String getSpringKafkaConsumerName(String rowThreadName) {
if (StrUtil.isBlank(rowThreadName)) {
return rowThreadName;
}
return rowThreadName.split("#")[1];
}
}
package cn.dankal.test.common;
package cn.dankal.test.boot.common;
import java.time.format.DateTimeFormatter;
/**
* @author ZGH.MercyModest
......@@ -12,6 +14,12 @@ public interface Const {
*/
String RANDOM_BASE_STRING = "zxcvbnmasdfghjklqwertyuiopQAZWSXEDCRFVTGBYHNIUJKLMOP1234567890-=[],.;'";
/**
* format pattern: yyyy-MM-dd HH:mm:ss.SSS
*/
DateTimeFormatter FULL_DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
/**
* kafka 相关常量
*/
......@@ -24,6 +32,16 @@ public interface Const {
* simple-string-consumer
*/
String GROUP_ID_SIMPLE_STRING_CONSUMER = "simple-string-consumer";
/**
* hello-spring-kafka-group
*/
String GROUP_ID_HELLO_SPRING_KAFKA_CONSUMER = "hello-spring-kafka-group";
/**
* test-string-message-handler-error
*/
String GROUP_ID_TEST_STRING_MESSAGE_HANDLER_ERROR_CONSUMER = "test-string-message-handler-error";
}
/**
......@@ -32,8 +50,28 @@ public interface Const {
interface Topic {
/**
* simple-string-message
* <p>
* 简单 String message 主题
* </p>
*/
String TOPIC_SIMPLE_STRING_MESSAGE = "simple-string-message";
/**
* hello-spring-kafka
* <p>
* 测试 spring kafka 的主题
* </p>
*/
String TOPIC_HELLO_SPRING_KAFKA = "hello-spring-kafka";
/**
* handler-string-message-error-message
* <p>
* 存放 出现 string message 异常信息的主题
* </p>
*/
String TOPIC_HANDLER_STRING_MESSAGE_ERROR_MESSAGE = "handler-string-message-error-message";
}
}
}
package cn.dankal.test.boot.component;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.FailureCallback;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-28
*/
@Slf4j
@Component
public class AsyncSendMessageFailureCallback implements FailureCallback {
@Override
public void onFailure(Throwable ex) {
Throwable exCause = ex.getCause();
log.error("异步:发送消息失败 ==>异常原因:{} 异常消息: {}", exCause != null ? exCause.getMessage() : "", ex.getMessage());
}
}
package cn.dankal.test.boot.component;
import cn.dankal.test.boot.entity.DankalSimpleMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.SuccessCallback;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-28
*/
@Slf4j
@Component
public class DankalMessageSuccessCallback implements SuccessCallback<SendResult<String, DankalSimpleMessage>> {
@Override
public void onSuccess(SendResult<String, DankalSimpleMessage> result) {
if (null == result) {
if (log.isWarnEnabled()) {
log.warn("current result is null!!!");
}
return;
}
RecordMetadata recordMetadata = result.getRecordMetadata();
if (log.isDebugEnabled() && null != recordMetadata) {
log.debug(
"成功发送消息:主题: {}, 分区: {},偏移量: {} ,key: {}, value: {}",
recordMetadata.topic(),
recordMetadata.partition(),
recordMetadata.offset(),
result.getProducerRecord().key(),
result.getProducerRecord().value()
);
}
}
}
package cn.dankal.test.boot.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-29
*/
@Configuration
public class KafkaBaseConfig {
/**
* retryCount
*/
@Value("${spring.kafka.retry-count:3}")
private Integer retryCount;
/**
* 配置 {@link SeekToCurrentErrorHandler} 用于消息消费重试
*
* @param template KafkaTemplate
* @return ErrorHandler
* @see DeadLetterPublishingRecoverer
*/
@Bean
@Primary
public ErrorHandler kafkaErrorHandler(KafkaTemplate<Object, Object> template) {
DeadLetterPublishingRecoverer deadLetterPublishingRecoverer = new DeadLetterPublishingRecoverer(template);
return new SeekToCurrentErrorHandler(deadLetterPublishingRecoverer, retryCount);
}
}
package cn.dankal.test.config;
package cn.dankal.test.boot.config;
import cn.dankal.test.common.Const;
import cn.dankal.test.boot.common.Const;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
......@@ -25,6 +26,12 @@ public class KafkaTopicConfig {
*/
private final KafkaProperties kafkaProperties;
/**
* Kafka 集群数量
*/
@Value("${spring.kafka.broker-count}")
private Short brokerCount;
public KafkaTopicConfig(KafkaProperties kafkaProperties) {
this.kafkaProperties = kafkaProperties;
}
......@@ -37,11 +44,47 @@ public class KafkaTopicConfig {
return new KafkaAdmin(configs);
}
/**
* 新建一个kafka主题: hello-spring-kafka 如果需要话
*
* @return NewTopic
* @see Const.Kafka.Topic#TOPIC_HELLO_SPRING_KAFKA
*/
@Bean
public NewTopic simpleStringTopic() {
public NewTopic helloSpringKafkaTopic() {
return new NewTopic(
Const.Kafka.Topic.TOPIC_HELLO_SPRING_KAFKA,
kafkaProperties.getListener().getConcurrency(),
brokerCount);
}
/**
* 新建一个kafka主题: simple-string-message 如果需要话
*
* @return NewTopic
* @see Const.Kafka.Topic#TOPIC_SIMPLE_STRING_MESSAGE
*/
@Bean
public NewTopic simpleStringKafkaTopic() {
return new NewTopic(
Const.Kafka.Topic.TOPIC_SIMPLE_STRING_MESSAGE,
kafkaProperties.getListener().getConcurrency(),
(short) 3);
brokerCount);
}
/**
* 新建一个kafka主题: handler-string-message-error-message 如果需要话
*
* @return NewTopic
* @see Const.Kafka.Topic#TOPIC_HANDLER_STRING_MESSAGE_ERROR_MESSAGE
*/
@Bean
public NewTopic handlerStringMessageErrorMessageKafkaTopic() {
return new NewTopic(
Const.Kafka.Topic.TOPIC_HANDLER_STRING_MESSAGE_ERROR_MESSAGE,
kafkaProperties.getListener().getConcurrency(),
brokerCount);
}
}
package cn.dankal.test.boot.controller;
import cn.dankal.test.boot.common.Const;
import cn.dankal.test.boot.entity.DankalSimpleMessage;
import cn.dankal.test.boot.factory.ThreadExecutorFactory;
import cn.dankal.test.boot.service.KafkaDankalMessageService;
import cn.dankal.test.boot.service.KafkaStringMessageService;
import cn.hutool.core.util.RandomUtil;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-28
*/
@RestController
public class SendMessageController {
/**
* KafkaMessageService
*/
private final KafkaDankalMessageService kafkaDankalMessageService;
/**
* KafkaStringMessageService
*/
private final KafkaStringMessageService kafkaStringMessageService;
/**
* ThreadPoolExecutor
*/
private final static ThreadPoolExecutor THREAD_POOL_EXECUTOR = ThreadExecutorFactory.getThreadPoolExecutor("dankal-test-spring-kafka");
public SendMessageController(KafkaDankalMessageService kafkaDankalMessageService, KafkaStringMessageService kafkaStringMessageService) {
this.kafkaDankalMessageService = kafkaDankalMessageService;
this.kafkaStringMessageService = kafkaStringMessageService;
}
@GetMapping("/hello/simple/{count}")
public String sendHelloMessageBySimple(@PathVariable("count") Integer count) {
final String topic = Const.Kafka.Topic.TOPIC_HELLO_SPRING_KAFKA;
if (count <= 0) {
count = 10;
}
for (int i = 0; i < count; i++) {
final String key = "dankal-test-" + RandomUtil.randomInt(1, 10000);
DankalSimpleMessage dankalSimpleMessage = new DankalSimpleMessage()
.setId(i + 1)
.setKey(key)
.setContent(RandomUtil.randomString(Const.RANDOM_BASE_STRING, 15))
.setSendTime(LocalDateTime.now());
try {
kafkaDankalMessageService.sendMessageBySimple(topic, key, dankalSimpleMessage);
} catch (Exception e) {
e.printStackTrace();
return "error";
}
}
return "success";
}
@GetMapping("/hello/sync/{count}")
public String sendHelloMessageBySync(@PathVariable("count") Integer count) {
final String topic = Const.Kafka.Topic.TOPIC_HELLO_SPRING_KAFKA;
if (count <= 0) {
count = 10;
}
for (int i = 0; i < count; i++) {
final String key = "dankal-test-" + RandomUtil.randomInt(1, 10000);
DankalSimpleMessage dankalSimpleMessage = new DankalSimpleMessage()
.setId(i + 1)
.setKey(key)
.setContent(RandomUtil.randomString(Const.RANDOM_BASE_STRING, 15))
.setSendTime(LocalDateTime.now());
try {
kafkaDankalMessageService.sendMessageBySync(topic, key, dankalSimpleMessage);
} catch (Exception e) {
e.printStackTrace();
return "error";
}
}
return "success";
}
@GetMapping("/hello/async/{count}")
public String sendHelloMessageByAsync(@PathVariable("count") Integer count) {
final String topic = Const.Kafka.Topic.TOPIC_HELLO_SPRING_KAFKA;
if (count <= 0) {
count = 10;
}
for (int i = 0; i < count; i++) {
final String key = "dankal-test-" + RandomUtil.randomInt(1, 10000);
DankalSimpleMessage dankalSimpleMessage = new DankalSimpleMessage()
.setId(i + 1)
.setKey(key)
.setContent(RandomUtil.randomString(Const.RANDOM_BASE_STRING, 15))
.setSendTime(LocalDateTime.now());
try {
kafkaDankalMessageService.sendMessageByAsync(topic, key, dankalSimpleMessage);
} catch (Exception e) {
e.printStackTrace();
return "error";
}
}
return "success";
}
@GetMapping("/test/string")
public String testSendStringMessage() {
final String topic = Const.Kafka.Topic.TOPIC_SIMPLE_STRING_MESSAGE;
final String key = "dankal-test-str-" + RandomUtil.randomInt(10, 500);
final String message = RandomUtil.randomString(20);
kafkaStringMessageService.sendStringMessage(topic, key, message);
return "success";
}
@GetMapping("/string/batch/{count}")
public String testBatchSendMessage(@PathVariable("count") Integer batchCount) {
if (batchCount <= 0) {
batchCount = 10;
}
CyclicBarrier cyclicBarrier = new CyclicBarrier(batchCount);
for (int i = 0; i < batchCount; i++) {
THREAD_POOL_EXECUTOR.submit(() -> {
final String topic = Const.Kafka.Topic.TOPIC_SIMPLE_STRING_MESSAGE;
final String key = "dankal-test";
final String message = RandomUtil.randomString(20);
try {
// 保证一起发送
cyclicBarrier.await();
kafkaStringMessageService.sendStringMessage(topic, key, message);
} catch (Exception e) {
//ignored
}
});
}
return "success";
}
}
package cn.dankal.test.boot.entity;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import lombok.Data;
import lombok.experimental.Accessors;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.io.Serializable;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Date;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-28
*/
@Data
@Accessors(chain = true)
public class DankalSimpleMessage implements Serializable {
private static final long serialVersionUID = -81606093254258919L;
/**
* message is
*/
private Integer id;
/**
* message key
*/
private String key;
/**
* message value
*/
private String content;
/**
* message send time
* <pre>
* 因为 {@link JsonSerializer} 默认使用支持 {@link Date} 的序列化和反序列化对于Java8提供的诸如
* {@link LocalDateTime}
* {@link LocalDate}
* {@link LocalTime}
* 新的时间API需要通过注解指定相关序列化和反序列化器
* </pre>
*/
@JsonSerialize(using = LocalDateTimeSerializer.class)
@JsonDeserialize(using = LocalDateTimeDeserializer.class)
private LocalDateTime sendTime;
}
package cn.dankal.test.boot.factory;
import cn.hutool.core.util.StrUtil;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* ThreadExecutorFactory
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2020-12-29
*/
public class ThreadExecutorFactory {
/**
* 获取 ThreadPoolExecutor
*
* @param corePoolSize 核心线程数
* @param maximumPoolSize 最大线程数
* @param keepAliveTime 超过核心线程数的线程存活时间
* @param timeUnit 超过核心线程数的线程存活时间单位
* @param threadNamePrefix 创建线程的名称前缀
* @param blockQueueCapacity 线程池阻塞队列的容量
* @return ThreadPoolExecutor
* @see ThreadPoolExecutor
*/
public static ThreadPoolExecutor getThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit timeUnit,
String threadNamePrefix,
int blockQueueCapacity) {
// 参数校验
if (corePoolSize <= 0) {
throw new IllegalArgumentException("corePoolSize 必须大于0");
}
if (maximumPoolSize <= 0) {
throw new IllegalArgumentException("maximumPoolSize 必须大于0");
}
if (keepAliveTime < 0) {
throw new IllegalArgumentException("keepAliveTime 必须大于0");
}
if (null == timeUnit) {
throw new IllegalArgumentException("timeUnit 不能为空");
}
if (corePoolSize > maximumPoolSize) {
throw new IllegalArgumentException("corePoolSize 必须小于等于 maximumPoolSize");
}
if (StrUtil.isBlank(threadNamePrefix)) {
throw new IllegalArgumentException("threadNamePrefix 不能为空");
}
if (blockQueueCapacity <= 0) {
throw new IllegalArgumentException("blockQueueCapacity 必须大于0");
}
return new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
timeUnit,
new ArrayBlockingQueue<>(blockQueueCapacity),
threadFactory(threadNamePrefix)
);
}
/**
* 初始化 ThreadFactory 并为 创建的 线程 命名
*
* @param namePrefix 线程名称前缀
* @return ThreadFactory
*/
private static ThreadFactory threadFactory(String namePrefix) {
return (runnable -> {
Thread thread = new Thread(runnable);
long id = thread.getId();
thread.setName(namePrefix + "-" + id);
return thread;
});
}
/**
* 获取一个 <code>ThreadPoolExecutor</code>
* <p>
* coreSize: 10
* <br/>
* maximumPoolSize: 50
* <br/>
* KeepAliveTime: 10
* <br/>
* unit: TimeUnit.MILLISECONDS
* <br/>
* blockQueueCapacity: 100
* <br/>
* </p>
*
* @param threadNamePrefix <code>threadNamePrefix</code>
* @return ThreadPoolExecutor
*/
public static ThreadPoolExecutor getThreadPoolExecutor(final String threadNamePrefix) {
return ThreadExecutorFactory.getThreadPoolExecutor(10,
50,
10,
TimeUnit.MILLISECONDS,
threadNamePrefix,
100);
}
}
package cn.dankal.test.boot.handler;
import cn.dankal.test.boot.common.Common;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;
/**
* {@link ConsumerAwareListenerErrorHandler}
*
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-29
* @see SendTo
*/
@Slf4j
@Component
@ConditionalOnBean(name = {"testErrorHandler"})
public class KafkaStringMessageListenerErrorHandler implements ConsumerAwareListenerErrorHandler {
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
Throwable exceptionCause = exception.getCause();
String errorMessage = exception.getMessage();
if (null != exceptionCause) {
errorMessage = exceptionCause.getMessage();
}
// TODO 消息信息获取
// TODO org.springframework.kafka.support.KafkaHeaders
MessageHeaders messageHeaders = message.getHeaders();
// doc: The header containing the topic from which the message was received.
String receivedTopic = messageHeaders.get(KafkaHeaders.RECEIVED_TOPIC, String.class);
Integer partitionId = messageHeaders.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class);
Long offset = messageHeaders.get(KafkaHeaders.OFFSET, Long.class);
if (log.isDebugEnabled()) {
log.debug(
"线程 id: {} receivedTopic: {} partitionId: {} offset: {}",
Common.getSpringKafkaConsumerName(Thread.currentThread().getName()),
receivedTopic,
partitionId,
offset
);
}
// TODO 处理异常消息,做一些补偿操作
// TODO 诸如 使用 Consumer#seek(TopicPartition partition, long offset) 重置 偏移量
// TODO 让消费者重新消费
if (log.isInfoEnabled()) {
log.info(
"{} 处理字符串消息: {} 出现异常:{} ",
Thread.currentThread().getName(),
message.getPayload(),
errorMessage);
}
// TODO 因为测试用例 使用 JsonSerializer 处理的数据 无法实现 使用 String 消费
// 异常摘要: Cannot convert from [cn.dankal.test.boot.entity.DankalSimpleMessage] to [java.lang.String]
// 故在此出选择提交当前消息的偏移量 提交避免重复通知
consumer.commitSync();
// @SendTo
// 此方法返回将会被 SendTo 注解作为消息发送到指定主题
return "出现异常啦: " + errorMessage + " !!!";
}
}
package cn.dankal.test.boot.listener;
import cn.dankal.test.boot.common.Common;
import cn.dankal.test.boot.common.Const;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.List;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-29
*/
@Slf4j
@Component
@ConditionalOnProperty(prefix = "spring.kafka",
name = {"listener.type"}, havingValue = "batch")
public class BatchStringKafkaMessageListener {
@KafkaListener(
topics = {Const.Kafka.Topic.TOPIC_SIMPLE_STRING_MESSAGE},
groupId = Const.Kafka.ConsumerGroupId.GROUP_ID_SIMPLE_STRING_CONSUMER
)
public void batchHandlerStringMessage(List<String> messageList, Acknowledgment acknowledgment) {
if (log.isDebugEnabled()) {
log.debug(
"{} 成功获取到 {} 条数据",
Const.FULL_DATE_TIME_FORMATTER.format(LocalDateTime.now()),
messageList.size()
);
}
for (String message : messageList) {
if (log.isInfoEnabled()) {
log.info(
"{} {} 成功消费消息: {}",
Const.FULL_DATE_TIME_FORMATTER.format(LocalDateTime.now()),
Common.getSpringKafkaConsumerName(Thread.currentThread().getName()),
message);
}
}
acknowledgment.acknowledge();
}
}
package cn.dankal.test.boot.listener;
import cn.dankal.test.boot.common.Common;
import cn.dankal.test.boot.common.Const;
import cn.dankal.test.boot.entity.DankalSimpleMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-21
*/
@Slf4j
@Component
@ConditionalOnMissingBean(name = {"testErrorHandler"})
public class KafkaDankalMessageListener {
@KafkaListener(
topics = {Const.Kafka.Topic.TOPIC_HELLO_SPRING_KAFKA},
groupId = Const.Kafka.ConsumerGroupId.GROUP_ID_HELLO_SPRING_KAFKA_CONSUMER
)
public void dankalMessageListener(DankalSimpleMessage dankalSimpleMessage,
Acknowledgment acknowledgment) {
if (log.isInfoEnabled()) {
log.info("{} 执行线程: {} 消费消息 ==> 主题: {}",
Const.FULL_DATE_TIME_FORMATTER.format(LocalDateTime.now()),
Common.getSpringKafkaConsumerName(Thread.currentThread().getName()),
dankalSimpleMessage);
}
acknowledgment.acknowledge();
}
}
package cn.dankal.test.boot.listener;
import cn.dankal.test.boot.common.Common;
import cn.dankal.test.boot.common.Const;
import cn.dankal.test.boot.handler.KafkaStringMessageListenerErrorHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.SendTo;
import java.time.LocalDateTime;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-29
*/
@Slf4j
//@Component("testErrorHandler")
public class KafkaListenerErrorHandlerTestListener {
/**
* <pre>
* 此监听器用来测试 spring kafka 对 消息消费异常的处理机制
* 此监听器接收的 String 类型的 message,但是其订阅 主题 所传递的消息是: DankalMessage
* 故将会抛出异常,以达到测试 errorHandler 的作用
* </pre>
*
* @param message 需要消息消息
* @param acknowledgment 消息确认器
* @return 异常消息
*/
@KafkaListener(
topics = {Const.Kafka.Topic.TOPIC_HELLO_SPRING_KAFKA},
groupId = Const.Kafka.ConsumerGroupId.GROUP_ID_TEST_STRING_MESSAGE_HANDLER_ERROR_CONSUMER,
errorHandler = "kafkaStringMessageListenerErrorHandler"
)
@SendTo({Const.Kafka.Topic.TOPIC_HANDLER_STRING_MESSAGE_ERROR_MESSAGE})
public String testStringMessageHandlerError(String message, Acknowledgment acknowledgment) {
if (log.isInfoEnabled()) {
log.info("{} 执行线程: {} 消费消息 ==> 主题: {}",
Const.FULL_DATE_TIME_FORMATTER.format(LocalDateTime.now()),
Common.getSpringKafkaConsumerName(Thread.currentThread().getName()),
message);
}
acknowledgment.acknowledge();
return "handler message: " + message + " success";
}
/**
* <pre>
* 此监听器 是用来处理 {@link #testStringMessageHandlerError(String, Acknowledgment)} 消费消息时出现异常时,
* {@link KafkaStringMessageListenerErrorHandler} 处理方法的方法返回值.
* </pre>
*
* @param message 异常处理方法返回值
* @param acknowledgment 消息确认器
*/
@KafkaListener(
topics = {Const.Kafka.Topic.TOPIC_HANDLER_STRING_MESSAGE_ERROR_MESSAGE},
groupId = Const.Kafka.ConsumerGroupId.GROUP_ID_TEST_STRING_MESSAGE_HANDLER_ERROR_CONSUMER
)
public void handlerStringMessageError(String message, Acknowledgment acknowledgment) {
if (log.isInfoEnabled()) {
log.info("handlerStringMessageError ==> {} 执行线程: {} 处理异常信息 ==> 异常消息: {}",
Const.FULL_DATE_TIME_FORMATTER.format(LocalDateTime.now()),
Common.getSpringKafkaConsumerName(Thread.currentThread().getName()),
message);
}
acknowledgment.acknowledge();
}
}
package cn.dankal.test.boot.listener;
import cn.dankal.test.boot.common.Common;
import cn.dankal.test.boot.common.Const;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-29
*/
@Slf4j
@Component("test-retry")
public class SpringKafkaRetryTestListener {
@KafkaListener(
topics = {Const.Kafka.Topic.TOPIC_SIMPLE_STRING_MESSAGE},
groupId = Const.Kafka.ConsumerGroupId.GROUP_ID_SIMPLE_STRING_CONSUMER
)
public void stringMessageListener(String message,
Acknowledgment acknowledgment) {
if (log.isInfoEnabled()) {
log.info("{} 执行线程: {} 消费消息 ==> 消息: {}",
Const.FULL_DATE_TIME_FORMATTER.format(LocalDateTime.now()),
Common.getSpringKafkaConsumerName(Thread.currentThread().getName()),
message);
}
throw new RuntimeException("throw an exception when consumer the message~");
//acknowledgment.acknowledge();
}
}
package cn.dankal.test.listener;
package cn.dankal.test.boot.listener;
import cn.dankal.test.common.Const;
import cn.dankal.test.boot.common.Common;
import cn.dankal.test.boot.common.Const;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-21
* @create 2021-01-28
*/
@Slf4j
@Component
public class KafkaMessageListener {
@ConditionalOnMissingBean(name = {"testErrorHandler","test-retry"})
@ConditionalOnProperty(prefix = "spring.kafka",
name = {"listener.type"},havingValue = "single")
public class StringKafkaMessageListener {
@KafkaListener(
topics = {Const.Kafka.Topic.TOPIC_SIMPLE_STRING_MESSAGE},
groupId = Const.Kafka.ConsumerGroupId.GROUP_ID_SIMPLE_STRING_CONSUMER
)
public void stringMessageListener(ConsumerRecord<?, ?> data,
Acknowledgment acknowledgment,
Consumer<?, ?> consumer) {
public void stringMessageListener(String message,
Acknowledgment acknowledgment) {
if (log.isInfoEnabled()) {
log.info("{} 消费消息 ==> 主题: {} 分区: {} 偏移量: {} key: {} value: {}",
Thread.currentThread().getName(),
data.topic(),
data.partition(),
data.offset(),
data.key(),
data.value());
log.info("{} 执行线程: {} 消费消息 ==> 主题: {}",
Const.FULL_DATE_TIME_FORMATTER.format(LocalDateTime.now()),
Common.getSpringKafkaConsumerName(Thread.currentThread().getName()),
message);
}
acknowledgment.acknowledge();
}
......
package cn.dankal.test.boot.service;
import cn.dankal.test.boot.entity.DankalSimpleMessage;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.concurrent.ExecutionException;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-21
*/
public interface KafkaDankalMessageService {
/**
* 发送 Kafka 消息: 发送并忘记
*
* @param topic 目标 topic
* @param key 消息 key
* @param dankalSimpleMessage 需要发送的消息
* @return 是否执行成功
*/
boolean sendMessageBySimple(String topic, String key, DankalSimpleMessage dankalSimpleMessage);
/**
* 同步发送 Kafka 消息
*
* @param topic 目标 topic
* @param key 消息 key
* @param dankalSimpleMessage 需要发送的消息
* @return 是否执行成功
* @throws ExecutionException @see {@link ListenableFuture#get()}
* @throws InterruptedException @see {@link ListenableFuture#get()}
*/
boolean sendMessageBySync(String topic, String key, DankalSimpleMessage dankalSimpleMessage) throws ExecutionException, InterruptedException;
/**
* 异步发送 Kafka 消息
*
* @param topic 目标 topic
* @param key 消息 key
* @param dankalSimpleMessage 需要发送的消息
* @return 是否执行成功过
*/
boolean sendMessageByAsync(String topic, String key, DankalSimpleMessage dankalSimpleMessage);
}
package cn.dankal.test.service;
package cn.dankal.test.boot.service;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-21
* @create 2021-01-28
*/
public interface KafkaMessageService {
public interface KafkaStringMessageService {
/**
* send message to assign topic
* 测试 发送 string 类型的 message
*
* @param topic 目标 topic
* @param key 消息 key
* @param key 消息 key
* @param message 需要发送的消息
* @return 是否执行成功
* @return 是否发送成功
*/
boolean sendStringMessage(String topic,String key, String message);
boolean sendStringMessage(String topic, String key, String message);
}
package cn.dankal.test.boot.service.impl;
import cn.dankal.test.boot.component.AsyncSendMessageFailureCallback;
import cn.dankal.test.boot.component.DankalMessageSuccessCallback;
import cn.dankal.test.boot.entity.DankalSimpleMessage;
import cn.dankal.test.boot.service.KafkaDankalMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import java.util.concurrent.ExecutionException;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-21
*/
@Slf4j
@Service
public class KafkaDankalMessageServiceImpl implements KafkaDankalMessageService {
/**
* KafkaTemplate<String, DankalSimpleMessage>
*/
private final KafkaTemplate<String, DankalSimpleMessage> kafkaTemplate;
/**
* DankalMessageSuccessCallback
*/
private final DankalMessageSuccessCallback dankalMessageSuccessCallback;
/**
* DankalMessageFailureCallback
*/
private final AsyncSendMessageFailureCallback asyncSendMessageFailureCallback;
public KafkaDankalMessageServiceImpl(KafkaTemplate<String, DankalSimpleMessage> kafkaTemplate, DankalMessageSuccessCallback dankalMessageSuccessCallback, AsyncSendMessageFailureCallback asyncSendMessageFailureCallback) {
this.kafkaTemplate = kafkaTemplate;
this.dankalMessageSuccessCallback = dankalMessageSuccessCallback;
this.asyncSendMessageFailureCallback = asyncSendMessageFailureCallback;
}
@Override
public boolean sendMessageBySimple(String topic, String key, DankalSimpleMessage dankalSimpleMessage) {
// 发送并忘记
kafkaTemplate.send(topic, key, dankalSimpleMessage);
return true;
}
@Override
public boolean sendMessageBySync(String topic, String key, DankalSimpleMessage dankalSimpleMessage) throws ExecutionException, InterruptedException {
// 同步发送
SendResult<String, DankalSimpleMessage> sendResult = kafkaTemplate.send(topic, key, dankalSimpleMessage).get();
return sendResult.getRecordMetadata().offset() > 0;
}
@Override
public boolean sendMessageByAsync(String topic, String key, DankalSimpleMessage dankalSimpleMessage) {
// 异步发送
kafkaTemplate
.send(topic, key, dankalSimpleMessage)
// 添加异步回调处理
.addCallback(dankalMessageSuccessCallback, asyncSendMessageFailureCallback);
return true;
}
}
package cn.dankal.test.boot.service.impl;
import cn.dankal.test.boot.service.KafkaStringMessageService;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-28
*/
@Service
public class KafkaStringMessageServiceImpl implements KafkaStringMessageService {
/**
* KafkaTemplate<String, String>
*/
private final KafkaTemplate<String, String> stringKafkaTemplate;
public KafkaStringMessageServiceImpl(KafkaTemplate<String, String> stringKafkaTemplate) {
this.stringKafkaTemplate = stringKafkaTemplate;
}
@Override
public boolean sendStringMessage(String topic, String key, String message) {
stringKafkaTemplate.send(topic, key, message);
return true;
}
}
server:
port: 9001
spring:
kafka:
# kafka集群数量
# 用于配置 replicationFactor : 显示创建 topic 时,设置的 partition 副本基数
broker-count: 1
# Kafka 集群信息
bootstrap-servers:
- 192.168.128.3:9092
# - 192.168.128.3:9093
# - 192.168.128.3:9094
listener:
# 消费者确认模式: 手动
ack-mode: manual_immediate
# 开启多少个消费者线程
concurrency: 3
# 一次确认消费的数量
ack-count: 1
type: single
ack-time: 60
template:
# 默认 topic 名称
default-topic: dankal-test-default-topic
consumer:
# 采用手动提交
enable-auto-commit: false
# 消息使用 Spring 提供的 JSON 发序列化器
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
# 设置为最初的消费进度
# 其取值具体区别可以参阅
# https://blog.csdn.net/lishuangzhe7047/article/details/74530417
auto-offset-reset: earliest
properties:
# 默认信任 java.util,java.lang 包下面的类
# 因此我们需要添加 spring json trusted 配置
spring:
json:
trusted:
packages: cn.dankal.test.boot.entity
## 一下配置用于测试 spring-kafka 的批量消费
# fetch-min-size: 16384
# max-poll-records: 3
# fetch-max-wait: 500
producer:
# 发送失败,重试次数
retries: 5
# 只需要 leader 应答即可
acks: 1
# 消息内容使用 Spring 提供的 JSON 序列化器
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
## 以下配置用于测试 spring-kafka 的批量发送消息
# batch-size: 16384
# buffer-memory: 33554432
# properties:
# 超过收集的时间的最大等待时长
# linger:
# ms: 30000
logging:
level:
cn:
dankal:
test: debug
# 日志太多了,方便观察测试日志 --> 先调高日志级别 ==> error
org:
apache:
kafka: error
springframework:
kafka: error
description 'dankal-test-kafka-spring-cloud-consumer'
version '1.0.0.RELEASE'
dependencies {
// spring boot dependence
implementation 'org.springframework.boot:spring-boot-starter-web'
// spring cloud dependence
implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka'
}
\ No newline at end of file
package cn.dankal.test.cloud.consumer;
import cn.dankal.test.cloud.consumer.input.DankalMessageInput;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-02-01
*/
@SpringBootApplication
@EnableBinding({DankalMessageInput.class})
public class SpringCloudStreamKafkaConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(SpringCloudStreamKafkaConsumerApplication.class, args);
}
}
package cn.dankal.test.cloud.consumer.common;
import cn.hutool.core.util.StrUtil;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-28
*/
public class Common {
/**
* 获取 Spring Kafka Consumer 线程名称
*
* @param rowThreadName 未处理的 thread name
* @return 处理后的线程名称
*/
public static String getSpringKafkaConsumerName(String rowThreadName) {
if (StrUtil.isBlank(rowThreadName)) {
return rowThreadName;
}
return rowThreadName.split("container-")[1];
}
}
package cn.dankal.test.cloud.consumer.common;
import java.time.format.DateTimeFormatter;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-21
*/
public interface Const {
/**
* random base string
*/
String RANDOM_BASE_STRING = "zxcvbnmasdfghjklqwertyuiopQAZWSXEDCRFVTGBYHNIUJKLMOP1234567890-=[],.;'";
/**
* format pattern: yyyy-MM-dd HH:mm:ss.SSS
*/
DateTimeFormatter FULL_DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
/**
* kafka 相关常量
*/
interface Kafka {
/**
* Kafka Consumer group id
*/
interface ConsumerGroupId {
/**
* simple-string-consumer
*/
String GROUP_ID_SIMPLE_STRING_CONSUMER = "simple-string-consumer";
/**
* hello-spring-kafka-group
*/
String GROUP_ID_HELLO_SPRING_KAFKA_CONSUMER = "hello-spring-kafka-group";
/**
* test-string-message-handler-error
*/
String GROUP_ID_TEST_STRING_MESSAGE_HANDLER_ERROR_CONSUMER = "test-string-message-handler-error";
}
/**
* Kafka Topic
*/
interface Topic {
/**
* simple-string-message
* <p>
* 简单 String message 主题
* </p>
*/
String TOPIC_SIMPLE_STRING_MESSAGE = "simple-string-message";
/**
* hello-spring-kafka
* <p>
* 测试 spring kafka 的主题
* </p>
*/
String TOPIC_HELLO_SPRING_KAFKA = "hello-spring-kafka";
/**
* handler-string-message-error-message
* <p>
* 存放 出现 string message 异常信息的主题
* </p>
*/
String TOPIC_HANDLER_STRING_MESSAGE_ERROR_MESSAGE = "handler-string-message-error-message";
}
}
}
package cn.dankal.test.cloud.consumer.consumer;
import cn.dankal.test.cloud.consumer.common.Common;
import cn.dankal.test.cloud.consumer.common.Const;
import cn.dankal.test.cloud.consumer.entity.DankalSimpleMessage;
import cn.dankal.test.cloud.consumer.input.DankalMessageInput;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-02-04
*/
@Slf4j
@Component
public class ConditionDankalMessageConsumer {
/**
* 此消费者只消费 message header 带有: messageTag 并且其值为: dankal-condition
* @param dankalSimpleMessage
*/
@StreamListener(value = DankalMessageInput.INPUT_NAME,
condition = "headers['messageTag']=='dankal-condition'")
public void onMessage(@Payload DankalSimpleMessage dankalSimpleMessage) {
log.info(
"ConditionDankalMessageConsumer: {} {} 成功消费: {}",
Const.FULL_DATE_TIME_FORMATTER.format(LocalDateTime.now()),
Common.getSpringKafkaConsumerName(Thread.currentThread().getName()),
dankalSimpleMessage
);
}
}
package cn.dankal.test.cloud.consumer.consumer;
import cn.dankal.test.cloud.consumer.common.Common;
import cn.dankal.test.cloud.consumer.common.Const;
import cn.dankal.test.cloud.consumer.entity.DankalSimpleMessage;
import cn.dankal.test.cloud.consumer.input.DankalMessageInput;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.Objects;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-02-01
*/
@Slf4j
@Component
public class DankalMessageConsumer {
@StreamListener(DankalMessageInput.INPUT_NAME)
public void onMessage(
@Payload DankalSimpleMessage dankalSimpleMessage,
@Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {
log.info(
"DankalMessageConsumer: {} {} 成功消费: {}",
Const.FULL_DATE_TIME_FORMATTER.format(LocalDateTime.now()),
Common.getSpringKafkaConsumerName(Thread.currentThread().getName()),
dankalSimpleMessage
);
if (dankalSimpleMessage.getId() % 2 == 0) {
acknowledgment.acknowledge();
log.info("{} 消费确认", dankalSimpleMessage.getId());
}
// 测试 spring-cloud-stream-kafka 消息重试
//throw new RuntimeException("dankal test retry consumer message");
}
/**
* 处理指定 {@link org.springframework.cloud.stream.annotation.Input} 的异常
* <pre>
* 异常 input name : Channel(<destination>.<group>.errors)
* 举个栗子:
* {@link DankalMessageInput#dankalMessageInputChannel()}
* destination: cloud-kafka-dankal-message-topic
* group: cloud-kafka-dankal-message-consumer-group
* 则 此 input 对应消费异常管道为:
* cloud-kafka-dankal-message-topic.cloud-kafka-dankal-message-consumer-group.errors
*
* </pre>
*/
@ServiceActivator(inputChannel = "cloud-kafka-dankal-message-topic.cloud-kafka-dankal-message-consumer-group.errors")
public void handlerDankalMessageInputError(ErrorMessage errorMessage) {
log.info("current error is handler to DankalMessageInput ");
log.error("payload error message: {}", errorMessage.getPayload().getMessage());
Message<?> originalMessage = errorMessage.getOriginalMessage();
if (Objects.nonNull(originalMessage)) {
log.error("origin payload message:{}", originalMessage.getPayload());
}
log.error("error headers:{}", errorMessage.getHeaders());
}
/**
* input 全局异常异常处理
*/
@StreamListener(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
public void globalInputErrorHandler(ErrorMessage errorMessage) {
log.info("current error is input error handler ");
log.error("payload error message: {}", errorMessage.getPayload().getMessage());
Message<?> originalMessage = errorMessage.getOriginalMessage();
if (Objects.nonNull(originalMessage)) {
log.error("origin payload message:{}", originalMessage.getPayload());
}
log.error("error headers:{}", errorMessage.getHeaders());
}
}
package cn.dankal.test.cloud.consumer.entity;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import lombok.Data;
import lombok.experimental.Accessors;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.io.Serializable;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Date;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-28
*/
@Data
@Accessors(chain = true)
public class DankalSimpleMessage implements Serializable {
private static final long serialVersionUID = -81606093254258919L;
/**
* message is
*/
private Integer id;
/**
* message key
*/
private String key;
/**
* message value
*/
private String content;
/**
* message send time
* <pre>
* 因为 {@link JsonSerializer} 默认使用支持 {@link Date} 的序列化和反序列化对于Java8提供的诸如
* {@link LocalDateTime}
* {@link LocalDate}
* {@link LocalTime}
* 新的时间API需要通过注解指定相关序列化和反序列化器
* </pre>
*/
@JsonSerialize(using = LocalDateTimeSerializer.class)
@JsonDeserialize(using = LocalDateTimeDeserializer.class)
private LocalDateTime sendTime;
}
package cn.dankal.test.cloud.consumer.input;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-02-01
*/
public interface DankalMessageInput {
/***
* input name
*/
String INPUT_NAME = "kafka-dankal-message-input";
/**
* 订阅 Kafka 管道消息
*
* @return SubscribableChannel
*/
@Input(INPUT_NAME)
SubscribableChannel dankalMessageInputChannel();
}
server:
port: 9003
spring:
cloud:
stream:
bindings:
# 需要对应 @Input 注解
kafka-dankal-message-input:
# 消费来源
destination: cloud-kafka-dankal-message-topic
# 消息内容格式
content-type: application/json
# 消费者组
group: cloud-kafka-dankal-message-consumer-group
# 消费者配置
# org.springframework.cloud.stream.binder.ConsumerProperties
consumer:
# 开始多个线程(一个线程一个消费者)
concurrency: 3
# 重试次数 default:3
maxAttempts: 4
# 重试时间间隔的初始值 default: 1000 单位: millisecond
backOffInitialInterval: 3000
# 重试间隔递增系数: default: 2.0
backOffMultiplier: 2.0
# 重试时间间隔的最大值: default 10000 单位: millisecond
backOffMaxInterval: 10000
kafka:
binder:
# kafka broker 地址,多地址逗号分隔
brokers: 192.168.128.3:9092,192.168.128.3:9093,192.168.128.3:9094
bindings:
kafka-dankal-message-input:
consumer:
# 开启死信队列
# default: false
enableDlq: true
# 死信队列名称: 默认为 error.{topicName}
#dlqName:
# 是否取消自动提交
# default: true
autoCommitOffset: false
# 是否每消费一条消息 提交一次偏移量
# default: false 批量提交
# 即 全部 消费完 单次 poll 的消息后,一次性提交偏移量
ackEachRecord: true
#日志级别控制
logging:
level:
org:
apache:
kafka: error
spring:
application:
name: dankal-test-cloud-stream-kafka-consumer
profiles:
active: native
\ No newline at end of file
description 'dankal-test-kafka-spring-cloud-producer'
version '1.0.0.RELEASE'
dependencies {
// spring boot dependence
implementation 'org.springframework.boot:spring-boot-starter-web'
// spring cloud dependence
implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka'
}
\ No newline at end of file
package cn.dankal.test.cloud.producer;
import cn.dankal.test.cloud.producer.output.KafkaDankalMessageOutput;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-30
*/
@SpringBootApplication
@EnableBinding({KafkaDankalMessageOutput.class})
public class SpringCloudStreamKafkaProducerApplication {
public static void main(String[] args) {
SpringApplication.run(SpringCloudStreamKafkaProducerApplication.class, args);
}
}
package cn.dankal.test.cloud.producer.common;
import cn.hutool.core.util.StrUtil;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-28
*/
public class Common {
/**
* 获取 Spring Kafka Consumer 线程名称
*
* @param rowThreadName 未处理的 thread name
* @return 处理后的线程名称
*/
public static String getSpringKafkaConsumerName(String rowThreadName) {
if (StrUtil.isBlank(rowThreadName)) {
return rowThreadName;
}
return rowThreadName.split("#")[1];
}
}
package cn.dankal.test.cloud.producer.common;
import java.time.format.DateTimeFormatter;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-21
*/
public interface Const {
/**
* random base string
*/
String RANDOM_BASE_STRING = "zxcvbnmasdfghjklqwertyuiopQAZWSXEDCRFVTGBYHNIUJKLMOP1234567890-=[],.;'";
/**
* format pattern: yyyy-MM-dd HH:mm:ss.SSS
*/
DateTimeFormatter FULL_DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
}
package cn.dankal.test.cloud.producer.controller;
import cn.dankal.test.cloud.producer.entity.DankalSimpleMessage;
import cn.dankal.test.cloud.producer.output.KafkaDankalMessageOutput;
import cn.hutool.core.util.RandomUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-02-01
*/
@Slf4j
@RestController
public class SendMessageController {
/**
* KafkaDankalMessageOutput
*/
private final KafkaDankalMessageOutput kafkaDankalMessageOutput;
public SendMessageController(KafkaDankalMessageOutput kafkaDankalMessageOutput) {
this.kafkaDankalMessageOutput = kafkaDankalMessageOutput;
}
@GetMapping("/json/{count}")
public String sendAssignCountMessage(@PathVariable("count") Integer count) {
if (count <= 0) {
count = 10;
}
for (int i = 0; i < count; i++) {
final String key = "dankal-cloud-kafka-" + RandomUtil.randomInt(100, 100000);
final DankalSimpleMessage dankalSimpleMessage = new DankalSimpleMessage()
.setId(RandomUtil.randomInt(10, 1000000))
.setContent(RandomUtil.randomString(15))
.setSendTime(LocalDateTime.now())
.setKey(key);
Message<DankalSimpleMessage> dankalSimpleMessageMessage = MessageBuilder.withPayload(dankalSimpleMessage).build();
kafkaDankalMessageOutput.kafkaDankalMessageOutput()
.send(dankalSimpleMessageMessage);
}
return String.valueOf(count);
}
@GetMapping("/condition/{count}")
public String sendAssignCountConditionMessage(@PathVariable("count") Integer count) {
if (count <= 0) {
count = 10;
}
int setHeaderTagCount = 0;
for (int i = 0; i < count; i++) {
final String key = "dankal-cloud-kafka-" + RandomUtil.randomInt(100, 100000);
final DankalSimpleMessage dankalSimpleMessage = new DankalSimpleMessage()
.setId(RandomUtil.randomInt(10, 1000000))
.setContent(RandomUtil.randomString(15))
.setSendTime(LocalDateTime.now())
.setKey(key);
MessageBuilder<DankalSimpleMessage> messageBuilder = MessageBuilder
.withPayload(dankalSimpleMessage);
if (RandomUtil.randomBoolean()) {
// 随机添加 message header tag
messageBuilder.setHeader("messageTag", "dankal-condition");
setHeaderTagCount++;
}
Message<DankalSimpleMessage> dankalSimpleMessageMessage = messageBuilder
.build();
kafkaDankalMessageOutput.kafkaDankalMessageOutput()
.send(dankalSimpleMessageMessage);
}
log.info("共发送 {} 条消息,其中 {} 条 带有 header tag", count, setHeaderTagCount);
return String.valueOf(count);
}
}
package cn.dankal.test.cloud.producer.entity;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import lombok.Data;
import lombok.experimental.Accessors;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.io.Serializable;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Date;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-28
*/
@Data
@Accessors(chain = true)
public class DankalSimpleMessage implements Serializable {
private static final long serialVersionUID = -2297003238834215250L;
/**
* message is
*/
private Integer id;
/**
* message key
*/
private String key;
/**
* message value
*/
private String content;
/**
* message send time
* <pre>
* 因为 {@link JsonSerializer} 默认使用支持 {@link Date} 的序列化和反序列化对于Java8提供的诸如
* {@link LocalDateTime}
* {@link LocalDate}
* {@link LocalTime}
* 新的时间API需要通过注解指定相关序列化和反序列化器
* </pre>
*/
@JsonSerialize(using = LocalDateTimeSerializer.class)
@JsonDeserialize(using = LocalDateTimeDeserializer.class)
private LocalDateTime sendTime;
}
package cn.dankal.test.cloud.producer.output;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-02-01
*/
public interface KafkaDankalMessageOutput {
/**
* spring-cloud-stream-kafka: 测试 JSON 消息
*
* @return MessageChannel
*/
@Output("kafka-dankal-message-output")
MessageChannel kafkaDankalMessageOutput();
}
server:
port: 9002
spring:
cloud:
stream:
# Bindings 配置项
# org.springframework.cloud.stream.config.BinderProperties
bindings:
# 消息生产管道名称
# 需要对应 @Output 注解
# cn.dankal.test.cloud.producer.output.KafkaDankalMessageOutput
kafka-dankal-message-output:
# 消息目的地
destination: cloud-kafka-dankal-message-topic
# 消息内容格式: json
content-type: application/json
# spring cloud stream kafka 配置项
kafka:
# org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties
binder:
# kafka broker 地址,多地址逗号分隔
brokers: 192.168.128.3:9092,192.168.128.3:9093,192.168.128.3:9094
bindings:
# 消息生产管道名称
# 需要对应 @Output 注解
# cn.dankal.test.cloud.producer.output.KafkaDankalMessageOutput
kafka-dankal-message-output:
# kafka producer 配置项
# org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties
producer:
# 是否同步发送消息
# false: 采用异步的形式发送消息
sync: true
# 日志输出级别控制
logging:
level:
org:
apache:
kafka: error
spring:
application:
name: dankal-test-cloud-stream-kafka-producer
profiles:
active: native
\ No newline at end of file
package cn.dankal.test.commandline;
import cn.dankal.test.common.Const;
import cn.dankal.test.service.KafkaMessageService;
import cn.hutool.core.util.RandomUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-22
*/
@Slf4j
@Component
public class KafkaMessageSendCommandLineRunner implements CommandLineRunner {
/**
* KafkaMessageService
*/
private final KafkaMessageService kafkaMessageService;
public KafkaMessageSendCommandLineRunner(KafkaMessageService kafkaMessageService) {
this.kafkaMessageService = kafkaMessageService;
}
@Override
public void run(String... args) throws Exception {
while (true) {
final String topic = Const.Kafka.Topic.TOPIC_SIMPLE_STRING_MESSAGE;
final String key = "dankal-test-" + RandomUtil.randomInt(100, 100000000);
final String message = "蛋壳创意科技-" + RandomUtil.randomString(Const.RANDOM_BASE_STRING, 1000);
TimeUnit.MILLISECONDS.sleep(230);
boolean sendSuccess = kafkaMessageService.sendStringMessage(topic, key, message);
if (sendSuccess) {
System.out.println("消息发送成功~");
if (log.isDebugEnabled()) {
log.debug("成功发送消息: key: {} value: {}", key, message);
}
}
}
}
}
package cn.dankal.test.service.impl;
import cn.dankal.test.service.KafkaMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Repository;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-21
*/
@Slf4j
@Repository
public class KafkaMessageServiceImpl implements KafkaMessageService {
/**
* kafkaTemplate
*/
private final KafkaTemplate<String, String> kafkaTemplate;
public KafkaMessageServiceImpl(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@Override
public boolean sendStringMessage(String topic, String key, String message) {
kafkaTemplate.send(topic, key, message)
.addCallback(System.out::println, Throwable::printStackTrace);
return true;
}
}
server:
port: 9001
spring:
kafka:
bootstrap-servers:
- 192.168.128.3:9092
- 192.168.128.3:9093
- 192.168.128.3:9094
producer:
retries: 5
listener:
ack-mode: manual_immediate
concurrency: 3
ack-count: 3
type: single
ack-time: 60
template:
default-topic: dankal-test-default-topic
consumer:
enable-auto-commit: false
logging:
level:
cn.dankal.test: info
\ No newline at end of file
package cn.dankal.test.service;
import cn.dankal.test.KafkaTestApplication;
import cn.dankal.test.common.Const;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-21
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {KafkaTestApplication.class})
public class KafkaMessageServiceTest {
/**
* KafkaMessageService
*/
@Autowired
private KafkaMessageService kafkaMessageService;
/**
* 测试 发送 string 类型的 Kafka 消息
*/
@Test
public void testSendStringMessageToKafka() {
final String topic = Const.Kafka.Topic.TOPIC_SIMPLE_STRING_MESSAGE;
final String key = "dankal-test";
final String message = "Hello,Kafka~(蛋壳创意科技)";
boolean sendSuccess = kafkaMessageService.sendStringMessage(topic, key, message);
if (sendSuccess) {
System.out.println("消息发送成功");
}
}
}
......@@ -12,5 +12,7 @@ pluginManagement {
}
rootProject.name = 'dankal-test-kafka'
include 'dankal-test-kafka-protogenesis'
include 'dankal-test-kafka-spring'
include 'dankal-test-kafka-spring-boot'
include 'dankal-test-kafka-spring-cloud-producer'
include 'dankal-test-kafka-spring-cloud-consumer'
# 蛋壳创意科技-Kafka负载再均衡示例小析
# 蛋壳创意科技-Kafka负载再均衡示例小析
......@@ -41,7 +41,7 @@
```java
package cn.dankal.test.rebalance;
import cn.dankal.test.common.Const;
import cn.dankal.test.boot.common.Const;
import cn.hutool.core.collection.CollectionUtil;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
......@@ -194,7 +194,7 @@ public class RebalanceListener implements ConsumerRebalanceListener {
```java
package cn.dankal.test.rebalance;
import cn.dankal.test.common.Const;
import cn.dankal.test.boot.common.Const;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.map.MapUtil;
import org.apache.kafka.clients.consumer.*;
......@@ -355,7 +355,7 @@ public class ConsumerWorker {
```java
package cn.dankal.test.rebalance;
import cn.dankal.test.common.Const;
import cn.dankal.test.boot.common.Const;
import cn.hutool.core.util.RandomUtil;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
......
# 蛋壳创意科技-技术测试-Kafka
# 蛋壳创意科技-技术测试-Kafka
# 蛋壳创意科技-技术测试-Kafka
> company: [蛋壳创意科技](https://www.dankal.cn/)
>
> code: [dankal-test-kafka](http://git.dankal.cn/mercyModest/dankal-test-kafka.git)
>
> official website: **[Kafka](https://kafka.apache.org/)**
>
......@@ -381,7 +383,7 @@
```java
package cn.dankal.test.concurrent;
import cn.dankal.test.common.Const;
import cn.dankal.test.boot.common.Const;
import cn.dankal.test.factory.ThreadExecutorFactory;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
......@@ -695,7 +697,7 @@ public interface Partitioner extends Configurable, Closeable {
```java
package cn.dankal.test.consumergroup;
import cn.dankal.test.common.Const;
import cn.dankal.test.boot.common.Const;
import cn.dankal.test.factory.ThreadExecutorFactory;
import cn.hutool.core.collection.CollectionUtil;
import org.apache.kafka.clients.consumer.ConsumerConfig;
......@@ -1271,7 +1273,7 @@ implements org.apache.kafka.common.serialization.Deserializer
```java
package cn.dankal.test.independentconsumer;
import cn.dankal.test.common.Const;
import cn.dankal.test.boot.common.Const;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
......
# 蛋壳创意科技-技术测试-Kafka集成SpringBoot
# 蛋壳创意科技-技术测试-Kafka集成SpringBoot
> company: [蛋壳创意科技](https://www.dankal.cn/)
>
> code: [dankal-test-kafka](http://git.dankal.cn/mercyModest/dankal-test-kafka.git)
>
> spring-kafka-doc:
>
> - **html : [spring-kafka-2.2.8.RELEASE-html](https://docs.spring.io/spring-kafka/docs/2.2.8.RELEASE/reference/html/)**
> - **pdf: [spring-kafka-2.2.8.RELEASE-pdf](https://docs.spring.io/spring-kafka/docs/2.2.8.RELEASE/reference/pdf/index.pdf)**
## 测试用例依赖版本
> - ==spring-boot: 2.1.8.RELEASE==
> - spring-kafka: 2.2.8.RELEASE
> - kafka-clients: 2.0.1
>
> 其实 `spring-kafka`和`kafka-clients`的版本被统一管理了,我们只需要关注 spring boot 版本即可
>
> - jdk: 1.8.0_241
> - gradle: Gradle 6.7.1
>
> ### Kafka 服务相关
>
> - kafka server: 2.7.0 [集群:3]
> - zookeeper: 2.4.0 [集群: 3]
## spring-kafka: Hello World
具体可以参阅: [dankal-test-kafka-build.gradle](https://git.dankal.cn/mercyModest/dankal-test-kafka/blob/master/build.gradle)
### 添加依赖
```groovy
dependencies {
// spring dependence
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.kafka:spring-kafka'
testImplementation('org.springframework.boot:spring-boot-starter-test') {
exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
}
}
```
### 主配置文件: `application-native.yml`
```yml
server:
port: 9001
spring:
kafka:
# kafka集群数量
# 用于配置 replicationFactor : 显示创建 topic 时,设置的 partition 副本基数
broker-count: 3
# Kafka 集群信息
bootstrap-servers:
- 192.168.128.3:9092
# - 192.168.128.3:9093
# - 192.168.128.3:9094
listener:
# 消费者确认模式: 手动
ack-mode: manual_immediate
# 开启多少个消费者线程
concurrency: 3
# 一次确认消费的数量
ack-count: 1
type: single
ack-time: 60
template:
# 默认 topic 名称
default-topic: dankal-test-default-topic
consumer:
# 采用手动提交
enable-auto-commit: false
# 消息使用 Spring 提供的 JSON 发序列化器
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
# 设置为最初的消费进度
# 其取值具体区别可以参阅
# https://blog.csdn.net/lishuangzhe7047/article/details/74530417
auto-offset-reset: earliest
properties:
# 默认信任 java.util,java.lang 包下面的类
# 因此我们需要添加 spring json trusted 配置
spring:
json:
trusted:
packages: cn.dankal.test.boot.entity
producer:
# 发送失败,重试次数
retries: 5
# 只需要 leader 应答即可
acks: 1
# 消息内容使用 Spring 提供的 JSON 序列化器
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
logging:
level:
cn:
dankal:
test: debug
# 日志太多了,方便观察测试日志 --> 先调高日志级别 ==> error
org:
apache:
kafka: error
springframework:
kafka: error
```
### 使用配置类创建需要的 topic
```java
package cn.dankal.test.boot.config;
import cn.dankal.test.boot.common.Const;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.util.StringUtils;
import java.util.HashMap;
import java.util.Map;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-22
*/
@Configuration
public class KafkaTopicConfig {
/**
* KafkaProperties
*/
private final KafkaProperties kafkaProperties;
/**
* Kafka 集群数量
*/
@Value("${spring.kafka.broker-count}")
private Short brokerCount;
public KafkaTopicConfig(KafkaProperties kafkaProperties) {
this.kafkaProperties = kafkaProperties;
}
@Bean
public KafkaAdmin admin() {
Map<String, Object> configs = new HashMap<>(1 << 5);
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
StringUtils.arrayToCommaDelimitedString(kafkaProperties.getBootstrapServers().toArray()));
return new KafkaAdmin(configs);
}
/**
* 新建一个kafka主题: hello-spring-kafka 如果需要话
*
* @return NewTopic
*/
@Bean
public NewTopic helloSpringKafkaTopic() {
return new NewTopic(
Const.Kafka.Topic.TOPIC_HELLO_SPRING_KAFKA,
kafkaProperties.getListener().getConcurrency(),
brokerCount);
}
/**
* 新建一个kafka主题: simple-string-message 如果需要话
*
* @return NewTopic
*/
@Bean
public NewTopic simpleStringKafkaTopic() {
return new NewTopic(
Const.Kafka.Topic.TOPIC_SIMPLE_STRING_MESSAGE,
kafkaProperties.getListener().getConcurrency(),
brokerCount);
}
}
```
### 消息传递实体
```java
package cn.dankal.test.boot.entity;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import lombok.Data;
import lombok.experimental.Accessors;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.io.Serializable;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Date;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-28
*/
@Data
@Accessors(chain = true)
public class DankalSimpleMessage implements Serializable {
private static final long serialVersionUID = -81606093254258919L;
/**
* message is
*/
private Integer id;
/**
* message key
*/
private String key;
/**
* message value
*/
private String content;
/**
* message send time
* <pre>
* 因为 {@link JsonSerializer} 默认使用支持 {@link Date} 的序列化和反序列化对于Java8提供的诸如
* {@link LocalDateTime}
* {@link LocalDate}
* {@link LocalTime}
* 新的时间API需要通过注解指定相关序列化和反序列化器
* </pre>
*/
@JsonSerialize(using = LocalDateTimeSerializer.class)
@JsonDeserialize(using = LocalDateTimeDeserializer.class)
private LocalDateTime sendTime;
}
```
### 使用SpringKafka通过三种形式发送消息
#### **类的头部**
```java
/**
* KafkaTemplate<String, DankalSimpleMessage>
*/
private final KafkaTemplate<String, DankalSimpleMessage> kafkaTemplate;
/**
* DankalMessageSuccessCallback
*/
private final DankalMessageSuccessCallback dankalMessageSuccessCallback;
/**
* DankalMessageFailureCallback
*/
private final AsyncSendMessageFailureCallback asyncSendMessageFailureCallback;
public KafkaDankalMessageServiceImpl(KafkaTemplate<String, DankalSimpleMessage> kafkaTemplate, DankalMessageSuccessCallback dankalMessageSuccessCallback, AsyncSendMessageFailureCallback asyncSendMessageFailureCallback) {
this.kafkaTemplate = kafkaTemplate;
this.dankalMessageSuccessCallback = dankalMessageSuccessCallback;
this.asyncSendMessageFailureCallback = asyncSendMessageFailureCallback;
}
```
#### **发送并忘记**
```java
@Override
public boolean sendMessageBySimple(String topic, String key, DankalSimpleMessage dankalSimpleMessage) {
// 发送并忘记
kafkaTemplate.send(topic, key, dankalSimpleMessage);
return true;
}
```
#### **同步发送**
```java
@Override
public boolean sendMessageBySync(String topic, String key, DankalSimpleMessage dankalSimpleMessage) throws ExecutionException, InterruptedException {
// 同步发送
SendResult<String, DankalSimpleMessage> sendResult = kafkaTemplate.send(topic, key, dankalSimpleMessage).get();
return sendResult.getRecordMetadata().offset() > 0;
}
```
#### **异步发送**
```java
@Override
public boolean sendMessageByAsync(String topic, String key, DankalSimpleMessage dankalSimpleMessage) {
// 异步发送
kafkaTemplate
.send(topic, key, dankalSimpleMessage)
// 添加异步回调处理
.addCallback(dankalMessageSuccessCallback, asyncSendMessageFailureCallback);
return true;
}
```
**对于异步发送,我们可以设置回调函数**
##### 举个栗子
###### 成功回调
```java
package cn.dankal.test.boot.component;
import cn.dankal.test.boot.entity.DankalSimpleMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.SuccessCallback;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-28
*/
@Slf4j
@Component
public class DankalMessageSuccessCallback implements SuccessCallback<SendResult<String, DankalSimpleMessage>> {
@Override
public void onSuccess(SendResult<String, DankalSimpleMessage> result) {
if (null == result) {
if (log.isWarnEnabled()) {
log.warn("current result is null!!!");
}
return;
}
RecordMetadata recordMetadata = result.getRecordMetadata();
if (log.isDebugEnabled() && null != recordMetadata) {
log.debug(
"成功发送消息:主题: {}, 分区: {},偏移量: {} ,key: {}, value: {}",
recordMetadata.topic(),
recordMetadata.partition(),
recordMetadata.offset(),
result.getProducerRecord().key(),
result.getProducerRecord().value()
);
}
}
}
```
###### 失败回调
```java
package cn.dankal.test.boot.component;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.FailureCallback;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-28
*/
@Slf4j
@Component
public class AsyncSendMessageFailureCallback implements FailureCallback {
@Override
public void onFailure(Throwable ex) {
Throwable exCause = ex.getCause();
log.error("异步:发送消息失败 ==>异常原因:{} 异常消息: {}", exCause != null ? exCause.getMessage() : "", ex.getMessage());
}
}
```
### SpringKafka对消息的监听(消费)
> 具体的监听方法的参数形式,可以参阅文档
>
> [receiving-messages](https://docs.spring.io/spring-kafka/docs/2.2.8.RELEASE/reference/html/#receiving-messages)
>
> 传输截图
>
> ![](https://img.mercymodest.com/public/20210129100450.png)
>
> ![](https://img.mercymodest.com/public/20210129100504.png)
#### 举个栗子
```java
package cn.dankal.test.boot.listener;
import cn.dankal.test.boot.common.Common;
import cn.dankal.test.boot.common.Const;
import cn.dankal.test.boot.entity.DankalSimpleMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-21
*/
@Slf4j
@Component
public class KafkaDankalMessageListener {
@KafkaListener(
topics = {Const.Kafka.Topic.TOPIC_HELLO_SPRING_KAFKA},
groupId = Const.Kafka.ConsumerGroupId.GROUP_ID_HELLO_SPRING_KAFKA_CONSUMER
)
public void dankalMessageListener(DankalSimpleMessage dankalSimpleMessage,
Acknowledgment acknowledgment) {
if (log.isInfoEnabled()) {
log.info("{} 执行线程: {} 消费消息 ==> 主题: {}",
Const.FULL_DATE_TIME_FORMATTER.format(LocalDateTime.now()),
Common.getSpringKafkaConsumerName(Thread.currentThread().getName()),
dankalSimpleMessage);
}
acknowledgment.acknowledge();
}
}
```
### 小小节
以上呢就是Spring-Kafka 的基本使用示例啦,具体可以参阅官方文档
**传送门: [Using Spring for Apache Kafka](https://docs.spring.io/spring-kafka/docs/2.2.8.RELEASE/reference/html/#kafka)**
## spring-kafka: 消息异常处理机制示例
### 测试监听器
> ```java
> @KafkaListener(
> ... ...
> // 设置 errorHandler 的 spring bean 名称
> errorHandler = "kafkaStringMessageListenerErrorHandler"
> )
> // 将 errorHandler 的处理方法返回值发送到那个主题
> @SendTo({Const.Kafka.Topic.TOPIC_HANDLER_STRING_MESSAGE_ERROR_MESSAGE})
> public String testStringMessageHandlerError(String message, Acknowledgment acknowledgment)
> ```
```java
package cn.dankal.test.boot.listener;
import cn.dankal.test.boot.common.Common;
import cn.dankal.test.boot.common.Const;
import cn.dankal.test.boot.handler.KafkaStringMessageListenerErrorHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-29
*/
@Slf4j
@Component("testErrorHandler")
public class KafkaListenerErrorHandlerTestListener {
/**
* <pre>
* 此监听器用来测试 spring kafka 对 消息消费异常的处理机制
* 此监听器接收的 String 类型的 message,但是其订阅 主题 所传递的消息是: DankalMessage
* 故将会抛出异常,以达到测试 errorHandler 的作用
* </pre>
*
* @param message 需要消息消息
* @param acknowledgment 消息确认器
* @return 异常消息
*/
@KafkaListener(
topics = {Const.Kafka.Topic.TOPIC_HELLO_SPRING_KAFKA},
groupId = Const.Kafka.ConsumerGroupId.GROUP_ID_TEST_STRING_MESSAGE_HANDLER_ERROR_CONSUMER,
errorHandler = "kafkaStringMessageListenerErrorHandler"
)
@SendTo({Const.Kafka.Topic.TOPIC_HANDLER_STRING_MESSAGE_ERROR_MESSAGE})
public String testStringMessageHandlerError(String message, Acknowledgment acknowledgment) {
if (log.isInfoEnabled()) {
log.info("{} 执行线程: {} 消费消息 ==> 主题: {}",
Const.FULL_DATE_TIME_FORMATTER.format(LocalDateTime.now()),
Common.getSpringKafkaConsumerName(Thread.currentThread().getName()),
message);
}
acknowledgment.acknowledge();
return "handler message: " + message + " success";
}
/**
* <pre>
* 此监听器 是用来处理 {@link #testStringMessageHandlerError(String, Acknowledgment)} 消费消息时出现异常时,
* {@link KafkaStringMessageListenerErrorHandler} 处理方法的方法返回值.
* </pre>
*
* @param message 异常处理方法返回值
* @param acknowledgment 消息确认器
*/
@KafkaListener(
topics = {Const.Kafka.Topic.TOPIC_HANDLER_STRING_MESSAGE_ERROR_MESSAGE},
groupId = Const.Kafka.ConsumerGroupId.GROUP_ID_TEST_STRING_MESSAGE_HANDLER_ERROR_CONSUMER
)
public void handlerStringMessageError(String message, Acknowledgment acknowledgment) {
if (log.isInfoEnabled()) {
log.info("handlerStringMessageError ==> {} 执行线程: {} 处理异常信息 ==> 异常消息: {}",
Const.FULL_DATE_TIME_FORMATTER.format(LocalDateTime.now()),
Common.getSpringKafkaConsumerName(Thread.currentThread().getName()),
message);
}
acknowledgment.acknowledge();
}
}
```
### 测试异常处理器
```java
package cn.dankal.test.boot.handler;
import cn.dankal.test.boot.common.Common;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;
/**
* {@link ConsumerAwareListenerErrorHandler}
*
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-29
* @see SendTo
*/
@Slf4j
@Component
public class KafkaStringMessageListenerErrorHandler implements ConsumerAwareListenerErrorHandler {
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
Throwable exceptionCause = exception.getCause();
String errorMessage = exception.getMessage();
if (null != exceptionCause) {
errorMessage = exceptionCause.getMessage();
}
// TODO 消息信息获取
// TODO org.springframework.kafka.support.KafkaHeaders
MessageHeaders messageHeaders = message.getHeaders();
// doc: The header containing the topic from which the message was received.
String receivedTopic = messageHeaders.get(KafkaHeaders.RECEIVED_TOPIC, String.class);
Integer partitionId = messageHeaders.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class);
Long offset = messageHeaders.get(KafkaHeaders.OFFSET, Long.class);
if (log.isDebugEnabled()) {
log.debug(
"线程 id: {} receivedTopic: {} partitionId: {} offset: {}",
Common.getSpringKafkaConsumerName(Thread.currentThread().getName()),
receivedTopic,
partitionId,
offset
);
}
// TODO 处理异常消息,做一些补偿操作
// TODO 诸如 使用 Consumer#seek(TopicPartition partition, long offset) 重置 偏移量
// TODO 让消费者重新消费
if (log.isInfoEnabled()) {
log.info(
"{} 处理字符串消息: {} 出现异常:{} ",
Thread.currentThread().getName(),
message.getPayload(),
errorMessage);
}
// TODO 因为测试用例 使用 JsonSerializer 处理的数据 无法实现 使用 String 消费
// 异常摘要: Cannot convert from [cn.dankal.test.boot.entity.DankalSimpleMessage] to [java.lang.String]
// 故在此出选择提交当前消息的偏移量 提交避免重复通知
consumer.commitSync();
// @SendTo
// 此方法返回将会被 SendTo 注解作为消息发送到指定主题
return "出现异常啦: " + errorMessage + " !!!";
}
}
```
### 小说明
#### 关于 spring-kafka 的 异常处理
> **详情可以参阅官方文档: [Handling Exceptions](https://docs.spring.io/spring-kafka/docs/2.2.8.RELEASE/reference/html/#annotation-error-handling)**
#### 关于 spring-kafka 的 @SendTo
> **详情可以参阅官方文档:[Forwarding Listener Results using `@SendTo`](https://docs.spring.io/spring-kafka/docs/2.2.8.RELEASE/reference/html/#annotation-send-to)**
## spring-kafka: 批量发送消息
### spring-kafka 批量发送配置
> - 【数量】`batch-size` :超过收集的消息数量的最大条数
> - `spring.kafka.producer.batch-size`
> - 【空间】`buffer-memory` :超过收集的消息占用的最大内存
> - `spring.kafka.producer.buffer-memory`
> - 【时间】`linger.ms` :超过收集的时间的最大等待时长,单位:毫秒
> - `spring.kafka.producer.properties.linger.ms`
### etc.
**为了测试方便,测试用例使用的 `linger.ms`来测试 消息的批量发送的,设置的值为: ==30s==**
```yaml
server:
port: 9001
... ...
spring:
kafka:
... ...
producer:
... ...
## 以下配置用于测试 spring-kafka 的批量发送消息
batch-size: 16384
buffer-memory: 33554432
properties:
# 超过收集的时间的最大等待时长
linger:
ms: 30000
... ...
```
### controller
```java
@GetMapping("/test/string")
public String testSendStringMessage() {
final String topic = Const.Kafka.Topic.TOPIC_SIMPLE_STRING_MESSAGE;
final String key = "dankal-test-str-" + RandomUtil.randomInt(10, 500);
final String message = RandomUtil.randomString(20);
kafkaStringMessageService.sendStringMessage(topic, key, message);
return "success";
}
```
### service
```java
@Override
public boolean sendStringMessage(String topic, String key, String message) {
stringKafkaTemplate.send(topic, key, message);
return true;
}
```
### listener
```java
@KafkaListener(
topics = {Const.Kafka.Topic.TOPIC_SIMPLE_STRING_MESSAGE},
groupId = Const.Kafka.ConsumerGroupId.GROUP_ID_SIMPLE_STRING_CONSUMER
)
public void stringMessageListener(String message,
Acknowledgment acknowledgment) {
if (log.isInfoEnabled()) {
log.info("{} 执行线程: {} 消费消息 ==> 主题: {}",
Const.FULL_DATE_TIME_FORMATTER.format(LocalDateTime.now()),
Common.getSpringKafkaConsumerName(Thread.currentThread().getName()),
message);
}
acknowledgment.acknowledge();
}
```
### 执行测试
浏览器==间隔==发送三次请求
> http://localhost:9001/test/string
>
> http://localhost:9001/test/string
>
> http://localhost:9001/test/string
### console 控制台打印
```shell
2021-01-29 16:01:25.099 INFO 11620 --- [nio-9001-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2021-01-29 16:01:25.099 INFO 11620 --- [nio-9001-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2021-01-29 16:01:25.103 INFO 11620 --- [nio-9001-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 4 ms
2021-01-29 16:01:55.258 INFO 11620 --- [ntainer#0-1-C-1] c.d.t.b.l.KafkaDankalMessageListener : 2021-01-29 16:01:55.255 执行线程: 0-1-C-1 消费消息 ==> 主题: DankalSimpleMessage(id=1, key=dankal-test-3297, content=cMFcbdTL6nimirb, sendTime=2021-01-29T16:01:25.132)
2021-01-29 16:01:55.258 INFO 11620 --- [ntainer#0-2-C-1] c.d.t.b.l.KafkaDankalMessageListener : 2021-01-29 16:01:55.255 执行线程: 0-2-C-1 消费消息 ==> 主题: DankalSimpleMessage(id=1, key=dankal-test-7358, content=G.jLqFk]cY6]-1p, sendTime=2021-01-29T16:01:38.445)
2021-01-29 16:01:55.260 INFO 11620 --- [ntainer#0-1-C-1] c.d.t.b.l.KafkaDankalMessageListener : 2021-01-29 16:01:55.260 执行线程: 0-1-C-1 消费消息 ==> 主题: DankalSimpleMessage(id=1, key=dankal-test-8936, content=44b[HSMoI;faqMI, sendTime=2021-01-29T16:01:32.286)
```
### 小节小述
从控制台打印我们可以发现,我们发送完请求的时间大约在 `2021-01-29 16:01:25`左右,但是呢消费者是在`2021-01-29 16:01:55`,不多不少,发送请消息和消费消息的时间间隔刚好是`30s`
## spring-kafka:批量消费消息
### spring-kafka 批量消费配置
> - listener.type : 监听者的模式
>
> - ```java
> public enum Type {
>
> /**
> * Invokes the endpoint with one ConsumerRecord at a time.
> */
> SINGLE,
>
> /**
> * Invokes the endpoint with a batch of ConsumerRecords.
> */
> BATCH
>
> }
> ```
>
> - `spring.kafka.listener.type`
>
> - consumer.max-poll-records: 单次调用poll()时返回的最大记录数
>
> - `spring.kafka.consumer.max-poll-records`
>
> - consumer.fetch-min-size: 服务器对一个取数请求应该返回的最小数据量
>
> - `spring.kafka.consumer.fetch-min-size`
>
> - consumer.fetch-max-wait:如果没有足够的数据来立即满足 `fetch-min-size`给出的要求,服务器在回应fetch请求之前的最大阻塞时间。
>
> - `spring.kafka.consumer.fetch-max-wait`
### etc.
**测试用例通过测试 `max-poll-records`来控制 批量消费的最大消费量**
```yaml
server:
port: 9001
... ...
spring:
kafka:
... ...
## 一下配置用于测试 spring-kafka 的批量消费
fetch-min-size: 16384
max-poll-records: 3
fetch-max-wait: 500
... ...
```
### controller
```java
@GetMapping("/string/batch/{count}")
public String testBatchSendMessage(@PathVariable("count") Integer batchCount) {
if (batchCount <= 0) {
batchCount = 10;
}
CyclicBarrier cyclicBarrier = new CyclicBarrier(batchCount);
for (int i = 0; i < batchCount; i++) {
THREAD_POOL_EXECUTOR.submit(() -> {
final String topic = Const.Kafka.Topic.TOPIC_SIMPLE_STRING_MESSAGE;
final String key = "dankal-test";
final String message = RandomUtil.randomString(20);
try {
// 保证一起发送
cyclicBarrier.await();
kafkaStringMessageService.sendStringMessage(topic, key, message);
} catch (Exception e) {
//ignored
}
});
}
return "success";
}
```
### service
```java
@Override
public boolean sendStringMessage(String topic, String key, String message) {
stringKafkaTemplate.send(topic, key, message);
return true;
}
```
### listener
```java
package cn.dankal.test.boot.listener;
import cn.dankal.test.boot.common.Common;
import cn.dankal.test.boot.common.Const;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.List;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-29
*/
@Slf4j
@Component
@ConditionalOnProperty(prefix = "spring.kafka",
name = {"listener.type"}, havingValue = "batch")
public class BatchStringKafkaMessageListener {
@KafkaListener(
topics = {Const.Kafka.Topic.TOPIC_SIMPLE_STRING_MESSAGE},
groupId = Const.Kafka.ConsumerGroupId.GROUP_ID_SIMPLE_STRING_CONSUMER
)
public void batchHandlerStringMessage(List<String> messageList, Acknowledgment acknowledgment) {
if (log.isDebugEnabled()) {
log.debug(
"{} 成功获取到 {} 条数据",
Const.FULL_DATE_TIME_FORMATTER.format(LocalDateTime.now()),
messageList.size()
);
}
for (String message : messageList) {
if (log.isInfoEnabled()) {
log.info(
"{} {} 成功消费消息: {}",
Const.FULL_DATE_TIME_FORMATTER.format(LocalDateTime.now()),
Common.getSpringKafkaConsumerName(Thread.currentThread().getName()),
message);
}
}
acknowledgment.acknowledge();
}
}
```
### 执行请求
**http://localhost:9001/string/batch/7**
### 运行结果
![](https://img.mercymodest.com/public/20210129174507.png)
## spring-kafka: 消息消费重试
**官方文档传送门: [dead-letters](https://docs.spring.io/spring-kafka/docs/2.2.8.RELEASE/reference/html/#dead-letters)**
> ​ Spring-Kafka 提供**消费重试**的机制。在消息**消费失败**的时候,Spring-Kafka 会通过**消费重试**机制,重新投递该消息给 Consumer ,让 Consumer 有机会重新消费消息,实现消费成功。
>
> ​ 当然,Spring-Kafka 并不会无限重新投递消息给 Consumer 重新消费,而是在默认情况下,达到 N 次重试次数时,Consumer 还是消费失败时,该消息就会进入到**死信队列**。
### 消息重试配置示例
```java
package cn.dankal.test.boot.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-29
*/
@Configuration
public class KafkaBaseConfig {
/**
* retryCount
*/
@Value("${spring.kafka.retry-count:3}")
private Integer retryCount;
/**
* 配置 {@link SeekToCurrentErrorHandler} 用于消息消费重试
*
* @param template KafkaTemplate
* @return ErrorHandler
* @see DeadLetterPublishingRecoverer
*/
@Bean
@Primary
public ErrorHandler kafkaErrorHandler(KafkaTemplate<Object, Object> template) {
DeadLetterPublishingRecoverer deadLetterPublishingRecoverer = new DeadLetterPublishingRecoverer(template);
return new SeekToCurrentErrorHandler(deadLetterPublishingRecoverer, retryCount);
}
}
```
### controller
```java
@GetMapping("/test/string")
public String testSendStringMessage() {
final String topic = Const.Kafka.Topic.TOPIC_SIMPLE_STRING_MESSAGE;
final String key = "dankal-test-str-" + RandomUtil.randomInt(10, 500);
final String message = RandomUtil.randomString(20);
kafkaStringMessageService.sendStringMessage(topic, key, message);
return "success";
}
```
### service
```java
@Override
public boolean sendStringMessage(String topic, String key, String message) {
stringKafkaTemplate.send(topic, key, message);
return true;
}
```
### listener
```java
package cn.dankal.test.boot.listener;
import cn.dankal.test.boot.common.Common;
import cn.dankal.test.boot.common.Const;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-29
*/
@Slf4j
@Component("test-retry")
public class SpringKafkaRetryTestListener {
@KafkaListener(
topics = {Const.Kafka.Topic.TOPIC_SIMPLE_STRING_MESSAGE},
groupId = Const.Kafka.ConsumerGroupId.GROUP_ID_SIMPLE_STRING_CONSUMER
)
public void stringMessageListener(String message,
Acknowledgment acknowledgment) {
if (log.isInfoEnabled()) {
log.info("{} 执行线程: {} 消费消息 ==> 消息: {}",
Const.FULL_DATE_TIME_FORMATTER.format(LocalDateTime.now()),
Common.getSpringKafkaConsumerName(Thread.currentThread().getName()),
message);
}
throw new RuntimeException("throw an exception when consumer the message~");
//acknowledgment.acknowledge();
}
}
```
> Spring-Kafka 的消费重试功能,通过实现自定义的`SeekToCurrentErrorHandler` ,在 Consumer 消费消息异常的时候,进行拦截处理:
>
> - 在重试小于最大次数时,重新投递该消息给 Consumer ,让 Consumer 有机会重新消费消息,实现消费成功。
> - 在重试到达最大次数时,Consumer 还是消费失败时,该消息就会发送到死信队列。例如说,我们测试的 Topic 是 `"simple-string-message"` ,则其对应的死信队列的 Topic 就是 `"simple-string-message.DLT"` ,即在原有 Topic 加上 `.DLT` 后缀,就是其死信队列的 Topic 。
### 执行测试
> http://localhost:9001/test/string
### 执行结果
![](https://img.mercymodest.com/public/20210130095724.png)
> 我们在配置`SeekToCurrentErrorHandler` 设置的 `maxFailures` 为 3 ,即消费消息失败之后会最多只会重试 3 次.
## 小述 spring-kafka 的 顺序消费
> 我们先来一起了解下顺序消息的**顺序消息**的定义:
>
> - 普通顺序消息 :Producer 将相关联的消息发送到相同的消息队列。
> - 完全严格顺序 :在【普通顺序消息】的基础上,Consumer 严格顺序消费。
>
> 在上述的示例中,我们看到 Spring-Kafka 在 Consumer 消费消息时,**天然**就支持按照 Topic 下的 Partition 下的消息,**顺序消费**。即使在并发消息时,也能保证如此。
>
> 那么此时,我们只需要考虑将 Producer 将相关联的消息发送到 Topic 下的相同的 Partition 即可。如果我们了解 Producer 发送消息的分区策略的话,只要我们发送消息时,指定了消息的 key ,Producer 则会根据 key 的哈希值取模来获取到其在 Topic 下对应的 Partition 。
>
> 如果大家想深入理解,可以参阅博文: [Kafka 发送消息分区选择策略详解](https://leokongwq.github.io/2017/02/27/mq-kafka-producer-partitioner.html)
\ No newline at end of file
# 蛋壳创意科技-技术测试-Kafka集成SpringCloudStream
# 蛋壳创意科技-技术测试-Kafka集成SpringCloudStream
> company: [蛋壳创意科技](https://www.dankal.cn/)
>
> code: [dankal-test-kafka](http://git.dankal.cn/mercyModest/dankal-test-kafka.git)
>
> spring-kafka-doc: [spring-cloud-stream-2.1.x](https://github.com/spring-cloud/spring-cloud-stream/blob/2.1.x/docs/src/main/asciidoc/spring-cloud-stream.adoc)
>
## 梗概小述
[Spring Cloud Stream](https://github.com/spring-cloud/spring-cloud-stream/tree/2.1.x) 是一个用于构建基于**消息**的微服务应用框架,使用 Spring Integration与 Broker 进行连接。
> 啥是`Broker`?
>
> 一般来说,消息队列中间件都有一个 **Broker Server**(代理服务器),消息中转角色,负责存储消息、转发消息。
>
> 例如说在 RocketMQ 中,Broker 负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。另外,Broker 也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
Spring Cloud Stream 提供了消息中间件的**统一抽象**,推出了 publish-subscribe、consumer groups、partition 这些统一的概念。
Spring Cloud Stream 内部有两个概念:**Binder****Binding**
① Binder,跟消息中间件集成的组件,用来创建对应的 Binding。各消息中间件都有自己的 Binder 具体实现。
```java
public interface Binder<T,
// 消费者配置
C extends ConsumerProperties,
// 生产者配置
P extends ProducerProperties> {
// 创建消费者的 Binding
Binding<T> bindConsumer(String name, String group, T inboundBindTarget, C consumerProperties);
// 创建生产者的 Binding
Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);
}
```
② Binding,包括 Input Binding 和 Output Binding。Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触。
![](https://img.mercymodest.com/public/20210130110316.png)
## 测试用例依赖版本
> - spring-boot: 2.1.8.RELEASE
> - ==spring-cloud: Greenwich.SR5==
> - spring-kafka: 2.2.8.RELEASE
> - kafka-clients: 2.0.1
>
> 其实 `spring-kafka`和`kafka-clients`的版本被统一管理了,我们只需要关注 ==spring-cloud 版本即可
>
> - jdk: 1.8.0_241
> - gradle: Gradle 6.7.1
>
> ### Kafka 服务相关
>
> - kafka server: 2.7.0 [集群:3]
> - zookeeper: 2.4.0 [集群: 3]
## spring-cloud-stream-kafka: helloWorld
### dependence: producer/consumer
```groovy
dependencies {
// spring boot dependence
implementation 'org.springframework.boot:spring-boot-starter-web'
// spring cloud dependence
implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka'
}
```
#### 详情相关配置
> 请参阅:
>
> - 父工程配置: [parent-build.gradle](https://git.dankal.cn/mercyModest/dankal-test-kafka/blob/master/build.gradle)
> - producer: []()
> - consumer: []()
### 搭建生产者
**dankal-test-kafka-spring-cloud-producer**
#### 生产者主配置文件
```yaml
server:
port: 9002
spring:
cloud:
stream:
# Bindings 配置项
# org.springframework.cloud.stream.config.BinderProperties
bindings:
# 消息生产管道名称
# 需要对应 @Output 注解
# cn.dankal.test.cloud.producer.output.KafkaDankalMessageOutput
kafka-dankal-message-output:
# 消息目的地
destination: cloud-kafka-dankal-message-topic
# 消息内容格式: json
content-type: application/json
# spring cloud stream kafka 配置项
kafka:
# org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties
binder:
# kafka broker 地址,多地址逗号分隔
brokers: 192.168.128.3:9092,192.168.128.3:9093,192.168.128.3:9094
bindings:
# 消息生产管道名称
# 需要对应 @Output 注解
# cn.dankal.test.cloud.producer.output.KafkaDankalMessageOutput
kafka-dankal-message-output:
# kafka producer 配置项
# org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties
producer:
# 是否同步发送消息
# false: 采用异步的形式发送消息
sync: true
```
#### 创建 Output Binding
```java
package cn.dankal.test.cloud.producer.output;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-02-01
*/
public interface KafkaDankalMessageOutput {
/**
* spring-cloud-stream-kafka: 测试 JSON 消息
*
* @return MessageChannel
*/
@Output("kafka-dankal-message-output")
MessageChannel kafkaDankalMessageOutput();
}
```
#### controller
```java
package cn.dankal.test.cloud.producer.controller;
import cn.dankal.test.cloud.producer.entity.DankalSimpleMessage;
import cn.dankal.test.cloud.producer.output.KafkaDankalMessageOutput;
import cn.hutool.core.util.RandomUtil;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-02-01
*/
@RestController
public class SendMessageController {
/**
* KafkaDankalMessageOutput
*/
private final KafkaDankalMessageOutput kafkaDankalMessageOutput;
public SendMessageController(KafkaDankalMessageOutput kafkaDankalMessageOutput) {
this.kafkaDankalMessageOutput = kafkaDankalMessageOutput;
}
@GetMapping("/json/{count}")
public String sendAssignCountMessage(@PathVariable("count") Integer count) {
if (count <= 0) {
count = 10;
}
for (int i = 0; i < count; i++) {
final String key = "dankal-cloud-kafka-" + RandomUtil.randomInt(100, 100000);
final DankalSimpleMessage dankalSimpleMessage = new DankalSimpleMessage()
.setId(RandomUtil.randomInt(10, 1000000))
.setContent(RandomUtil.randomString(15))
.setSendTime(LocalDateTime.now())
.setKey(key);
Message<DankalSimpleMessage> dankalSimpleMessageMessage = MessageBuilder.withPayload(dankalSimpleMessage).build();
kafkaDankalMessageOutput.kafkaDankalMessageOutput()
.send(dankalSimpleMessageMessage);
}
return String.valueOf(count);
}
}
```
#### 启动类
```java
package cn.dankal.test.cloud.producer;
import cn.dankal.test.cloud.producer.output.KafkaDankalMessageOutput;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-30
*/
@SpringBootApplication
@EnableBinding({KafkaDankalMessageOutput.class})
public class SpringCloudStreamKafkaProducerApplication {
public static void main(String[] args) {
SpringApplication.run(SpringCloudStreamKafkaProducerApplication.class, args);
}
}
```
##### tips:
> 我们需要使用 `@EnableBinding`注解 声明指定开始 Binding 功能 扫描其 `@Output`注解,测试用例这里使用的是 `KafkaDankalMessageOutput`
### 搭建消费者
**dankal-test-kafka-spring-cloud-consumer**
#### 消息者主配置文件
```yaml
server:
port: 9003
spring:
cloud:
stream:
bindings:
# 需要对应 @Input 注解
kafka-dankal-message-input:
# 消费来源
destination: cloud-kafka-dankal-message-topic
# 消息内容格式
content-type: application/json
# 消费者组
group: cloud-kafka-dankal-message-consumer-group
# 消费者配置
consumer:
# 开始多个线程(一个线程一个消费者)
concurrency: 3
kafka:
binder:
# kafka broker 地址,多地址逗号分隔
brokers: 192.168.128.3:9092,192.168.128.3:9093,192.168.128.3:9094
```
#### 创建 接口 声明 Input Binding
```java
package cn.dankal.test.cloud.consumer.input;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-02-01
*/
public interface DankalMessageInput {
/***
* input name
*/
String INPUT_NAME = "kafka-dankal-message-input";
/**
* 订阅 Kafka 管道消息
*
* @return SubscribableChannel
*/
@Input(INPUT_NAME)
SubscribableChannel dankalMessageInputChannel();
}
```
#### 创建`@StreamListener`作为消费者消费对应的 Input Binding
etc. DankalMessageInput#INPUT_NAME
```java
package cn.dankal.test.cloud.consumer.consumer;
import cn.dankal.test.cloud.consumer.common.Common;
import cn.dankal.test.cloud.consumer.common.Const;
import cn.dankal.test.cloud.consumer.entity.DankalSimpleMessage;
import cn.dankal.test.cloud.consumer.input.DankalMessageInput;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-02-01
*/
@Slf4j
@Component
public class DankalMessageConsumer {
@StreamListener(DankalMessageInput.INPUT_NAME)
public void onMessage(@Payload DankalSimpleMessage dankalSimpleMessage) {
log.info(
"{} {} 成功消费: {}",
Const.FULL_DATE_TIME_FORMATTER.format(LocalDateTime.now()),
Common.getSpringKafkaConsumerName(Thread.currentThread().getName()),
dankalSimpleMessage
);
}
}
```
#### 启动类
```java
package cn.dankal.test.cloud.consumer;
import cn.dankal.test.cloud.consumer.input.DankalMessageInput;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-02-01
*/
@SpringBootApplication
@EnableBinding({DankalMessageInput.class})
public class SpringCloudStreamKafkaConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(SpringCloudStreamKafkaConsumerApplication.class, args);
}
}
```
我们同样需要使用 `@EnableBinding`开始 Binding 功能,扫描对应的 `@Input`注解,测试用例使用的是`DankalMessageInput`
### 执行测试
> http://localhost:9002/json/10
### 消费者运行结果
![](https://img.mercymodest.com/public/20210201120144.png)
#### tips:
> 在搭建消费者的时候,我们能将消费的并发数设置为 3
>
> ![](https://img.mercymodest.com/public/20210201120420.png)
>
> 我们可以根据实践情况对参数进行相应调整,从而实现我们需要的并发消费
## spring-cloud-stream-kafka: 消息重试
Spring-Kafka 提供**消费重试**的机制。在消息**消费失败**的时候,Spring-Kafka 会通过**消费重试**机制,重新投递该消息给 Consumer ,让 Consumer 有机会重新消费消息,实现消费成功。
> tips:Spring Cloud Stream Kafka 是基于 spring-kafka,而是在默认情况下,达到 N 次重试次数时,Consumer 还是消费失败时,该消息就会进入到死信队列。
当然,Spring-Kafka 并不会无限重新投递消息给 Consumer 重新消费,而是在默认情况下,达到 N 次重试次数时,Consumer 还是消费失败时,该消息就会进入到**死信队列**
> 死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,Spring-Kafka 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,Spring-Kafka 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
>
> Spring-Kafka 将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。后续,我们可以通过对死信队列中的消息进行重发,来使得消费者实例再次进行消费。
**而在 Kafka 中,消费重试和死信队列,是由 Spring-Kafka 所封装提供的**
详情可以参阅[蛋壳创意科技-技术测试-Kafka集群安装.md](https://git.dankal.cn/mercyModest/dankal-test-kafka/blob/master/蛋壳创意科技-技术测试-Kafka集群安装.md)
因为 Spring boot 版本问题,我们在使用 `spring-boot`整合kafka 时,虽然我们可以通过 `SeekToCurrentErrorHandler`设置消息重试,但是却无法配置消息重试时间间隔.(高版本的SpringBoot 支持)。幸运的是 在当前的 `spring-cloud-stream-kafka`版本不仅支持 消息重试配置而且我们可以就消息重试做更多的配置.etc 消息重试时间间隔。
### 修改消费者配置文件
```yaml
spring:
cloud:
stream:
bindings:
# 需要对应 @Input 注解
kafka-dankal-message-input:
... ...
consumer:
# 开始多个线程(一个线程一个消费者)
concurrency: 3
# 重试次数 default:3
maxAttempts: 4
# 重试时间间隔的初始值 default: 1000 单位: millisecond
backOffInitialInterval: 3000
# 重试间隔递增系数: default: 2.0
backOffMultiplier: 2.0
# 重试时间间隔的最大值: default 10000 单位: millisecond
backOffMaxInterval: 10000
kafka:
... ...
bindings:
kafka-dankal-message-input:
consumer:
# 开启死信队列
# default: false
enableDlq: true
# 死信队列名称: 默认为 error.{topicName}
#dlqName:
```
### 消费者修改
```java
package cn.dankal.test.cloud.consumer.consumer;
import cn.dankal.test.cloud.consumer.common.Common;
import cn.dankal.test.cloud.consumer.common.Const;
import cn.dankal.test.cloud.consumer.entity.DankalSimpleMessage;
import cn.dankal.test.cloud.consumer.input.DankalMessageInput;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-02-01
*/
@Slf4j
@Component
public class DankalMessageConsumer {
@StreamListener(DankalMessageInput.INPUT_NAME)
public void onMessage(@Payload DankalSimpleMessage dankalSimpleMessage) {
log.info(
"{} {} 成功消费: {}",
Const.FULL_DATE_TIME_FORMATTER.format(LocalDateTime.now()),
Common.getSpringKafkaConsumerName(Thread.currentThread().getName()),
dankalSimpleMessage
);
// 测试 spring-cloud-stream-kafka 消息重试
throw new RuntimeException("dankal test retry consumer message");
}
}
```
### 浏览器执行
> http://localhost:9002/json/1
### 执行结果
![](https://img.mercymodest.com/public/20210203104303.png)
> 正如我们所看到的,消费失败之后,我们的消费者最大可以重试四次。
>
> 我们配置的初始递增时间为: `backOffInitialInterval=3`,
>
> > 接受消息的时间是: 2021-02-03 10:30:10 ==(第一次)==
> >
> > 第一次重试时间: 2021-02-03 10:30:13 (+3) ==(第二次)==
>
> 我们配置的重试递增次数为: `backOffMultiplier=2.0`
>
> > 第二次重试时间: 2021-02-03 10:30:19 (+6) ==(第三次)==
>
> 我们配置的最大重试次数:`maxAttempts=4`,重试间隔最大值: `backOffMaxInterval=10000`ms
>
> > 第三次重试时间:2021-02-03 10:30:29(+10) ==(第四次)==
## spring-cloud-stream-kafka: 异常处理
### 修改消费者
```java
package cn.dankal.test.cloud.consumer.consumer;
import cn.dankal.test.cloud.consumer.common.Common;
import cn.dankal.test.cloud.consumer.common.Const;
import cn.dankal.test.cloud.consumer.entity.DankalSimpleMessage;
import cn.dankal.test.cloud.consumer.input.DankalMessageInput;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.Objects;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-02-01
*/
@Slf4j
@Component
public class DankalMessageConsumer {
@StreamListener(DankalMessageInput.INPUT_NAME)
public void onMessage(@Payload DankalSimpleMessage dankalSimpleMessage) {
log.info(
"{} {} 成功消费: {}",
Const.FULL_DATE_TIME_FORMATTER.format(LocalDateTime.now()),
Common.getSpringKafkaConsumerName(Thread.currentThread().getName()),
dankalSimpleMessage
);
// 测试 spring-cloud-stream-kafka 消息重试
throw new RuntimeException("dankal test retry consumer message");
}
/**
* 处理指定 {@link org.springframework.cloud.stream.annotation.Input} 的异常
* <pre>
* 异常 input name : Channel(<destination>.<group>.errors)
* 举个栗子:
* {@link DankalMessageInput#dankalMessageInputChannel()}
* destination: cloud-kafka-dankal-message-topic
* group: cloud-kafka-dankal-message-consumer-group
* 则 此 input 对应消费异常管道为:
* cloud-kafka-dankal-message-topic.cloud-kafka-dankal-message-consumer-group.errors
*
* </pre>
*/
@ServiceActivator(inputChannel = "cloud-kafka-dankal-message-topic.cloud-kafka-dankal-message-consumer-group.errors")
public void handlerDankalMessageInputError(ErrorMessage errorMessage) {
log.info("current error is handler to DankalMessageInput ");
log.error("payload error message: {}", errorMessage.getPayload().getMessage());
Message<?> originalMessage = errorMessage.getOriginalMessage();
if (Objects.nonNull(originalMessage)) {
log.error("origin payload message:{}", originalMessage.getPayload());
}
log.error("error headers:{}", errorMessage.getHeaders());
}
/**
* input 全局异常异常处理
*/
@StreamListener(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
public void globalInputErrorHandler(ErrorMessage errorMessage) {
log.info("current error is input error handler ");
log.error("payload error message: {}", errorMessage.getPayload().getMessage());
Message<?> originalMessage = errorMessage.getOriginalMessage();
if (Objects.nonNull(originalMessage)) {
log.error("origin payload message:{}", originalMessage.getPayload());
}
log.error("error headers:{}", errorMessage.getHeaders());
}
}
```
> 当 消费`input` 消息出现异常时,`spring-cloud-stream-kafak`会异常打包成`ErrorMessage`然后发送到管道`<destination>.<group>.errors`
>
> ```java
> /**
> * 处理指定 {@link org.springframework.cloud.stream.annotation.Input} 的异常
> * <pre>
> * 异常 input name : Channel(<destination>.<group>.errors)
> * 举个栗子:
> * {@link DankalMessageInput#dankalMessageInputChannel()}
> * destination: cloud-kafka-dankal-message-topic
> * group: cloud-kafka-dankal-message-consumer-group
> * 则 此 input 对应消费异常管道为:
> * cloud-kafka-dankal-message-topic.cloud-kafka-dankal-message-consumer-group.errors
> *
> * </pre>
> */
> ```
>
> 上述处理方式单一 `Channel`异常处理,我们也可以监听`spring-cloud-stream-kafka`的全局异常`Channel`
>
> ```java
>
> /**
> * input 全局异常异常处理
> */
> @StreamListener(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
> public void globalInputErrorHandler(ErrorMessage errorMessage) {
> log.info("current error is input error handler ");
> log.error("payload error message: {}", errorMessage.getPayload().getMessage());
> Message<?> originalMessage = errorMessage.getOriginalMessage();
> if (Objects.nonNull(originalMessage)) {
> log.error("origin payload message:{}", originalMessage.getPayload());
> }
> log.error("error headers:{}", errorMessage.getHeaders());
> }
> ```
>
> 和`SpringMVC`类似,当某一`Channel`发生异常时,如果没有对应的`@ServiceActivator`,才会发送到全局异常`Channel`:
>
> ```java
> @StreamListener(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
> ```
>
> > tips:
> >
> > ==不过要注意,如果异常处理方法成功,没有重新抛出异常,会认定为该消息被**消费成功**,所以就不会发到死信队列了噢~==
## spring-cloud-stream-kafka: 消息过滤
**spring-cloud-stream-kafka 提供了**通用的 **Consumer** 级别的效率过滤器机制。我们只需要使用 `@StreamListener` 注解的 `condition` 属性,设置消息满足指定 Spring EL 表达式的情况下,才进行消费
### 举个栗子
#### 新建一个消费者
使用 `@StreamListener``condition` 属性结合SpEL实现消息过滤
```java
@StreamListener(value = DankalMessageInput.INPUT_NAME,
condition = "headers['messageTag']=='dankal-condition'")
public void onMessage(@Payload DankalSimpleMessage dankalSimpleMessage){
... ...
}
```
```java
package cn.dankal.test.cloud.consumer.consumer;
import cn.dankal.test.cloud.consumer.common.Common;
import cn.dankal.test.cloud.consumer.common.Const;
import cn.dankal.test.cloud.consumer.entity.DankalSimpleMessage;
import cn.dankal.test.cloud.consumer.input.DankalMessageInput;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-02-04
*/
@Slf4j
@Component
public class ConditionDankalMessageConsumer {
/**
* 此消费者只消费 message header 带有: messageTag 并且其值为: dankal-condition
* @param dankalSimpleMessage
*/
@StreamListener(value = DankalMessageInput.INPUT_NAME,
condition = "headers['messageTag']=='dankal-condition'")
public void onMessage(@Payload DankalSimpleMessage dankalSimpleMessage) {
log.info(
"ConditionDankalMessageConsumer: {} {} 成功消费: {}",
Const.FULL_DATE_TIME_FORMATTER.format(LocalDateTime.now()),
Common.getSpringKafkaConsumerName(Thread.currentThread().getName()),
dankalSimpleMessage
);
}
}
```
#### controller
```java
@GetMapping("/condition/{count}")
public String sendAssignCountConditionMessage(@PathVariable("count") Integer count) {
if (count <= 0) {
count = 10;
}
int setHeaderTagCount = 0;
for (int i = 0; i < count; i++) {
final String key = "dankal-cloud-kafka-" + RandomUtil.randomInt(100, 100000);
final DankalSimpleMessage dankalSimpleMessage = new DankalSimpleMessage()
.setId(RandomUtil.randomInt(10, 1000000))
.setContent(RandomUtil.randomString(15))
.setSendTime(LocalDateTime.now())
.setKey(key);
MessageBuilder<DankalSimpleMessage> messageBuilder = MessageBuilder
.withPayload(dankalSimpleMessage);
if (RandomUtil.randomBoolean()) {
// 随机添加 message header tag
messageBuilder.setHeader("messageTag", "dankal-condition");
setHeaderTagCount++;
}
Message<DankalSimpleMessage> dankalSimpleMessageMessage = messageBuilder
.build();
kafkaDankalMessageOutput.kafkaDankalMessageOutput()
.send(dankalSimpleMessageMessage);
}
log.info("共发送 {} 条消息,其中 {} 条 带有 header tag", count, setHeaderTagCount);
return String.valueOf(count);
}
```
#### 执行测试
> http://localhost:9002/condition/5
#### producer 控制台
![](https://img.mercymodest.com/public/20210204180251.png)
#### consumer 控制台
##### ConditionDankalMessageConsumer
![](https://img.mercymodest.com/public/20210204180410.png)
##### DankalMessageConsumer
![](https://img.mercymodest.com/public/20210204180525.png)
## spring-cloud-stream-kafka: 消息的手动提交
**spring-cloud-stream-kafka 的提交机制**
spring-cloud-stream-kafka 在 spring-kafka 上进一步封装,在`spring.cloud.stream.kafka.bindings..<bindingName>.consumer` 下提供了两个配置项:
- `auto-commit-offset` 配置项,是否自动提交消费进度,默认为 `true` 自动提交。
- `ack-each-record` 配置项,是否每一条消息都进行提交消费进度,默认为 `false` 在每一批消费完成后一起提交。
我们进行下整理,将 Spring Cloud Stream Kafka 这两个配置项,和 Spring-Kafka 的 AckMode 对应上,如下表格:
| AckMode | `auto-commit-offset` | `ack-each-record` |
| :---------------------- | :------------------- | :---------------- |
| 自动 `RECORD` | `true` | `false` |
| 自动 `BATCH` | `true` | `true` |
| 手动 `MANUAL` | `false` | `false` |
| 手动 `MANUAL_IMMEDIATE` | `false` | `true` |
因此,**默认什么都不配置的情况下,也使用 Spring-Kafka 的 BATCH 模式:每一次消息被消费完成后,在下次拉取消息之前,自动提交**
### 修改配置文件
```yaml
server:
port: 9003
spring:
cloud:
stream:
... ...
kafka:
binder:
# kafka broker 地址,多地址逗号分隔
brokers: 192.168.128.3:9092,192.168.128.3:9093,192.168.128.3:9094
bindings:
kafka-dankal-message-input:
consumer:
# 开启死信队列
# default: false
enableDlq: true
# 死信队列名称: 默认为 error.{topicName}
#dlqName:
# 是否取消自动提交
# default: true
autoCommitOffset: false
# 是否每消费一条消息 提交一次偏移量
# default: false 批量提交
# 即 全部 消费完 单次 poll 的消息后,一次性提交偏移量
ackEachRecord: true
```
### 修改消费者
```java
@Slf4j
@Component
public class DankalMessageConsumer {
@StreamListener(DankalMessageInput.INPUT_NAME)
public void onMessage(
@Payload DankalSimpleMessage dankalSimpleMessage,
@Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {
log.info(
"DankalMessageConsumer: {} {} 成功消费: {}",
Const.FULL_DATE_TIME_FORMATTER.format(LocalDateTime.now()),
Common.getSpringKafkaConsumerName(Thread.currentThread().getName()),
dankalSimpleMessage
);
if (dankalSimpleMessage.getId() % 2 == 0) {
acknowledgment.acknowledge();
log.info("{} 消费确认", dankalSimpleMessage.getId());
}
// 测试 spring-cloud-stream-kafka 消息重试
//throw new RuntimeException("dankal test retry consumer message");
}
}
```
### controller
```java
@GetMapping("/json/{count}")
public String sendAssignCountMessage(@PathVariable("count") Integer count) {
if (count <= 0) {
count = 10;
}
for (int i = 0; i < count; i++) {
final String key = "dankal-cloud-kafka-" + RandomUtil.randomInt(100, 100000);
final DankalSimpleMessage dankalSimpleMessage = new DankalSimpleMessage()
.setId(RandomUtil.randomInt(10, 1000000))
.setContent(RandomUtil.randomString(15))
.setSendTime(LocalDateTime.now())
.setKey(key);
Message<DankalSimpleMessage> dankalSimpleMessageMessage = MessageBuilder.withPayload(dankalSimpleMessage).build();
kafkaDankalMessageOutput.kafkaDankalMessageOutput()
.send(dankalSimpleMessageMessage);
}
return String.valueOf(count);
}
```
### 请求执行
> http://localhost:9002/json/5
### consumer 截图
![](https://img.mercymodest.com/public/20210204183027.png)
> 全部消息了发送 5 条消息,但是只有 1条消息提交了偏移量
![](https://img.mercymodest.com/public/20210204183153.png)
\ No newline at end of file
# 蛋壳创意科技-技术测试-SpringCloud
# 蛋壳创意科技-技术测试-SpringCloud
> company: [蛋壳创意科技](https://www.dankal.cn/)
>
> code: [dankal-test-kafka](http://git.dankal.cn/mercyModest/dankal-test-kafka.git)
>
> spring-kafka-doc: [spring-cloud-stream-2.1.x](https://github.com/spring-cloud/spring-cloud-stream/blob/2.1.x/docs/src/main/asciidoc/spring-cloud-stream.adoc)
>
## 前景小述
[Spring Cloud Stream](https://github.com/spring-cloud/spring-cloud-stream/tree/2.1.x) 是一个用于构建基于**消息**的微服务应用框架,使用 Spring Integration与 Broker 进行连接。
> 啥是`Broker`?
>
> 一般来说,消息队列中间件都有一个 **Broker Server**(代理服务器),消息中转角色,负责存储消息、转发消息。
>
> 例如说在 RocketMQ 中,Broker 负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。另外,Broker 也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
Spring Cloud Stream 提供了消息中间件的**统一抽象**,推出了 publish-subscribe、consumer groups、partition 这些统一的概念。
Spring Cloud Stream 内部有两个概念:**Binder****Binding**
① Binder,跟消息中间件集成的组件,用来创建对应的 Binding。各消息中间件都有自己的 Binder 具体实现。
```java
public interface Binder<T,
// 消费者配置
C extends ConsumerProperties,
// 生产者配置
P extends ProducerProperties> {
// 创建消费者的 Binding
Binding<T> bindConsumer(String name, String group, T inboundBindTarget, C consumerProperties);
// 创建生产者的 Binding
Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);
}
```
② Binding,包括 Input Binding 和 Output Binding。Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触。
![](https://img.mercymodest.com/public/20210130110316.png)
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment