Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

Expand Down Expand Up @@ -128,6 +130,14 @@ public final class StreamBridge implements StreamOperations, SmartInitializingSi

private static final ReentrantLock lock = new ReentrantLock();

private static final String EXECUTOR_CORE_POOL_PROP = "spring.cloud.stream.streamBridge.executor.corePoolSize";

private static final String EXECUTOR_MAX_POOL_PROP = "spring.cloud.stream.streamBridge.executor.maxPoolSize";

private static final String EXECUTOR_QUEUE_CAPACITY_PROP = "spring.cloud.stream.streamBridge.executor.queueCapacity";

private static final String EXECUTOR_KEEPALIVE_PROP = "spring.cloud.stream.streamBridge.executor.keepAliveSeconds";

private ObservationRegistry observationRegistry = ObservationRegistry.NOOP;
/**
*
Expand All @@ -138,7 +148,7 @@ public final class StreamBridge implements StreamOperations, SmartInitializingSi
@SuppressWarnings("serial")
StreamBridge(FunctionCatalog functionCatalog, BindingServiceProperties bindingServiceProperties,
ConfigurableApplicationContext applicationContext, @Nullable NewDestinationBindingCallback destinationBindingCallback, ObjectProvider<ObservationRegistry> observationRegistries) {
this.executorService = Executors.newCachedThreadPool();
this.executorService = createBoundedExecutor();
Assert.notNull(functionCatalog, "'functionCatalog' must not be null");
Assert.notNull(applicationContext, "'applicationContext' must not be null");
Assert.notNull(bindingServiceProperties, "'bindingServiceProperties' must not be null");
Expand Down Expand Up @@ -349,8 +359,14 @@ public void destroy() throws Exception {

this.executorService = null;
this.async = false;
channelCache.keySet().forEach(bindingService::unbindProducers);
channelCache.clear();
lock.lock();
try {
channelCache.keySet().forEach(bindingService::unbindProducers);
channelCache.clear();
}
finally {
lock.unlock();
}
}

public boolean isAsync() {
Expand All @@ -369,8 +385,14 @@ public void setAsync(boolean async) {
public void onApplicationEvent(ApplicationEvent event) {
// we need to do it by String to avoid cloud-bus and context dependencies
if (event.getClass().getName().equals("org.springframework.cloud.bus.event.RefreshRemoteApplicationEvent")) {
closeChannelsGracefully();
this.channelCache.clear();
lock.lock();
try {
closeChannelsGracefully();
this.channelCache.clear();
}
finally {
lock.unlock();
}
}
}

Expand All @@ -393,6 +415,44 @@ private void closeChannelsGracefully() {
});
}

private static ExecutorService createBoundedExecutor() {
int cores = Runtime.getRuntime().availableProcessors();
int corePoolSize = intProperty(EXECUTOR_CORE_POOL_PROP, cores);
int maxPoolSize = intProperty(EXECUTOR_MAX_POOL_PROP, cores * 4);
int queueCapacity = intProperty(EXECUTOR_QUEUE_CAPACITY_PROP, 1000);
long keepAliveSeconds = intProperty(EXECUTOR_KEEPALIVE_PROP, 60);

AtomicLong threadCount = new AtomicLong();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
keepAliveSeconds,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(queueCapacity),
runnable -> {
Thread thread = new Thread(runnable,
"stream-bridge-" + threadCount.incrementAndGet());
thread.setDaemon(true);
return thread;
},
new ThreadPoolExecutor.CallerRunsPolicy());
executor.allowCoreThreadTimeOut(true);
return executor;
}

private static int intProperty(String name, int defaultValue) {
String value = System.getProperty(name);
if (value == null || value.isBlank()) {
return defaultValue;
}
try {
return Integer.parseInt(value.trim());
}
catch (NumberFormatException ex) {
return defaultValue;
}
}

private static final class ContextPropagationHelper {
static ExecutorService wrap(ExecutorService executorService) {
return ContextExecutorService.wrap(executorService, () -> ContextSnapshotFactory.builder().build().captureAll());
Expand Down