Commit fc117cf6 by 仲光辉

init: Initialization project

parents
# Created by .ignore support plugin (hsz.mobi)
### Java template
# Compiled class file
*.class
# Log file
*.log
# BlueJ files
*.ctxt
# Mobile Tools for Java (J2ME)
.mtj.tmp/
# Package Files #
*.war
*.nar
*.ear
*.zip
*.tar.gz
*.rar
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*
.gradle
.idea
*.iml
build
// java,IntelliJ IDEA 插件
plugins {
id 'java'
id 'idea'
}
group 'cn.dankal.share'
description 'dankal-share-java-concurrent-util'
version '1.0-SNAPSHOT'
// 依赖仓库配置
repositories {
// GRADLE_USER_HOME : maven 本地仓库地址
mavenLocal()
maven {
url 'https://maven.aliyun.com/repository/public/'
}
maven {
url 'https://maven.aliyun.com/repository/spring/'
}
maven {
url 'https://maven.aliyun.com/repository/gradle-plugin'
}
maven {
url 'https://maven.aliyun.com/repository/spring-plugin'
}
mavenCentral()
}
// 编译字符集
tasks.withType(JavaCompile) {
options.encoding = "UTF-8"
}
// 使用的 java 版本
java.sourceCompatibility = JavaVersion.VERSION_1_8
java.targetCompatibility = JavaVersion.VERSION_1_8
// 项目依赖
dependencies {
implementation 'cn.hutool:hutool-core:5.5.4'
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.6.0'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
}
// 执行测试任务
test {
useJUnitPlatform()
}
\ No newline at end of file
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://res.mercymodest.com/gradle-6.7.1-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
#!/usr/bin/env sh
#
# Copyright 2015 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
##############################################################################
##
## Gradle start up script for UN*X
##
##############################################################################
# Attempt to set APP_HOME
# Resolve links: $0 may be a link
PRG="$0"
# Need this for relative symlinks.
while [ -h "$PRG" ] ; do
ls=`ls -ld "$PRG"`
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
PRG="$link"
else
PRG=`dirname "$PRG"`"/$link"
fi
done
SAVED="`pwd`"
cd "`dirname \"$PRG\"`/" >/dev/null
APP_HOME="`pwd -P`"
cd "$SAVED" >/dev/null
APP_NAME="Gradle"
APP_BASE_NAME=`basename "$0"`
# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD="maximum"
warn () {
echo "$*"
}
die () {
echo
echo "$*"
echo
exit 1
}
# OS specific support (must be 'true' or 'false').
cygwin=false
msys=false
darwin=false
nonstop=false
case "`uname`" in
CYGWIN* )
cygwin=true
;;
Darwin* )
darwin=true
;;
MINGW* )
msys=true
;;
NONSTOP* )
nonstop=true
;;
esac
CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
# Determine the Java command to use to start the JVM.
if [ -n "$JAVA_HOME" ] ; then
if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
# IBM's JDK on AIX uses strange locations for the executables
JAVACMD="$JAVA_HOME/jre/sh/java"
else
JAVACMD="$JAVA_HOME/bin/java"
fi
if [ ! -x "$JAVACMD" ] ; then
die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME
Please set the JAVA_HOME variable in your environment to match the
location of your Java installation."
fi
else
JAVACMD="java"
which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
Please set the JAVA_HOME variable in your environment to match the
location of your Java installation."
fi
# Increase the maximum file descriptors if we can.
if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then
MAX_FD_LIMIT=`ulimit -H -n`
if [ $? -eq 0 ] ; then
if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then
MAX_FD="$MAX_FD_LIMIT"
fi
ulimit -n $MAX_FD
if [ $? -ne 0 ] ; then
warn "Could not set maximum file descriptor limit: $MAX_FD"
fi
else
warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT"
fi
fi
# For Darwin, add options to specify how the application appears in the dock
if $darwin; then
GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\""
fi
# For Cygwin or MSYS, switch paths to Windows format before running java
if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then
APP_HOME=`cygpath --path --mixed "$APP_HOME"`
CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
JAVACMD=`cygpath --unix "$JAVACMD"`
# We build the pattern for arguments to be converted via cygpath
ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null`
SEP=""
for dir in $ROOTDIRSRAW ; do
ROOTDIRS="$ROOTDIRS$SEP$dir"
SEP="|"
done
OURCYGPATTERN="(^($ROOTDIRS))"
# Add a user-defined pattern to the cygpath arguments
if [ "$GRADLE_CYGPATTERN" != "" ] ; then
OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)"
fi
# Now convert the arguments - kludge to limit ourselves to /bin/sh
i=0
for arg in "$@" ; do
CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -`
CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option
if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition
eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"`
else
eval `echo args$i`="\"$arg\""
fi
i=`expr $i + 1`
done
case $i in
0) set -- ;;
1) set -- "$args0" ;;
2) set -- "$args0" "$args1" ;;
3) set -- "$args0" "$args1" "$args2" ;;
4) set -- "$args0" "$args1" "$args2" "$args3" ;;
5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;;
6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;;
7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;;
8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;;
9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;;
esac
fi
# Escape application args
save () {
for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done
echo " "
}
APP_ARGS=`save "$@"`
# Collect all arguments for the java command, following the shell quoting and substitution rules
eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS"
exec "$JAVACMD" "$@"
@rem
@rem Copyright 2015 the original author or authors.
@rem
@rem Licensed under the Apache License, Version 2.0 (the "License");
@rem you may not use this file except in compliance with the License.
@rem You may obtain a copy of the License at
@rem
@rem https://www.apache.org/licenses/LICENSE-2.0
@rem
@rem Unless required by applicable law or agreed to in writing, software
@rem distributed under the License is distributed on an "AS IS" BASIS,
@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@rem See the License for the specific language governing permissions and
@rem limitations under the License.
@rem
@if "%DEBUG%" == "" @echo off
@rem ##########################################################################
@rem
@rem Gradle startup script for Windows
@rem
@rem ##########################################################################
@rem Set local scope for the variables with windows NT shell
if "%OS%"=="Windows_NT" setlocal
set DIRNAME=%~dp0
if "%DIRNAME%" == "" set DIRNAME=.
set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME%
@rem Resolve any "." and ".." in APP_HOME to make it shorter.
for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi
@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m"
@rem Find java.exe
if defined JAVA_HOME goto findJavaFromJavaHome
set JAVA_EXE=java.exe
%JAVA_EXE% -version >NUL 2>&1
if "%ERRORLEVEL%" == "0" goto execute
echo.
echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
echo.
echo Please set the JAVA_HOME variable in your environment to match the
echo location of your Java installation.
goto fail
:findJavaFromJavaHome
set JAVA_HOME=%JAVA_HOME:"=%
set JAVA_EXE=%JAVA_HOME%/bin/java.exe
if exist "%JAVA_EXE%" goto execute
echo.
echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME%
echo.
echo Please set the JAVA_HOME variable in your environment to match the
echo location of your Java installation.
goto fail
:execute
@rem Setup the command line
set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
@rem Execute Gradle
"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %*
:end
@rem End local scope for the variables with windows NT shell
if "%ERRORLEVEL%"=="0" goto mainEnd
:fail
rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
rem the _cmd.exe /c_ return code!
if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1
exit /b 1
:mainEnd
if "%OS%"=="Windows_NT" endlocal
:omega
rootProject.name = 'dankal-share-java-concurrent-util'
package cn.dankal.share.common;
/**
* 插入排序简单实现
*
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-02
*/
public class InsertionSort {
/**
* 排序方式
*/
public enum Sort {
/**
* 倒序排序
*/
DESC,
/**
* 顺序排序
*/
ASC
}
/**
* 使用插入排序顺序排序目标数组
*
* @param unSortedArray 未排序的数组
* @return int[] 已经排序的数组
*/
public static int[] doAscSort(int[] unSortedArray) {
return doSort(unSortedArray, Sort.ASC);
}
/**
* 使用插入排序倒序排序目标数组
*
* @param unSortedArray 未排序的数组
* @return int[] 已经排序的数组
*/
public static int[] doDescSort(int[] unSortedArray) {
return doSort(unSortedArray, Sort.DESC);
}
/**
* int 数组排序
*
* @param unSortedArray 待排序的数组
* @param sortType 排序类型
* @return 排序好的数组
* @see Sort
*/
public static int[] doSort(int[] unSortedArray, Sort sortType) {
if (unSortedArray.length == 0) {
return unSortedArray;
}
// 默认是 顺序排序
boolean asc = true;
if (Sort.DESC.toString().equalsIgnoreCase(sortType.toString())) {
// 倒序排序
asc = false;
}
// 带排序元素
// 带排序元素前面的元素已经全部排序
int currentValue;
for (int i = 0; i < unSortedArray.length - 1; i++) {
// 已被排序数据的索引
int preIndex = i;
currentValue = unSortedArray[preIndex + 1];
//在已经排序好的元素面中寻找到当前元素合适的位置
while (preIndex >= 0 && (asc ? currentValue < unSortedArray[preIndex] : currentValue > unSortedArray[preIndex])) {
// 从已排序元素开始,将其与带排序元素逐一比较,以寻找带排序元素的合适位置
unSortedArray[preIndex + 1] = unSortedArray[preIndex];
preIndex--;
}
// while 循环结束时,说明已经找到了当前待排序数据的合适位置,插入
unSortedArray[preIndex + 1] = currentValue;
}
return unSortedArray;
}
}
package cn.dankal.share.common;
import cn.hutool.core.util.RandomUtil;
/**
* int 数组工厂
* <p>
* 生成指定长度的 int 数组 并使用随机数填充
* <br/>
* 随机数范围: array.length*3
* </p>
*
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-02
*/
public class IntArrayFactory {
/**
* 生成的默认数组长度
*/
private final static Integer DEFAULT_ARRAY_LENGTH = 1000000;
/**
* 生成默认长度的 int 数组
*
* @return int []
* @see IntArrayFactory#DEFAULT_ARRAY_LENGTH
*/
public static int[] generateIntArray() {
return generateIntArray(DEFAULT_ARRAY_LENGTH);
}
/**
* 生成指定长度的数组
*
* @param arrayLength 生成长度为 <code>arrayLength</code> int 数组
* @return int []
*/
public static int[] generateIntArray(int arrayLength) {
if (arrayLength <= 0) {
arrayLength = DEFAULT_ARRAY_LENGTH;
}
int[] resultArray = new int[arrayLength];
for (int i = 0; i < resultArray.length; i++) {
resultArray[i] = RandomUtil.randomInt(0, arrayLength * 3);
}
return resultArray;
}
}
package cn.dankal.share.common;
import java.util.concurrent.TimeUnit;
/**
* int 数组 求和工具类
*
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-02
*/
public class IntArraySumUtil {
/**
* 数组求和
*
* @param targetArray 目标数组
* @param fromIndex from index
* @param toIndex to index
* @return 计算结果
*/
public static int sumArray(int[] targetArray, int fromIndex, int toIndex) {
if (toIndex < fromIndex) {
throw new IllegalArgumentException("fromIndex 必须小于等于 toIndex");
}
if (targetArray.length <= toIndex) {
throw new IllegalArgumentException("toIndex 超出 数组长度范围");
}
int result = 0;
for (int i = fromIndex; i <= toIndex; i++) {
try {
// TODO 休眠 100 毫秒 便于区分不同计算方式和 fork/join 的区别
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
// ignore
}
result += targetArray[i];
}
return result;
}
}
package cn.dankal.share.common;
import java.util.Arrays;
/**
* 归并排序简单实现
*
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-02
*/
public class MergeSort {
/**
* 数组拆分默认临界值
*/
public final static int DEFAULT_ARRAY_SPILT_THRESHOLD = 47;
/**
* 归并排序
*
* @param numArray 需要归并排序的数组
* @return 排序好后的数组
*/
public static int[] sort(int[] numArray) {
if (numArray.length <= DEFAULT_ARRAY_SPILT_THRESHOLD) {
return InsertionSort.doAscSort(numArray);
} else {
/*切分数组,然后递归调用*/
int mid = numArray.length / 2;
int[] left = Arrays.copyOfRange(numArray, 0, mid);
int[] right = Arrays.copyOfRange(numArray, mid, numArray.length);
return mergeSortedArray(sort(left), sort(right));
}
}
/**
* 将两个有序数组合并为一个有序数组(顺序排序)
*
* @param leftArray left array
* @param rightArray right array
* @return 排好序后合并的数组
*/
public static int[] mergeSortedArray(int[] leftArray, int[] rightArray) {
int[] result = new int[leftArray.length + rightArray.length];
for (int index = 0, i = 0, j = 0; index < result.length; index++) {
if (i >= leftArray.length) {
// 左边数组已经取完,完全取右边数组的值即可
result[index] = rightArray[j++];
} else if (j >= rightArray.length) {
// 右边数组已经取完,完全取左边数组的值即可
result[index] = leftArray[i++];
} else if (leftArray[i] > rightArray[j]) {
// 左边数组的元素值大于右边数组,取右边数组的值
result[index] = rightArray[j++];
} else {
// 右边数组的元素值大于左边数组,取左边数组的值
result[index] = leftArray[i++];
}
}
return result;
}
}
package cn.dankal.share.common;
import cn.hutool.core.util.StrUtil;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* ThreadExecutor 工厂
*
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-02
*/
public class ThreadExecutorFactory {
/**
* 获取 ThreadPoolExecutor
*
* @param corePoolSize 核心线程数
* @param maximumPoolSize 最大线程数
* @param keepAliveTime 超过核心线程数的线程存活时间
* @param timeUnit 超过核心线程数的线程存活时间单位
* @param threadNamePrefix 创建线程的名称前缀
* @param blockQueueCapacity 线程池阻塞队列的容量
* @return ThreadPoolExecutor
* @see ThreadPoolExecutor
*/
public static ThreadPoolExecutor getThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit timeUnit,
String threadNamePrefix,
int blockQueueCapacity) {
// 参数校验
if (corePoolSize <= 0) {
throw new IllegalArgumentException("corePoolSize 必须大于0");
}
if (maximumPoolSize <= 0) {
throw new IllegalArgumentException("maximumPoolSize 必须大于0");
}
if (keepAliveTime < 0) {
throw new IllegalArgumentException("keepAliveTime 必须大于0");
}
if (null == timeUnit) {
throw new IllegalArgumentException("timeUnit 不能为空");
}
if (corePoolSize > maximumPoolSize) {
throw new IllegalArgumentException("corePoolSize 必须小于等于 maximumPoolSize");
}
if (StrUtil.isBlank(threadNamePrefix)) {
throw new IllegalArgumentException("threadNamePrefix 不能为空");
}
if (blockQueueCapacity <= 0) {
throw new IllegalArgumentException("blockQueueCapacity 必须大于0");
}
return new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
timeUnit,
new ArrayBlockingQueue<>(blockQueueCapacity),
threadFactory(threadNamePrefix)
);
}
/**
* ThreadFactory 简答实现 --> 为线程池线程指定名称
*
* @param threadNamePrefix 线程池线程名称前缀
* @return ThreadFactory
*/
private static ThreadFactory threadFactory(final String threadNamePrefix) {
return (runnable -> {
Thread thread = new Thread(runnable);
long id = thread.getId();
thread.setName(threadNamePrefix + " - " + id);
return thread;
});
}
/**
* 获取一个 <code>ThreadPoolExecutor</code>
* <p>
* coreSize: 6
* <br/>
* maximumPoolSize: 10
* <br/>
* KeepAliveTime: 10
* <br/>
* unit: TimeUnit.MILLISECONDS
* <br/>
* blockQueueCapacity: 100
* <br/>
* </p>
*
* @param threadNamePrefix <code>threadNamePrefix</code>
* @return ThreadPoolExecutor
*/
public static ThreadPoolExecutor getThreadPoolExecutor(final String threadNamePrefix) {
return ThreadExecutorFactory.getThreadPoolExecutor(6,
10,
10,
TimeUnit.MILLISECONDS,
threadNamePrefix,
100);
}
}
package cn.dankal.share.concurrentutil;
import cn.dankal.share.common.ThreadExecutorFactory;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
/**
* CountDownLatch 简单使用测试
*
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-02
*/
public class CountDownLatchTest {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = ThreadExecutorFactory.getThreadPoolExecutor("countDownLatch-test");
try {
/*
情景小析:
我们需要做这样一件事情:
target: 获取商品详情
- 获取商品信息 耗时 50 毫秒
- 获取商品价格 耗时 10 毫秒
- 商品属性获取并归并(Sku/Spu) 耗时 65 毫秒
- 假设三个子任务之间没有依赖关系
*/
// target: 获取商品详细
CountDownLatch countDownLatch = new CountDownLatch(3);
final long startTime = System.currentTimeMillis();
// 获取商品信息 50 毫秒
final long getProductDetailElapsed = 50;
Worker getProductDetailWorker = new Worker("获取商品信息", getProductDetailElapsed, countDownLatch);
threadPoolExecutor.execute(getProductDetailWorker);
// 获取商品价格 10 毫秒
final long getPriceElapsed = 10;
Worker getProductPrice = new Worker("获取商品价格", getPriceElapsed, countDownLatch);
threadPoolExecutor.execute(getProductPrice);
//商品属性获取并归并 65毫秒
final long productPropertiesReduceElapsed = 65;
Worker productPropertiesReduceWorker = new Worker("商品属性获取并归并", productPropertiesReduceElapsed, countDownLatch);
threadPoolExecutor.execute(productPropertiesReduceWorker);
try {
System.out.printf("%s 线程即将开始等待 ... ...%n", Thread.currentThread().getName());
countDownLatch.await();
} catch (InterruptedException e) {
// ignored
}
System.out.println("接口返回~");
System.out.println("接口放回理论耗时: " + (getProductDetailElapsed + getPriceElapsed + productPropertiesReduceElapsed) + " 毫秒");
System.out.println("接口放回实际耗时: " + (System.currentTimeMillis() - startTime) + " 毫秒");
} finally {
threadPoolExecutor.shutdown();
}
}
}
package cn.dankal.share.concurrentutil;
import cn.dankal.share.common.ThreadExecutorFactory;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadPoolExecutor;
/**
* CyclicBarrier 简单使用测试
*
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-02
*/
public class CyclicBarrierTest {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = ThreadExecutorFactory.getThreadPoolExecutor("cyclicBarrier-test");
try {
/*
情景小析:
我们需要做这样一件事情:
target: 获取商品详情
- 获取商品信息 耗时 50 毫秒
- 获取商品价格 耗时 10 毫秒
- 商品属性获取并归并(Sku/Spu) 耗时 65 毫秒
- 假设三个子任务之间没有依赖关系
*/
// target: 获取商品详细
CyclicBarrier cyclicBarrier = new CyclicBarrier(4);
final long startTime = System.currentTimeMillis();
// 获取商品信息 50 毫秒
final long getProductDetailElapsed = 50;
Worker getProductDetailWorker = new Worker("获取商品信息", getProductDetailElapsed, cyclicBarrier);
threadPoolExecutor.execute(getProductDetailWorker);
// 获取商品价格 10 毫秒
final long getPriceElapsed = 10;
Worker getProductPrice = new Worker("获取商品价格", getPriceElapsed, cyclicBarrier);
threadPoolExecutor.execute(getProductPrice);
//商品属性获取并归并 65毫秒
final long productPropertiesReduceElapsed = 65;
Worker productPropertiesReduceWorker = new Worker("商品属性获取并归并", productPropertiesReduceElapsed, cyclicBarrier);
threadPoolExecutor.execute(productPropertiesReduceWorker);
try {
System.out.printf("%s 线程即将开始等待 ... ...%n", Thread.currentThread().getName());
cyclicBarrier.await();
} catch (Exception e) {
// ignored
}
System.out.println("接口返回~");
System.out.println("接口放回理论耗时: " + (getProductDetailElapsed + getPriceElapsed + productPropertiesReduceElapsed) + " 毫秒");
System.out.println("接口放回实际耗时: " + (System.currentTimeMillis() - startTime) + " 毫秒");
} finally {
threadPoolExecutor.shutdown();
}
}
}
package cn.dankal.share.concurrentutil;
import cn.dankal.share.common.ThreadExecutorFactory;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* {@link Exchanger} 的简单使用
*
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-02
*/
public class ExchangerTest {
public static void main(String[] args) {
/*
应用场景:
- 在两个线程之间进行数据交换
- 流水线式数据处理
- 线程一 处理完数据之后,将数据交给 线程二继续处理
*/
ThreadPoolExecutor threadPoolExecutor = ThreadExecutorFactory.getThreadPoolExecutor("exchanger-test");
Exchanger<HashMap<String, Object>> mapExchanger = new Exchanger<>();
try {
// 线程一 先处理
threadPoolExecutor.execute(() -> {
final HashMap<String, Object> dataMap = new HashMap<>(1 << 5);
dataMap.put(Thread.currentThread().getName(), UUID.randomUUID().toString().replace("-", ""));
System.out.println(Thread.currentThread().getName() + " 开始处理数据");
try {
TimeUnit.SECONDS.sleep(1);
System.out.printf("%s 线程 原始数据: %s%n", Thread.currentThread().getName(), dataMap);
System.out.println(Thread.currentThread().getName() + " 完成数据处理,即将进行数据传递");
HashMap<String, Object> exchangeResult = mapExchanger.exchange(dataMap);
System.out.printf("%s 获取交换后的结果 %s%n", Thread.currentThread().getName(), exchangeResult);
} catch (InterruptedException e) {
// ignored
}
});
// 线程二 继续处理
threadPoolExecutor.execute(() -> {
System.out.println(Thread.currentThread().getName() + " 开始处理数据");
try {
HashMap<String, Object> emptyMap = new HashMap<>(1);
TimeUnit.SECONDS.sleep(1);
System.out.printf("%s 线程 原始数据: %s%n", Thread.currentThread().getName(), emptyMap);
System.out.println(Thread.currentThread().getName() + " 完成数据处理,即将进行数据传递");
HashMap<String, Object> exchangeResult = mapExchanger.exchange(emptyMap);
System.out.printf("%s 获取交换后的结果 %s%n", Thread.currentThread().getName(), exchangeResult);
} catch (InterruptedException e) {
// ignored
}
});
} finally {
threadPoolExecutor.shutdown();
}
}
}
package cn.dankal.share.concurrentutil;
import cn.dankal.share.common.ThreadExecutorFactory;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Semaphore 简单使用测试
*
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-02
*/
public class SemaphoreTest {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = ThreadExecutorFactory.getThreadPoolExecutor("semaphore-test");
// semaphore: 令牌桶只有 3 个令牌
Semaphore semaphore = new Semaphore(3);
// 参与抢令牌的线程数
final int count = 10;
// 确保统计顺序执行完(成功/失败统计)
CountDownLatch successAndFairCountDownLatch = new CountDownLatch(count);
TokenBucket tokenBucket = new TokenBucket(semaphore, successAndFairCountDownLatch);
try {
for (int i = 0; i < count; i++) {
threadPoolExecutor.execute(tokenBucket::getToken);
}
successAndFairCountDownLatch.await();
System.out.println("参与线程数: " + count);
System.out.println("成功线程数:" + tokenBucket.successCount());
System.out.println("失败线程数:" + tokenBucket.fairCount());
} catch (InterruptedException e) {
// ignored
} finally {
threadPoolExecutor.shutdown();
}
}
}
package cn.dankal.share.concurrentutil;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* token Bucket 简单模拟
* <p>
* 基于 {@link Semaphore} 简单实现
* </p>
*
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-02
*/
public class TokenBucket {
/**
* <code>Semaphore</code>
*/
private final Semaphore semaphore;
/**
* <code>CountDownLatch</code>
*/
private final CountDownLatch countDownLatch;
/**
* 令牌统计
*/
private final AtomicInteger tokenCounter = new AtomicInteger(0);
/**
* 成功线程数统计
*/
private final AtomicInteger successCounter = new AtomicInteger(0);
/**
* 失败线程数统计
*/
private final AtomicInteger fairCounter = new AtomicInteger(0);
public TokenBucket(Semaphore semaphore, CountDownLatch countDownLatch1) {
this.semaphore = semaphore;
this.countDownLatch = countDownLatch1;
}
/**
* 获取令牌
*/
public void getToken() {
try {
final long getTokenTimeout = 5;
if (semaphore.tryAcquire(getTokenTimeout, TimeUnit.MILLISECONDS)) {
// 成功获取到信号量: 获取令牌成功
// 保证 tryAcquire 超时
int tokenNum = tokenCounter.incrementAndGet();
TimeUnit.SECONDS.sleep(1);
System.out.printf("%s 成功获取到 %d 号令牌 %n", Thread.currentThread().getName(), tokenNum);
successCounter.incrementAndGet();
} else {
// 获取令牌失败
System.out.printf("%s 未获取到令牌 %n", Thread.currentThread().getName());
fairCounter.incrementAndGet();
}
} catch (InterruptedException e) {
// ignored
} finally {
// 成功/失败统计计数扣减
this.countDownLatch.countDown();
}
}
/**
* 成功获取令桶统计
*
* @return 成功获取令牌统计
*/
public int successCount() {
return successCounter.get();
}
/**
* 获取令牌失败统计
*
* @return 未成功获取令牌统计
*/
public int fairCount() {
return fairCounter.get();
}
}
package cn.dankal.share.concurrentutil;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
/**
* {@link Runnable}
* Worker 简单模拟
* <p>
* 用于测试 {@link CountDownLatch} 和 {@link CyclicBarrier} 的简单使用
* </p>
*
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-02
*/
public class Worker implements Runnable {
/**
* 工作内容
*/
private final String workContent;
/**
* 工作消耗时间:单位 second
*/
private final long workTimeInMilliSecond;
/**
* <code>CountDownLatch</code>
*/
private CountDownLatch countDownLatch;
/**
* <code>CyclicBarrier</code>
*/
private CyclicBarrier cyclicBarrier;
public Worker(String workContent, long workTimeInMilliSecond, CountDownLatch countDownLatch) {
this.workContent = workContent;
this.workTimeInMilliSecond = workTimeInMilliSecond;
this.countDownLatch = countDownLatch;
}
public Worker(String workContent, long workTimeInMilliSecond, CyclicBarrier cyclicBarrier) {
this.workContent = workContent;
this.workTimeInMilliSecond = workTimeInMilliSecond;
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
// 工作耗时
try {
TimeUnit.MILLISECONDS.sleep(workTimeInMilliSecond);
System.out.printf("%s 正在: %s 预计耗时 %d 毫秒 %n", Thread.currentThread().getName(), workContent, workTimeInMilliSecond);
} catch (InterruptedException e) {
// ignored
} finally {
if (null != this.countDownLatch) {
// 计数扣减
this.countDownLatch.countDown();
} else if (null != cyclicBarrier) {
try {
// 等待全部执行完成
this.cyclicBarrier.await();
} catch (Exception e) {
// ignored
}
}
}
}
}
package cn.dankal.share.forkjoin;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.util.ArrayUtil;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 基于 {@link RecursiveAction} (ForkJoin)实现 文件查找与简单统计
*
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-02
*/
public class FindFileTask extends RecursiveAction {
/**
* 目标文件夹
*/
private final String targetFolder;
/**
* 目标文件后缀
*/
private final String findFileSuffix;
/**
* 目标文件计数器
*/
public final static AtomicInteger COUNTER_TARGET_FILE = new AtomicInteger(0);
/**
* 遍历目录计数器
*/
public final static AtomicInteger COUNTER_EACH_FOLDER = new AtomicInteger(0);
public FindFileTask(String targetFolder, String findFileSuffix) {
this.targetFolder = targetFolder;
this.findFileSuffix = findFileSuffix;
}
@Override
protected void compute() {
// 参数校验
if (!FileUtil.exist(targetFolder) || FileUtil.isFile(targetFolder)) {
throw new IllegalArgumentException("targetFolder 必须是存在的 文件夹");
}
File targetFile = FileUtil.file(targetFolder);
File[] files = targetFile.listFiles();
if (ArrayUtil.isEmpty(files)) {
// 空文件夹直接返回
return;
}
// 需要执行 Task List
List<FindFileTask> subFindFileTask = new ArrayList<>(1 << 5);
for (File file : files) {
if (file.isDirectory()) {
// 文件夹
FindFileTask findFileTask = new FindFileTask(file.getAbsolutePath(), findFileSuffix);
subFindFileTask.add(findFileTask);
COUNTER_EACH_FOLDER.incrementAndGet();
} else {
// 文件
String fileSuffix = FileUtil.getSuffix(file);
if (("." + fileSuffix).equalsIgnoreCase(findFileSuffix)) {
// 是目标文件
System.out.println("find: " + file.getName());
COUNTER_TARGET_FILE.incrementAndGet();
}
}
}
if (CollectionUtil.isNotEmpty(subFindFileTask)) {
// 执行所有拆分的 task 如果需要的话
for (FindFileTask findFileTask : invokeAll(subFindFileTask)) {
// 因为是拆分 即使我们不需要获取拆分结果也需要 join 切切
findFileTask.join();
}
}
}
}
package cn.dankal.share.forkjoin;
import cn.dankal.share.common.InsertionSort;
import cn.dankal.share.common.MergeSort;
import java.util.Arrays;
import java.util.concurrent.RecursiveTask;
/**
* 基于 {@link RecursiveTask} (ForkJoin) 简单实现 数组排序(归并排序)
*
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-02
*/
public class IntArraySortTask extends RecursiveTask<int[]> {
/**
* 需要排序的数组
*/
private final int[] intArray;
/**
* 数组拆分临界值(length)
*/
private final int thresholdLength;
public IntArraySortTask(int[] intArray, int thresholdLength) {
this.intArray = intArray;
this.thresholdLength = thresholdLength;
}
@Override
protected int[] compute() {
if (intArray.length <= thresholdLength) {
// 达到拆分临界 直接排序
return InsertionSort.doAscSort(intArray);
} else {
// formIndex ... middle ... toIndex
// 任务拆分 将排序任务拆分成两个或更多
// 此处演示 拆分为 左右两个任务
int middle = intArray.length / 2;
// 将目标数组从 “中间”拆分
int[] leftIntArray = Arrays.copyOfRange(intArray, 0, middle);
int[] rightIntArray = Arrays.copyOfRange(intArray, middle, intArray.length);
// 拆分后,分为 左,右两个子Task
IntArraySortTask leftSortTask = new IntArraySortTask(leftIntArray, thresholdLength);
IntArraySortTask rightSortTask = new IntArraySortTask(rightIntArray, thresholdLength);
//执行所有 Task
invokeAll(leftSortTask, rightSortTask);
// 归并 left 的结果
int[] leftArray = leftSortTask.join();
// 归并 right 的结果
int[] rightArray = rightSortTask.join();
// 合并 归并结果
return MergeSort.mergeSortedArray(leftArray, rightArray);
}
}
}
package cn.dankal.share.forkjoin;
import cn.dankal.share.common.IntArraySumUtil;
import java.util.concurrent.RecursiveTask;
/**
* 基于 {@link RecursiveTask} 简单实现数组求和
* <p>
* 简单测试对比,展示 ForkJoin 的优势
* </p>
*
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-02
*/
public class IntArraySumTask extends RecursiveTask<Integer> {
/**
* 任务拆分临界值
*/
private final int thresholdLength;
/**
* 求和目标数组
*/
private final int[] intArray;
/**
* fromIndex
*/
private final int fromIndex;
/**
* toIndex
*/
private final int toIndex;
public IntArraySumTask(int thresholdLength, int[] intArray, int fromIndex, int toIndex) {
this.thresholdLength = thresholdLength;
this.intArray = intArray;
this.fromIndex = fromIndex;
this.toIndex = toIndex;
}
@Override
protected Integer compute() {
if (toIndex - fromIndex <= thresholdLength) {
return IntArraySumUtil.sumArray(intArray, fromIndex, toIndex);
} else {
//拆分成两个子 task 任务
int middleIndex = (fromIndex + toIndex) / 2;
IntArraySumTask leftSumTask = new IntArraySumTask(thresholdLength, intArray, fromIndex, middleIndex);
IntArraySumTask rightSumTask = new IntArraySumTask(thresholdLength, intArray, middleIndex + 1, toIndex);
invokeAll(leftSumTask, rightSumTask);
// 归并拆分执行结果并返回
return leftSumTask.join() + rightSumTask.join();
}
}
}
package cn.dankal.share.util;
import cn.dankal.share.common.IntArrayFactory;
import cn.dankal.share.common.IntArraySumUtil;
import cn.dankal.share.common.MergeSort;
import cn.dankal.share.forkjoin.FindFileTask;
import cn.dankal.share.forkjoin.IntArraySortTask;
import cn.dankal.share.forkjoin.IntArraySumTask;
import cn.hutool.core.util.RandomUtil;
import jdk.nashorn.internal.ir.annotations.Ignore;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
/**
* Fork/Join 相关测试类
*
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-02
*/
public class ForkJoinTest {
/**
* 测试 Fork/Join 实现的 归并排序
*/
@Test
public void testMergeSortByForkJoin() {
int[] generateIntArray = IntArrayFactory.generateIntArray(25);
System.out.println("排序前:");
System.out.println(Arrays.toString(generateIntArray));
System.out.println("========================");
ForkJoinPool forkJoinPool = new ForkJoinPool();
IntArraySortTask intArraySortTask = new IntArraySortTask(generateIntArray, 2);
int[] invokeResult = forkJoinPool.invoke(intArraySortTask);
System.out.println("排序后:");
System.out.println(Arrays.toString(invokeResult));
System.out.println("========================");
}
/**
* 测试 Fork/Join 实现 数组求和
*/
@Test
@Ignore
public void testArraySortByForkJoin() {
int[] generateIntArray = IntArrayFactory.generateIntArray(50);
IntArraySumTask intArraySumTask = new IntArraySumTask(10, generateIntArray, 0, generateIntArray.length - 1);
ForkJoinPool forkJoinPool = new ForkJoinPool();
Integer invokeResult = forkJoinPool.invoke(intArraySumTask);
System.out.println(invokeResult);
}
/**
* int 求和
*
* @param left 被加数
* @param right 加数
* @return 和
*/
private static int intSum(int left, int right) {
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
// ignore
}
return left + right;
}
/**
* 测试 归并排序对比
*/
@Test
public void testMergeSortThreeWays() {
int[] generateIntArray = IntArrayFactory.generateIntArray(3000000);
System.out.println("数组元素: " + generateIntArray.length);
// general merge sort
final long generalMergeSortStart = System.currentTimeMillis();
MergeSort.sort(generateIntArray);
System.out.println("运行耗时: " + (System.currentTimeMillis() - generalMergeSortStart) + " 毫秒 (general mergeSort)");
// jdk utils: Arrays#sort
final long arraySortStartMs = System.currentTimeMillis();
// DualPivotQuicksort 快排
Arrays.sort(generateIntArray);
System.out.println("运行耗时: " + (System.currentTimeMillis() - arraySortStartMs) + " 毫秒 (Arrays#sort)");
// Fork/Join merge sort
ForkJoinPool forkJoinPool = new ForkJoinPool();
IntArraySortTask intArraySortTask = new IntArraySortTask(generateIntArray, 300);
final long forkJoinStartMs = System.currentTimeMillis();
forkJoinPool.invoke(intArraySortTask);
System.out.println("运行耗时: " + (System.currentTimeMillis() - forkJoinStartMs) + " 毫秒 (Fork/Join mergeSort)");
}
/**
* 测试: Fork/Join的优势
*/
@Test
public void testAdvantageOfForkJoin() {
Map<String, Long> expenditureTimeMap = new LinkedHashMap<>(1 << 4);
final String streamDataKey = "java8 stream";
final String forkJoinDataKey = "fork join";
final String generalSumDataKey = "general sum";
final int count = 10;
for (int i = 0; i < count; i++) {
int[] generateIntArray = IntArrayFactory.generateIntArray(RandomUtil.randomInt(50, 100));
System.out.printf("目标数组 length: %d%n ", generateIntArray.length);
System.out.println(Arrays.toString(generateIntArray));
System.out.printf("============= 第 %d 次计算 =============%n", (i + 1));
final long streamStartMs = System.currentTimeMillis();
int result = Arrays.stream(generateIntArray)
.parallel()
.reduce(ForkJoinTest::intSum)
.orElse(-1);
long streamExpenditure = System.currentTimeMillis() - streamStartMs;
System.out.println("(java8 stream) 运行结果: " + result);
System.out.println("(java8 stream) 运行耗时: " + streamExpenditure + " 毫秒\n");
// 将 java8 stream 每次执行时间存储在 LinkedHashMap 中
if (!expenditureTimeMap.containsKey(streamDataKey)) {
expenditureTimeMap.put(streamDataKey, streamExpenditure);
} else {
Long oldExpenditure = expenditureTimeMap.get(streamDataKey);
expenditureTimeMap.put(streamDataKey, oldExpenditure + streamExpenditure);
}
IntArraySumTask intArraySumTask = new IntArraySumTask(10, generateIntArray, 0, generateIntArray.length - 1);
ForkJoinPool forkJoinPool = new ForkJoinPool();
final long forkJoinStartMs = System.currentTimeMillis();
Integer invokeResult = forkJoinPool.invoke(intArraySumTask);
long forkJoinExpenditure = System.currentTimeMillis() - forkJoinStartMs;
System.out.println("(fork/join) 运行结果: " + invokeResult);
System.out.println("(fork/join) 运行耗时: " + forkJoinExpenditure + " 毫秒\n");
// 将 fork/join 每次执行时间存储在 LinkedHashMap 中
if (!expenditureTimeMap.containsKey(forkJoinDataKey)) {
expenditureTimeMap.put(forkJoinDataKey, forkJoinExpenditure);
} else {
Long oldExpenditure = expenditureTimeMap.get(forkJoinDataKey);
expenditureTimeMap.put(forkJoinDataKey, oldExpenditure + forkJoinExpenditure);
}
final long generalStartMs = System.currentTimeMillis();
int sum = IntArraySumUtil.sumArray(generateIntArray, 0, generateIntArray.length - 1);
long generalSumExpenditure = System.currentTimeMillis() - generalStartMs;
System.out.println("(general sum) 运行结果: " + sum);
System.out.println("(general sum) 运行耗时: " + generalSumExpenditure + " 毫秒");
// 将 general sum 每次执行时间存储在 LinkedHashMap 中
if (!expenditureTimeMap.containsKey(generalSumDataKey)) {
expenditureTimeMap.put(generalSumDataKey, generalSumExpenditure);
} else {
Long oldExpenditure = expenditureTimeMap.get(generalSumDataKey);
expenditureTimeMap.put(generalSumDataKey, oldExpenditure + generalSumExpenditure);
}
System.out.println("=============================\n");
}
System.out.println("平均耗时");
for (String key : expenditureTimeMap.keySet()) {
System.out.printf("%s 运行 %d 次,平均耗时 %d 毫秒%n", key, count, expenditureTimeMap.get(key) / count);
}
}
/**
* 测试: Fork/Join RecursiveAction: 目标文件查找
*/
@Test
public void testFindFileByRecursiveAction() {
final String targetFolder = "C:/dev/MavenRepository";
final String findFileSuffix = ".lastUpdated";
FindFileTask findFileTask = new FindFileTask(targetFolder, findFileSuffix);
ForkJoinPool forkJoinPool = new ForkJoinPool();
forkJoinPool.invoke(findFileTask);
System.out.printf("共遍历 : %d 个文件夹%n", FindFileTask.COUNTER_EACH_FOLDER.get());
System.out.printf("找到目标文件 %s : %d 个%n", findFileSuffix, FindFileTask.COUNTER_TARGET_FILE.get());
}
}
package cn.dankal.share.util;
import cn.dankal.share.common.InsertionSort;
import cn.dankal.share.common.IntArrayFactory;
import cn.dankal.share.common.MergeSort;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
/**
* Util 工具类测试类
*
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-02
*/
public class UtilMainTest {
/**
* 测试 插入排序
*/
@Test
public void testInsertSort() {
int[] generateIntArray = IntArrayFactory.generateIntArray(20);
System.out.println("排序前:");
System.out.println(Arrays.toString(generateIntArray));
System.out.println("========================");
InsertionSort.doAscSort(generateIntArray);
System.out.println("顺序排序后:");
System.out.println(Arrays.toString(generateIntArray));
System.out.println("倒序排序后:");
InsertionSort.doDescSort(generateIntArray);
System.out.println(Arrays.toString(generateIntArray));
System.out.println("========================");
}
/**
* 测试 MergeSort
*/
@Test
public void testMergeSort() {
int[] generateIntArray = IntArrayFactory.generateIntArray(25);
System.out.println("排序前:");
System.out.println(Arrays.toString(generateIntArray));
System.out.println("========================");
MergeSort.sort(generateIntArray);
System.out.println("排序后:");
System.out.println(Arrays.toString(generateIntArray));
System.out.println("========================");
}
}
package cn.dankal.share.util.completablefuture;
import cn.dankal.share.common.ThreadExecutorFactory;
import cn.hutool.core.util.RandomUtil;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-02
*/
public class CompletableFutureAuxiliaryMethodTest {
/**
* ThreadPoolExecutor
*/
private final static ThreadPoolExecutor THREAD_POOL_EXECUTOR = ThreadExecutorFactory.getThreadPoolExecutor("completableFuture-result-get-test");
/**
* 测试 CompletableFuture:allOf(CompletableFuture<?>... cfs)
*/
@Test
public void testAllOf() {
Runnable runnable = () -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
// ignored
}
System.out.printf("线程 %s 完成工作%n", Thread.currentThread().getName());
};
CompletableFuture<Void> completableFuture01 = CompletableFuture.runAsync(runnable);
CompletableFuture<Void> completableFuture02 = CompletableFuture.runAsync(runnable);
CompletableFuture<Void> completableFuture03 = CompletableFuture.runAsync(runnable);
CompletableFuture<Void> completableFuture04 = CompletableFuture.runAsync(runnable);
CompletableFuture<Void> completableFuture05 = CompletableFuture.runAsync(runnable);
System.out.println("all completableFuture is starting the work or will start work");
CompletableFuture.allOf(completableFuture01, completableFuture02, completableFuture03, completableFuture04, completableFuture05)
.join();
System.out.println("completableFuture all finished the work");
}
/**
* 测试 CompletableFuture:anyOf(CompletableFuture<?>... cfs)
*/
@Test
public void testAnyOf() {
Runnable runnable = () -> {
try {
TimeUnit.SECONDS.sleep(RandomUtil.randomInt(1,10));
} catch (InterruptedException e) {
// ignored
}
System.out.printf("线程 %s 完成工作%n", Thread.currentThread().getName());
};
CompletableFuture<Void> completableFuture01 = CompletableFuture.runAsync(runnable);
CompletableFuture<Void> completableFuture02 = CompletableFuture.runAsync(runnable);
CompletableFuture<Void> completableFuture03 = CompletableFuture.runAsync(runnable);
CompletableFuture<Void> completableFuture04 = CompletableFuture.runAsync(runnable);
CompletableFuture<Void> completableFuture05 = CompletableFuture.runAsync(runnable);
System.out.println("all completableFuture is starting the work or will start work");
CompletableFuture.anyOf(completableFuture01, completableFuture02, completableFuture03, completableFuture04, completableFuture05)
.join();
System.out.println("completableFuture all finished the work");
}
/**
* 关闭 线程池资源
*/
@AfterAll
public static void after() {
THREAD_POOL_EXECUTOR.shutdown();
}
}
package cn.dankal.share.util.completablefuture;
import cn.dankal.share.common.ThreadExecutorFactory;
import cn.hutool.core.util.RandomUtil;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-02
*/
public class CompletableFutureResultGetTest {
/**
* ThreadPoolExecutor
*/
private final static ThreadPoolExecutor THREAD_POOL_EXECUTOR = ThreadExecutorFactory.getThreadPoolExecutor("completableFuture-result-get-test");
/**
* completableFuture 结果获取: get()
*/
@Test
public void testCompletableFutureGet() {
// 使用 get() 获取 completableFuture 运行结果
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
int result = RandomUtil.randomInt(1, 100);
System.out.printf("线程 %s 已生产随机数 %d%n", Thread.currentThread().getName(), result);
return result;
}, THREAD_POOL_EXECUTOR);
try {
System.out.printf("%s 线程 即将开始获取 completableFuture 结果%n", Thread.currentThread().getName());
Integer result = completableFuture.get();
System.out.println(result);
} catch (Exception exception) {
exception.printStackTrace();
}
}
/**
* completableFuture 结果获取: get(long timeout, TimeUnit unit)
*/
@Test
public void testCompletableFutureGetTimeout() {
// 使用 get() 获取 completableFuture 运行结果
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
int result = RandomUtil.randomInt(1, 100);
System.out.printf("线程 %s 已生产随机数 %d%n", Thread.currentThread().getName(), result);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
// ignored
}
return result;
}, THREAD_POOL_EXECUTOR);
try {
System.out.printf("%s 线程 即将开始获取 completableFuture 结果%n", Thread.currentThread().getName());
Integer result = completableFuture.get(100, TimeUnit.MILLISECONDS);
System.out.println(result);
} catch (Exception exception) {
System.out.println(exception.getMessage());
}
}
/**
* completableFuture 结果获取: getNow(T valueIfAbsent)
*/
@Test
public void testCompletableFutureGetNow() {
// 使用 get() 获取 completableFuture 运行结果
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
int result = RandomUtil.randomInt(1, 100);
System.out.printf("线程 %s 已生产随机数 %d%n", Thread.currentThread().getName(), result);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
// ignored
}
return result;
}, THREAD_POOL_EXECUTOR);
try {
System.out.printf("%s 线程 即将开始获取 completableFuture 结果%n", Thread.currentThread().getName());
Integer result = completableFuture.getNow(-999);
System.out.println(result);
} catch (Exception exception) {
System.out.println(exception.getMessage());
}
}
/**
* completableFuture 结果获取: join()
*/
@Test
public void testCompletableFutureJoin() {
// 使用 get() 获取 completableFuture 运行结果
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
int result = RandomUtil.randomInt(1, 100);
System.out.printf("线程 %s 已生产随机数 %d%n", Thread.currentThread().getName(), result);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
// ignored
}
return result;
}, THREAD_POOL_EXECUTOR);
try {
System.out.printf("%s 线程 即将开始获取 completableFuture 结果%n", Thread.currentThread().getName());
Integer result = completableFuture.join();
System.out.println(result);
} catch (Exception exception) {
System.out.println(exception.getMessage());
}
}
/**
* 关闭 线程池资源
*/
@AfterAll
public static void after() {
THREAD_POOL_EXECUTOR.shutdown();
}
}
package cn.dankal.share.util.completablefuture;
import cn.dankal.share.common.ThreadExecutorFactory;
import org.junit.jupiter.api.Test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
/**
* <p>
* <code>CompletableFuture</code> 测试
* </p>
*
* @author ZGH.MercyModest
* @version V1.0.0
* @create 2021-01-22
*/
public class CompletableFutureTest {
/*
需求:
- 获取商品信息 (A) 耗时 120 毫秒
- 获取商品价格 (B) 耗时 95 毫秒
- 商品属性获取并归并(Sku/Spu) (C) 耗时 165 毫秒
重要: 我们只有获取到了 '商品信息' 才能执行 '商品属性获取并归并',即 C 必须在 A 之后执行
*/
/**
* <code>CompletableFuture</code> 测试
*/
@Test
public void test() throws Exception {
ThreadPoolExecutor threadPoolExecutor = ThreadExecutorFactory.getThreadPoolExecutor("dankal-test-completableFuture");
// 获取商品信息 耗时: 120 毫秒
final long getProductElapsed = 120L;
Supplier<Integer> getProductSupplier = () -> {
try {
// do work
TimeUnit.MILLISECONDS.sleep(getProductElapsed);
} catch (InterruptedException e) {
// ignored
}
// 返回商品信息成功: 200
System.out.printf("%s 获取商品信息成功,即将返回%n", Thread.currentThread().getName());
return 200;
};
// 获取商品价格 耗时 95 毫秒
final long getProductPriceElapsed = 95L;
Supplier<Double> getProductPriceSupplier = () -> {
try {
// do work
TimeUnit.MILLISECONDS.sleep(getProductPriceElapsed);
} catch (InterruptedException e) {
// ignored
}
System.out.printf("%s 成功获取到商品价格即将开始返回%n", Thread.currentThread().getName());
return 135.53;
};
// 归并商品属性耗时 165 毫秒
final long getProductPropertiesElapsed = 165L;
//程序开始计时
final long startMs = System.currentTimeMillis();
// 获取 商品信息 并归并商品属性
CompletableFuture<Integer> productAboutCompletableFuture = CompletableFuture.supplyAsync(getProductSupplier, threadPoolExecutor)
.thenApplyAsync(previousResult -> {
System.out.printf("%s 获取到商品信息: %d 工耗时 %d 毫秒%n", Thread.currentThread().getName(), previousResult, (System.currentTimeMillis() - startMs));
try {
// do work
TimeUnit.MILLISECONDS.sleep(getProductPropertiesElapsed);
} catch (InterruptedException e) {
// ignored
}
System.out.printf("%s 完成商品属性归并,即将返回结果%n", Thread.currentThread().getName());
return previousResult * 2;
}, threadPoolExecutor);
CompletableFuture<Double> getProductPriceCompletableFuture = CompletableFuture.supplyAsync(getProductPriceSupplier, threadPoolExecutor);
// 等待程序执行完成
CompletableFuture.allOf(productAboutCompletableFuture, getProductPriceCompletableFuture).get();
System.out.println("程序执行完成:");
System.out.printf("\t运行耗时: %d %n", (System.currentTimeMillis() - startMs));
System.out.printf("\t理论耗时: %d %n", (getProductElapsed + getProductPropertiesElapsed + getProductPriceElapsed));
// 模拟 Service 方法返回
System.out.printf("获取商品详情: %d%n", productAboutCompletableFuture.get()/2);
System.out.printf("获取商品属性: %d%n", productAboutCompletableFuture.get());
System.out.printf("获取商品价格: %.2f RMB%n", getProductPriceCompletableFuture.get());
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment