From d46ecc32c30ebaac822a99f1edefbce132241855 Mon Sep 17 00:00:00 2001 From: dileep Kumar Somajohassula Date: Sat, 13 Jun 2026 14:26:29 -0500 Subject: [PATCH] Signed-off-by: Dileep Kumar Somajohassula dileepharithasa@gmail.com - reactive multi-output: correct per-binding producer lookup + count-mismatch validation --- .../cloud/stream/function/FunctionConfiguration.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java index 0868808c72..7c6e963c15 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java @@ -565,8 +565,15 @@ private void bindFunctionToDestinations(BindableProxyFactory bindableProxyFactor if (!(resultPublishers instanceof Iterable)) { resultPublishers = Collections.singletonList(resultPublishers); } - Iterator outputBindingIter = outputBindingNames.iterator(); long outputCount = StreamSupport.stream(((Iterable) resultPublishers).spliterator(), false).count(); + if (!CollectionUtils.isEmpty(outputBindingNames) && outputCount != outputBindingNames.size()) { + throw new IllegalStateException("Reactive function '" + functionDefinition + + "' produced " + outputCount + " output publisher(s) but " + outputBindingNames.size() + + " output binding(s) are configured " + outputBindingNames + + ". Each configured output binding must have a corresponding publisher; " + + "return a Tuple of Flux/Publisher when binding multiple outputs."); + } + Iterator outputBindingIter = outputBindingNames.iterator(); ((Iterable) resultPublishers).forEach(publisher -> { Flux flux = Flux.from((Publisher) publisher); @@ -584,7 +591,7 @@ private void bindFunctionToDestinations(BindableProxyFactory bindableProxyFactor } if (message instanceof Message m && m.getHeaders().get("spring.cloud.stream.sendto.destination") != null) { String destinationName = (String) m.getHeaders().get("spring.cloud.stream.sendto.destination"); - ProducerProperties producerProperties = this.serviceProperties.getBindings().get(outputBindingNames.iterator().next()).getProducer(); + ProducerProperties producerProperties = this.serviceProperties.getBindings().get(outputBinding).getProducer(); MessageChannel dynamicChannel = streamBridge.resolveDestination(destinationName, producerProperties, null); if (logger.isInfoEnabled()) { logger.info("Output message is sent to '" + destinationName + "' destination");