diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java index 2f3aaf8f06..02f7c141a1 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java @@ -21,8 +21,10 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; @@ -128,6 +130,14 @@ public final class StreamBridge implements StreamOperations, SmartInitializingSi private static final ReentrantLock lock = new ReentrantLock(); + private static final String EXECUTOR_CORE_POOL_PROP = "spring.cloud.stream.streamBridge.executor.corePoolSize"; + + private static final String EXECUTOR_MAX_POOL_PROP = "spring.cloud.stream.streamBridge.executor.maxPoolSize"; + + private static final String EXECUTOR_QUEUE_CAPACITY_PROP = "spring.cloud.stream.streamBridge.executor.queueCapacity"; + + private static final String EXECUTOR_KEEPALIVE_PROP = "spring.cloud.stream.streamBridge.executor.keepAliveSeconds"; + private ObservationRegistry observationRegistry = ObservationRegistry.NOOP; /** * @@ -138,7 +148,7 @@ public final class StreamBridge implements StreamOperations, SmartInitializingSi @SuppressWarnings("serial") StreamBridge(FunctionCatalog functionCatalog, BindingServiceProperties bindingServiceProperties, ConfigurableApplicationContext applicationContext, @Nullable NewDestinationBindingCallback destinationBindingCallback, ObjectProvider observationRegistries) { - this.executorService = Executors.newCachedThreadPool(); + this.executorService = createBoundedExecutor(); Assert.notNull(functionCatalog, "'functionCatalog' must not be null"); Assert.notNull(applicationContext, "'applicationContext' must not be null"); Assert.notNull(bindingServiceProperties, "'bindingServiceProperties' must not be null"); @@ -349,8 +359,14 @@ public void destroy() throws Exception { this.executorService = null; this.async = false; - channelCache.keySet().forEach(bindingService::unbindProducers); - channelCache.clear(); + lock.lock(); + try { + channelCache.keySet().forEach(bindingService::unbindProducers); + channelCache.clear(); + } + finally { + lock.unlock(); + } } public boolean isAsync() { @@ -369,8 +385,14 @@ public void setAsync(boolean async) { public void onApplicationEvent(ApplicationEvent event) { // we need to do it by String to avoid cloud-bus and context dependencies if (event.getClass().getName().equals("org.springframework.cloud.bus.event.RefreshRemoteApplicationEvent")) { - closeChannelsGracefully(); - this.channelCache.clear(); + lock.lock(); + try { + closeChannelsGracefully(); + this.channelCache.clear(); + } + finally { + lock.unlock(); + } } } @@ -393,6 +415,44 @@ private void closeChannelsGracefully() { }); } + private static ExecutorService createBoundedExecutor() { + int cores = Runtime.getRuntime().availableProcessors(); + int corePoolSize = intProperty(EXECUTOR_CORE_POOL_PROP, cores); + int maxPoolSize = intProperty(EXECUTOR_MAX_POOL_PROP, cores * 4); + int queueCapacity = intProperty(EXECUTOR_QUEUE_CAPACITY_PROP, 1000); + long keepAliveSeconds = intProperty(EXECUTOR_KEEPALIVE_PROP, 60); + + AtomicLong threadCount = new AtomicLong(); + ThreadPoolExecutor executor = new ThreadPoolExecutor( + corePoolSize, + maxPoolSize, + keepAliveSeconds, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(queueCapacity), + runnable -> { + Thread thread = new Thread(runnable, + "stream-bridge-" + threadCount.incrementAndGet()); + thread.setDaemon(true); + return thread; + }, + new ThreadPoolExecutor.CallerRunsPolicy()); + executor.allowCoreThreadTimeOut(true); + return executor; + } + + private static int intProperty(String name, int defaultValue) { + String value = System.getProperty(name); + if (value == null || value.isBlank()) { + return defaultValue; + } + try { + return Integer.parseInt(value.trim()); + } + catch (NumberFormatException ex) { + return defaultValue; + } + } + private static final class ContextPropagationHelper { static ExecutorService wrap(ExecutorService executorService) { return ContextExecutorService.wrap(executorService, () -> ContextSnapshotFactory.builder().build().captureAll());