Commit f82d6c8a by 仲光辉

init: Initialization project

parents
# Created by .ignore support plugin (hsz.mobi)
### Java template
# Compiled class file
*.class
# Log file
*.log
# BlueJ files
*.ctxt
# Mobile Tools for Java (J2ME)
.mtj.tmp/
# Package Files #
*.war
*.nar
*.ear
*.zip
*.tar.gz
*.rar
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*
.gradle
build
.idea
*.iml
// gradle 自身需要的依赖/插件 etc. springboot,springboot依赖管理
buildscript {
// 依赖/插件仓库配置
repositories {
// GRADLE_USER_HOME : maven 本地仓库地址
mavenLocal()
maven {
url 'https://maven.aliyun.com/repository/gradle-plugin'
}
maven {
url 'https://maven.aliyun.com/repository/spring-plugin'
}
mavenCentral()
}
// 类似于 maven Properties
ext {
set('springBootVersion', '2.1.8.RELEASE')
set('dependencyManagementPlugin', '1.0.10.RELEASE')
}
// gradle 插件依赖
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
classpath("io.spring.gradle:dependency-management-plugin:${dependencyManagementPlugin}")
}
}
apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'io.spring.dependency-management'
apply plugin: 'org.springframework.boot'
// jdk 相关配置
sourceCompatibility = '1.8'
targetCompatibility = '1.8'
group 'cn.dankal.test'
version '1.0.0.RELEASE'
// 类似于 maven Properties
ext {
set('springCloudAlibabaVersion', "2.1.3.RELEASE")
set('springCloudVersion', "Greenwich.SR5")
set('hutoolCoreVersion', "5.4.6")
set('kafkaClientVersion', '2.7.0')
}
// 依赖仓库配置
repositories {
// GRADLE_USER_HOME : maven 本地仓库地址
mavenLocal()
maven {
url 'https://maven.aliyun.com/repository/public/'
}
maven {
url 'https://maven.aliyun.com/repository/spring/'
}
maven {
url 'https://maven.aliyun.com/repository/gradle-plugin'
}
maven {
url 'https://maven.aliyun.com/repository/spring-plugin'
}
mavenCentral()
}
dependencies {
// spring dependence
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.kafka:spring-kafka'
testImplementation('org.springframework.boot:spring-boot-starter-test') {
exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
}
// third party dependence
implementation "cn.hutool:hutool-core:${hutoolCoreVersion}"
compile 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testCompileOnly 'org.projectlombok:lombok'
}
tasks.withType(JavaCompile) {
options.encoding = "UTF-8"
}
// 类似于 maven dependencyManagement
dependencyManagement {
imports {
// spring-cloud
mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
// spring-cloud-alibaba
mavenBom "com.alibaba.cloud:spring-cloud-alibaba-dependencies:${springCloudAlibabaVersion}"
}
}
// 单元测试配置
test {
useJUnitPlatform()
}
// package 配置
bootJar.enabled = false
jar.enabled = true
version: '3.7'
services:
kafka01:
image: wurstmeister/kafka
#restart: always
hostname: kafka01
container_name: kafka01
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.128.3:9092
KAFKA_ADVERTISED_HOST_NAME: 192.168.128.3
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper01:2181,zookeeper02:2181,zookeeper03:2181
JMX_PORT: 9999
volumes:
- ./kafka01/logs:/kafka
external_links:
- zookeeper01
- zookeeper02
- zookeeper03
networks:
default:
ipv4_address: 172.128.0.20
kafka02:
image: wurstmeister/kafka
#restart: always
hostname: kafka02
container_name: kafka02
ports:
- "9093:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.128.3:9093
KAFKA_ADVERTISED_HOST_NAME: 192.168.128.3
KAFKA_ADVERTISED_PORT: 9093
KAFKA_ZOOKEEPER_CONNECT: zookeeper01:2181,zookeeper02:2181,zookeeper03:2181
JMX_PORT: 9988
volumes:
- ./kafka02/logs:/kafka
external_links:
- zookeeper01
- zookeeper02
- zookeeper03
networks:
default:
ipv4_address: 172.128.0.21
kafka03:
image: wurstmeister/kafka
#restart: always
hostname: kafka03
container_name: kafka03
ports:
- "9094:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.128.3:9094
KAFKA_ADVERTISED_HOST_NAME: 192.168.128.3
KAFKA_ADVERTISED_PORT: 9094
KAFKA_ZOOKEEPER_CONNECT: zookeeper01:2181,zookeeper02:2181,zookeeper03:2181
JMX_PORT: 9977
volumes:
- ./kafka03/logs:/kafka
external_links:
- zookeeper01
- zookeeper02
- zookeeper03
networks:
default:
ipv4_address: 172.128.0.22
networks:
default:
external:
name: zookeeper_network
\ No newline at end of file
#!/bin/bash
docker run -id --name kafka-manager --privileged --network=zookeeper_network --ip 172.128.0.30 -p 9000:9000 -e ZK_HOSTS="zookeeper01:2181,zookeeper02:2182,zookeeper03:2183" kafkamanager/kafka-manager
\ No newline at end of file
version: '3.7'
services:
zookeeper01:
image: zookeeper
restart: always
privileged: true
container_name: zookeeper01
hostname: zookeeper01
ports:
- 2181:2181
volumes: # 挂载数据
- ./zookeeper01/data:/data
- ./zookeeper01/datalog:/datalog
environment:
ZOO_MY_ID: 1
ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=zookeeper02:2888:3888;2181 server.3=zookeeper03:2888:3888;2181
networks:
default:
ipv4_address: 172.128.0.10
zookeeper02:
image: zookeeper
restart: always
privileged: true
container_name: zookeeper02
hostname: zookeeper02
ports:
- 2182:2181
volumes: # 挂载数据
- ./zookeeper02/data:/data
- ./zookeeper02/datalog:/datalog
environment:
ZOO_MY_ID: 2
ZOO_SERVERS: server.1=zookeeper01:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zookeeper03:2888:3888;2181
networks:
default:
ipv4_address: 172.128.0.11
zookeeper03:
image: zookeeper
restart: always
privileged: true
container_name: zookeeper03
hostname: zookeeper03
ports:
- 2183:2181
volumes: # 挂载数据
- ./zookeeper03/data:/data
- ./zookeeper03/datalog:/datalog
environment:
ZOO_MY_ID: 3
ZOO_SERVERS: server.1=zookeeper01:2888:3888;2181 server.2=zookeeper02:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181
networks:
default:
ipv4_address: 172.128.0.12
networks: # 自定义网络
default:
external:
name: zookeeper_network
\ No newline at end of file
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-6.7.1-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
#!/usr/bin/env sh
#
# Copyright 2015 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
##############################################################################
##
## Gradle start up script for UN*X
##
##############################################################################
# Attempt to set APP_HOME
# Resolve links: $0 may be a link
PRG="$0"
# Need this for relative symlinks.
while [ -h "$PRG" ] ; do
ls=`ls -ld "$PRG"`
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
PRG="$link"
else
PRG=`dirname "$PRG"`"/$link"
fi
done
SAVED="`pwd`"
cd "`dirname \"$PRG\"`/" >/dev/null
APP_HOME="`pwd -P`"
cd "$SAVED" >/dev/null
APP_NAME="Gradle"
APP_BASE_NAME=`basename "$0"`
# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD="maximum"
warn () {
echo "$*"
}
die () {
echo
echo "$*"
echo
exit 1
}
# OS specific support (must be 'true' or 'false').
cygwin=false
msys=false
darwin=false
nonstop=false
case "`uname`" in
CYGWIN* )
cygwin=true
;;
Darwin* )
darwin=true
;;
MINGW* )
msys=true
;;
NONSTOP* )
nonstop=true
;;
esac
CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
# Determine the Java command to use to start the JVM.
if [ -n "$JAVA_HOME" ] ; then
if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
# IBM's JDK on AIX uses strange locations for the executables
JAVACMD="$JAVA_HOME/jre/sh/java"
else
JAVACMD="$JAVA_HOME/bin/java"
fi
if [ ! -x "$JAVACMD" ] ; then
die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME
Please set the JAVA_HOME variable in your environment to match the
location of your Java installation."
fi
else
JAVACMD="java"
which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
Please set the JAVA_HOME variable in your environment to match the
location of your Java installation."
fi
# Increase the maximum file descriptors if we can.
if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then
MAX_FD_LIMIT=`ulimit -H -n`
if [ $? -eq 0 ] ; then
if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then
MAX_FD="$MAX_FD_LIMIT"
fi
ulimit -n $MAX_FD
if [ $? -ne 0 ] ; then
warn "Could not set maximum file descriptor limit: $MAX_FD"
fi
else
warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT"
fi
fi
# For Darwin, add options to specify how the application appears in the dock
if $darwin; then
GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\""
fi
# For Cygwin or MSYS, switch paths to Windows format before running java
if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then
APP_HOME=`cygpath --path --mixed "$APP_HOME"`
CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
JAVACMD=`cygpath --unix "$JAVACMD"`
# We build the pattern for arguments to be converted via cygpath
ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null`
SEP=""
for dir in $ROOTDIRSRAW ; do
ROOTDIRS="$ROOTDIRS$SEP$dir"
SEP="|"
done
OURCYGPATTERN="(^($ROOTDIRS))"
# Add a user-defined pattern to the cygpath arguments
if [ "$GRADLE_CYGPATTERN" != "" ] ; then
OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)"
fi
# Now convert the arguments - kludge to limit ourselves to /bin/sh
i=0
for arg in "$@" ; do
CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -`
CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option
if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition
eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"`
else
eval `echo args$i`="\"$arg\""
fi
i=`expr $i + 1`
done
case $i in
0) set -- ;;
1) set -- "$args0" ;;
2) set -- "$args0" "$args1" ;;
3) set -- "$args0" "$args1" "$args2" ;;
4) set -- "$args0" "$args1" "$args2" "$args3" ;;
5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;;
6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;;
7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;;
8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;;
9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;;
esac
fi
# Escape application args
save () {
for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done
echo " "
}
APP_ARGS=`save "$@"`
# Collect all arguments for the java command, following the shell quoting and substitution rules
eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS"
exec "$JAVACMD" "$@"
@rem
@rem Copyright 2015 the original author or authors.
@rem
@rem Licensed under the Apache License, Version 2.0 (the "License");
@rem you may not use this file except in compliance with the License.
@rem You may obtain a copy of the License at
@rem
@rem https://www.apache.org/licenses/LICENSE-2.0
@rem
@rem Unless required by applicable law or agreed to in writing, software
@rem distributed under the License is distributed on an "AS IS" BASIS,
@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@rem See the License for the specific language governing permissions and
@rem limitations under the License.
@rem
@if "%DEBUG%" == "" @echo off
@rem ##########################################################################
@rem
@rem Gradle startup script for Windows
@rem
@rem ##########################################################################
@rem Set local scope for the variables with windows NT shell
if "%OS%"=="Windows_NT" setlocal
set DIRNAME=%~dp0
if "%DIRNAME%" == "" set DIRNAME=.
set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME%
@rem Resolve any "." and ".." in APP_HOME to make it shorter.
for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi
@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m"
@rem Find java.exe
if defined JAVA_HOME goto findJavaFromJavaHome
set JAVA_EXE=java.exe
%JAVA_EXE% -version >NUL 2>&1
if "%ERRORLEVEL%" == "0" goto execute
echo.
echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
echo.
echo Please set the JAVA_HOME variable in your environment to match the
echo location of your Java installation.
goto fail
:findJavaFromJavaHome
set JAVA_HOME=%JAVA_HOME:"=%
set JAVA_EXE=%JAVA_HOME%/bin/java.exe
if exist "%JAVA_EXE%" goto execute
echo.
echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME%
echo.
echo Please set the JAVA_HOME variable in your environment to match the
echo location of your Java installation.
goto fail
:execute
@rem Setup the command line
set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
@rem Execute Gradle
"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %*
:end
@rem End local scope for the variables with windows NT shell
if "%ERRORLEVEL%"=="0" goto mainEnd
:fail
rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
rem the _cmd.exe /c_ return code!
if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1
exit /b 1
:mainEnd
if "%OS%"=="Windows_NT" endlocal
:omega
// 解决有时候 插件无法下载问题
pluginManagement {
repositories {
mavenLocal()
maven { url "https://maven.aliyun.com/repository/gradle-plugin" }
maven { url "https://maven.aliyun.com/repository/spring-plugin" }
maven { url "https://maven.aliyun.com/repository/public" }
maven { url 'https://maven.aliyun.com/repository/spring' }
mavenCentral()
gradlePluginPortal()
}
}
rootProject.name = 'dankal-test-kafka'
package cn.dankal.test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-22
*/
@SpringBootApplication
public class KafkaTestApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaTestApplication.class, args);
}
}
package cn.dankal.test.commandline;
import cn.dankal.test.common.Const;
import cn.dankal.test.service.KafkaMessageService;
import cn.hutool.core.util.RandomUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-22
*/
@Slf4j
@Component
public class KafkaMessageSendCommandLineRunner implements CommandLineRunner {
/**
* KafkaMessageService
*/
private final KafkaMessageService kafkaMessageService;
public KafkaMessageSendCommandLineRunner(KafkaMessageService kafkaMessageService) {
this.kafkaMessageService = kafkaMessageService;
}
@Override
public void run(String... args) throws Exception {
while (true) {
final String topic = Const.Kafka.Topic.TOPIC_SIMPLE_STRING_MESSAGE;
final String key = "dankal-test-" + RandomUtil.randomInt(100, 100000000);
final String message = "蛋壳创意科技-" + RandomUtil.randomString(Const.RANDOM_BASE_STRING, 1000);
TimeUnit.MILLISECONDS.sleep(230);
boolean sendSuccess = kafkaMessageService.sendStringMessage(topic, key, message);
if (sendSuccess) {
System.out.println("消息发送成功~");
if (log.isDebugEnabled()) {
log.debug("成功发送消息: key: {} value: {}", key, message);
}
}
}
}
}
package cn.dankal.test.common;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-21
*/
public interface Const {
/**
* random base string
*/
String RANDOM_BASE_STRING = "zxcvbnmasdfghjklqwertyuiopQAZWSXEDCRFVTGBYHNIUJKLMOP1234567890-=[],.;'";
/**
* kafka 相关常量
*/
interface Kafka {
/**
* Kafka Consumer group id
*/
interface ConsumerGroupId {
/**
* simple-string-consumer
*/
String GROUP_ID_SIMPLE_STRING_CONSUMER = "simple-string-consumer";
}
/**
* Kafka Topic
*/
interface Topic {
/**
* simple-string-message
*/
String TOPIC_SIMPLE_STRING_MESSAGE = "simple-string-message";
}
}
}
package cn.dankal.test.config;
import cn.dankal.test.common.Const;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.util.StringUtils;
import java.util.HashMap;
import java.util.Map;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-22
*/
@Configuration
public class KafkaTopicConfig {
/**
* KafkaProperties
*/
private final KafkaProperties kafkaProperties;
public KafkaTopicConfig(KafkaProperties kafkaProperties) {
this.kafkaProperties = kafkaProperties;
}
@Bean
public KafkaAdmin admin() {
Map<String, Object> configs = new HashMap<>(1 << 5);
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
StringUtils.arrayToCommaDelimitedString(kafkaProperties.getBootstrapServers().toArray()));
return new KafkaAdmin(configs);
}
@Bean
public NewTopic simpleStringTopic() {
return new NewTopic(
Const.Kafka.Topic.TOPIC_SIMPLE_STRING_MESSAGE,
kafkaProperties.getListener().getConcurrency(),
(short) 3);
}
}
package cn.dankal.test.listener;
import cn.dankal.test.common.Const;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-21
*/
@Slf4j
@Component
public class KafkaMessageListener {
@KafkaListener(
topics = {Const.Kafka.Topic.TOPIC_SIMPLE_STRING_MESSAGE},
groupId = Const.Kafka.ConsumerGroupId.GROUP_ID_SIMPLE_STRING_CONSUMER
)
public void stringMessageListener(ConsumerRecord<?, ?> data,
Acknowledgment acknowledgment,
Consumer<?, ?> consumer) {
if (log.isInfoEnabled()) {
log.info("{} 消费消息 ==> 主题: {} 分区: {} 偏移量: {} key: {} value: {}",
Thread.currentThread().getName(),
data.topic(),
data.partition(),
data.offset(),
data.key(),
data.value());
}
acknowledgment.acknowledge();
}
}
package cn.dankal.test.service;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-21
*/
public interface KafkaMessageService {
/**
* send message to assign topic
*
* @param topic 目标 topic
* @param key 消息 key
* @param message 需要发送的消息
* @return 是否执行成功
*/
boolean sendStringMessage(String topic,String key, String message);
}
package cn.dankal.test.service.impl;
import cn.dankal.test.service.KafkaMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Repository;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-21
*/
@Slf4j
@Repository
public class KafkaMessageServiceImpl implements KafkaMessageService {
/**
* kafkaTemplate
*/
private final KafkaTemplate<String, String> kafkaTemplate;
public KafkaMessageServiceImpl(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@Override
public boolean sendStringMessage(String topic, String key, String message) {
kafkaTemplate.send(topic, key, message)
.addCallback(System.out::println, Throwable::printStackTrace);
return true;
}
}
server:
port: 9001
spring:
kafka:
bootstrap-servers:
- 192.168.128.3:9092
- 192.168.128.3:9093
- 192.168.128.3:9094
producer:
retries: 5
listener:
ack-mode: manual_immediate
concurrency: 3
ack-count: 3
type: single
ack-time: 60
template:
default-topic: dankal-test-default-topic
consumer:
enable-auto-commit: false
logging:
level:
cn.dankal.test: info
\ No newline at end of file
spring:
application:
name: dankal-test-kafka
profiles:
active: native
\ No newline at end of file
package cn.dankal.test.service;
import cn.dankal.test.KafkaTestApplication;
import cn.dankal.test.common.Const;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-21
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {KafkaTestApplication.class})
public class KafkaMessageServiceTest {
/**
* KafkaMessageService
*/
@Autowired
private KafkaMessageService kafkaMessageService;
/**
* 测试 发送 string 类型的 Kafka 消息
*/
@Test
public void testSendStringMessageToKafka() {
final String topic = Const.Kafka.Topic.TOPIC_SIMPLE_STRING_MESSAGE;
final String key = "dankal-test";
final String message = "Hello,Kafka~(蛋壳创意科技)";
boolean sendSuccess = kafkaMessageService.sendStringMessage(topic, key, message);
if (sendSuccess) {
System.out.println("消息发送成功");
}
}
}
# 蛋壳创意科技-技术测试-Kafka
# 蛋壳创意科技-技术测试-Kafka
> https://www.jianshu.com/p/6a592d558812
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment