feat: Expose prediction SSE streams#3019
Conversation
There was a problem hiding this comment.
I'm Bonk, and I've done a quick review of your PR.
This PR adds Server-Sent Events (SSE) streaming for predictions with event replay for late subscribers. The implementation looks solid overall with good test coverage. I found one potential resource concern that should be addressed:
Issues
1. Unbounded memory growth in stream_history (Medium)
The stream_history Vec in Prediction grows unbounded during prediction execution. For long-running predictions with frequent output chunks, logs, or metrics, this could cause memory exhaustion. Consider adding a limit to the history size or making it configurable.
|
I've completed my review of PR #3019. The PR adds SSE streaming for predictions with event replay functionality. The implementation is well-structured and includes comprehensive tests. Summary of findings: I identified one medium-severity issue regarding unbounded memory growth in the The rest of the implementation looks solid:
|
|
Based on my review of PR #3019, I agree with the previous reviewer's finding about unbounded memory growth in The implementation is well-structured:
LGTM |
cfa865e to
c61e323
Compare
|
Based on my thorough review of PR #3019, I have analyzed all the changed files: Summary of changes: Previous findings: My independent assessment:
No additional actionable issues beyond what was already flagged. LGTM |
|
After reviewing PR #3019, I've analyzed the changes to add Server-Sent Events (SSE) streaming for predictions. The implementation reuses Key aspects reviewed:
Previously flagged issue: No additional actionable issues found beyond what was already flagged. LGTM |
|
Based on my thorough review of PR #3019, I can see this PR adds Server-Sent Events (SSE) streaming for predictions by reusing Key changes reviewed:
Previously flagged issue (addressed): Additional verification:
LGTM |
|
Based on my review of PR #3019, I can confirm that the previously flagged issue regarding unbounded memory growth in Verified fixes:
Additional verification:
The implementation is well-structured with comprehensive test coverage including unit tests for lagged streams, truncated replay, and integration tests. LGTM |
michaeldwan
left a comment
There was a problem hiding this comment.
Solid design. Main point discussed offline: streaming needs to be opt-in at the handler level via a decorator/attribute. Five concrete changes in the comments.
|
|
||
| fn emit_stream_event(&mut self, event: PredictionStreamEvent) { | ||
| if self.stream_history.len() == MAX_STREAM_HISTORY_EVENTS { | ||
| self.stream_history.remove(0); |
There was a problem hiding this comment.
Vec::remove(0) is O(n) on every event after the buffer fills -- thousands of shifts per prediction for token-by-token LLM output. VecDeque::pop_front() is O(1).
| prediction_id.clone(), | ||
| input.clone(), | ||
| webhook_sender, | ||
| response_mode != PredictionResponseMode::AsyncJson, |
There was a problem hiding this comment.
response_mode != PredictionResponseMode::AsyncJson is true for SyncJson too. Should be response_mode == PredictionResponseMode::AsyncSse -- the current expression is inert for sync mode but reads as a bug.
|
|
||
| impl Prediction { | ||
| pub fn new(id: String, webhook: Option<WebhookSender>) -> Self { | ||
| let (stream_tx, _) = tokio::sync::broadcast::channel(1024); |
There was a problem hiding this comment.
Channel capacity and history cap are both 1024 but defined independently. Use a shared constant so they can't drift.
| return; | ||
| } | ||
|
|
||
| if self.service.stream_receiver_count(&self.id) == 0 |
There was a problem hiding this comment.
The async cleanup task calls remove_prediction while the SSE client may still be draining. After removal, stream_receiver_count returns unwrap_or(0) and prediction_is_terminal returns unwrap_or(true) -- which happen to be the safe defaults. That invariant needs a comment, or hold an Arc to the entry in the guard so it doesn't depend on post-removal lookup.
| cog serve --upload-url http://unused/ | ||
|
|
||
| curl -H Accept:text/event-stream PUT /predictions/sse-stream-test '{"id":"sse-stream-test","input":{}}' | ||
| stdout 'event: output' |
There was a problem hiding this comment.
Missing stdout 'event: start' -- should verify the full lifecycle.
There was a problem hiding this comment.
I'm Bonk, and I've done a quick review of your PR.
This PR adds Server-Sent Events (SSE) streaming for predictions with event replay for late subscribers. The implementation is well-structured overall.
Summary
I've verified the issues flagged by previous reviewers:
- Vec::remove(0) O(n) issue - Fixed: Now uses
VecDeque::pop_front()which is O(1) - Unbounded memory growth - Fixed:
STREAM_EVENT_BUFFER_CAPACITY(1024) limits both broadcast channel and history - Integration test coverage - The test at
integration-tests/tests/sse_streaming_output.txtarincludesstdout 'event: start'as suggested
The implementation correctly:
- Distinguishes sync vs async cancellation semantics via
PredictionStreamGuard - Handles late subscribers with bounded replay (1024 events)
- Includes 15s keep-alive for SSE streams
- Has comprehensive test coverage including lagged stream and truncated replay scenarios
LGTM - all previously flagged issues have been addressed.
|
Posted review for PR #3019. All previously flagged issues have been addressed:
|
|
Based on my review of PR #3019, I've verified that all previously flagged issues have been addressed: Verified fixes:
Implementation highlights:
LGTM |
|
LGTM |
|
Based on my thorough review of PR #3019, I've verified that all previously flagged issues have been addressed: Verified fixes:
Implementation highlights:
LGTM |
|
@michaeldwan decorator added in 8c9c982 |
michaeldwan
left a comment
There was a problem hiding this comment.
Summary
Solid design. The broadcast channel + bounded replay architecture is sound, the opt-in gating via @cog.streaming is the right call, and the integration between Go parser → OpenAPI extension → coglet runtime is clean. The replay/live transition has no race (mutex held atomically), the bounded history buffer has no off-by-one, and completed events are emitted on all terminal paths.
One blocker, several should-fixes. Details below and inline.
Blockers
-
examples/streaming-text/predict.pydoesn't use@cog.streaming-- the README tells users to curl withAccept: text/event-stream, which will 406. The integration testsse_streaming_output.txtarcorrectly uses@streaming, but the shipped example doesn't. (See inline comment.) -
No test for SSE events during failed or cancelled predictions.
set_failed()andset_canceled()both emitCompletedevents, but no test verifies the SSE stream deliversevent: completedwith"status":"failed"/"status":"canceled". These are critical user-facing paths.
Should-fix
decoratorIsCogStreaminghard-codes"cog.streaming"instead of resolving throughImportContext.import cog as c→@c.streamingwon't be detected. The rest of the parser handles aliases. (See inline comment.)@cog.streaming()with parens silently degrades. Parser rejects call form, but Python decorator works either way. User gets a working model where SSE returns 406 with no hint about the parens.- No limit on SSE subscriber count per prediction.
subscribe_prediction_stream()creates a new broadcast receiver with no cap. Repeated SSE connections to the same prediction ID amplify memory pressure. - Orphaned
pending_cancellationsleak memory. Cancel messages arriving after a prediction completes get stored in theHashSetand never cleaned up. (See inline comment.) - Double-clone on every stream event.
emit_stream_eventclonesserde_json::Valuefor history storage.Arc<PredictionStreamEvent>would eliminate deep clones -- history and broadcast share the same allocation. Also fixes the O(n) deep-clone insubscribe_stream_replay()under the mutex. (See inline comment.) PredictionStreamGuard::Dropcallstokio::spawnviacancel(). If dropped outside a tokio runtime context,tokio::spawnpanics. UseHandle::try_current().- No test for concurrent SSE subscribers. The guard checks
stream_receiver_count() == 0before cancelling, but no test verifies dropping one of two subscribers doesn't cancel. - Training endpoints silently ignore
Accept: text/event-stream. Returning 406 or documenting would be more honest than silent fallback to JSON.
Nits
RegisterPredictionMessage4-element tuple → named struct.streaminglisted under# Metricsin__all__-- it's a decorator.- Module-level
FTypeVar →_Fto signal internal. replay.into()creates unnecessaryVecDequefromVec.id.to_string()allocated twice insubscribe_prediction_stream.- Missing
require.NotNilguard before type assertions in streaming OpenAPI tests. - Broadcast channel capacity and history buffer both use the same 1024 constant by coincidence -- give them separate named constants.
Verified correct
- Replay + live transition (no race -- mutex held atomically during subscribe + snapshot)
- Bounded history buffer (no off-by-one)
completedevent emitted on all terminal paths- Terminal state guards prevent double-completion
findTargetFunctionreturningdecorated_definitionhandled correctly viaUnwrapFunctioncog predictCLI works fine with streaming models (uses sync JSON path)docs/python.mdanddocs/llms.txtare accurate and in sync
| ), | ||
| ) -> Iterator[str]: | ||
| messages = [{"role": "user", "content": prompt}] | ||
| text = self.tokenizer.apply_chat_template( |
There was a problem hiding this comment.
Blocker: This method is missing @cog.streaming. The README (line 28-31) tells users to curl with Accept: text/event-stream, which will return 406 since the model doesn't opt in.
The integration test sse_streaming_output.txtar correctly uses @streaming, but this shipped example doesn't.
from cog import BasePredictor, Input, streaming
class Predictor(BasePredictor):
# ...
@streaming
def predict(self, ...) -> Iterator[str]:| self.stream_history.pop_front(); | ||
| self.stream_history_skipped += 1; | ||
| } | ||
| self.stream_history.push_back(event.clone()); |
There was a problem hiding this comment.
Should-fix: This clones the event (containing serde_json::Value) for history, then moves the original into broadcast::send. For high-throughput models yielding many chunks, this deep-clones arbitrarily large JSON on every output.
Consider Arc<PredictionStreamEvent> for the broadcast channel type -- history and broadcast share the same allocation, and subscribe_stream_replay() becomes 1024 atomic increments instead of 1024 deep JSON clones under the prediction mutex.
stream_tx: broadcast::Sender<Arc<PredictionStreamEvent>>,
stream_history: VecDeque<Arc<PredictionStreamEvent>>,
fn emit_stream_event(&mut self, event: PredictionStreamEvent) {
// ...
let event = Arc::new(event);
self.stream_history.push_back(Arc::clone(&event));
let _ = self.stream_tx.send(event);
}| None => { | ||
| tracing::debug!(%prediction_id, "Cancel requested for unknown prediction (may have already completed)"); | ||
| tracing::debug!(%prediction_id, "Cancel requested for unknown prediction; storing pending cancellation"); | ||
| pending_cancellations.insert(prediction_id); |
There was a problem hiding this comment.
Should-fix: If the cancel arrives after the prediction has already completed and been removed from predictions, the ID is stored here and never consumed. In a long-running server with many cancelled predictions, this is an unbounded leak.
Consider adding a size cap (e.g., 1000 entries) or a TTL, and log a warning when it's exceeded.
| func decoratorIsCogStreaming(node *sitter.Node, source []byte, imports *schema.ImportContext) bool { | ||
| for _, child := range NamedChildren(node) { | ||
| switch child.Type() { | ||
| case "attribute": |
There was a problem hiding this comment.
Should-fix: Hard-coded string match. import cog as c then @c.streaming won't be detected -- the content will be "c.streaming", not "cog.streaming".
The rest of the parser resolves aliases through ImportContext (e.g., IsBaseModel, IsOpaque). This should do the same:
case "attribute":
text := Content(child, source)
parts := strings.SplitN(text, ".", 2)
if len(parts) != 2 || parts[1] != "streaming" {
return false
}
entry, ok := imports.Names.Get(parts[0])
return ok && entry.Module == "cog" && entry.Original == "cog"| entry, ok := imports.Names.Get("streaming") | ||
| return ok && entry.Module == "cog" && entry.Original == "streaming" | ||
| case "call": | ||
| return false |
There was a problem hiding this comment.
Should-fix: This rejects @cog.streaming() (call form), but the Python decorator works fine with either @streaming or @streaming(). A user who writes @cog.streaming() gets a model that builds, runs, and yields output -- but SSE returns 406 with no hint about the parentheses.
Either support the call form here (check if the callee is cog.streaming or imported streaming), or make the Python decorator raise a clear error when called with parens. The current behavior is a silent gotcha.
| self: &Arc<Self>, | ||
| id: &str, | ||
| ) -> Option<PredictionStreamSubscription> { | ||
| let entry = self.predictions.get(id)?; |
There was a problem hiding this comment.
Should-fix: No cap on subscriber count. The idempotent PUT endpoint allows repeated SSE connections to the same prediction ID, each creating a new broadcast receiver. An attacker opening many connections forces the sender to retain events for slow consumers.
Consider:
let pred = entry.prediction.lock().ok()?;
if pred.stream_receiver_count() >= MAX_STREAM_SUBSCRIBERS {
return None;
}| // Prediction cleanup may remove the service entry before the SSE response | ||
| // finishes draining. Missing entries deliberately report zero receivers and | ||
| // terminal state so this guard cannot cancel an already-cleaned prediction. | ||
| if self.service.stream_receiver_count(&self.id) == 0 |
There was a problem hiding this comment.
Should-fix: cancel() internally calls tokio::spawn. If this guard is dropped outside a tokio runtime context (panic unwinding, runtime shutdown), tokio::spawn panics.
Safer:
if let Ok(handle) = tokio::runtime::Handle::try_current() {
let service = Arc::clone(&self.service);
let id = self.id.clone();
handle.spawn(async move { service.cancel(&id); });
}Or refactor cancel() itself to use Handle::try_current() before spawning.
| "CancelationException", | ||
| # Metrics | ||
| "current_scope", | ||
| "streaming", |
There was a problem hiding this comment.
Nit: streaming is a decorator, not a metric. Move it to its own # Decorators section or under # Core classes.
| URLPath, | ||
| ) | ||
|
|
||
| F = TypeVar("F", bound=Callable[..., object]) |
There was a problem hiding this comment.
Nit: This is importable as from cog import F. Prefix with underscore (_F) to signal it's internal.
| } | ||
|
|
||
| type RegisterPredictionMessage = ( | ||
| SlotId, |
There was a problem hiding this comment.
Nit: A 4-element tuple type alias is opaque -- the ack channel's purpose is invisible at usage sites. A named struct would be clearer:
struct RegisterPredictionMessage {
slot_id: SlotId,
prediction: Arc<StdMutex<Prediction>>,
idle_sender: oneshot::Sender<SlotIdleToken>,
registered_ack: oneshot::Sender<()>,
}
Summary
POST /predictionsandPUT /predictions/{id}for Server-Sent Events by returning an SSE stream when requests sendAccept: text/event-stream.Prefer: respond-asyncwithout SSE still returns202JSON.CLI behavior
cog predict --streamoption or otherwise change the predict CLI.