feat(connectors): add S3 sink connector #3103
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #3103 +/- ##
=============================================
- Coverage 74.16% 13.93% -60.23%
Complexity 943 943
=============================================
Files 1237 1241 +4
Lines 112641 98557 -14084
Branches 89201 75148 -14053
=============================================
- Hits 83536 13736 -69800
- Misses 26309 84676 +58367
+ Partials 2796 145 -2651
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 7 days if no further activity occurs. If you need a review, please ensure CI is green and the PR is rebased on the latest master. Don't hesitate to ping the maintainers - either @core on Discord or by mentioning them directly here on the PR. Thank you for your contribution! |
slbotbm left a comment
I left some comments.
Also, do you plan to support using parquet files as well in the future?
|
I also feel data loss due to maximum retries being exceeded should be mentioned in readme.md as a precaution. |
96ed8d1 to 87e0cc0
oh yes absolutely.. parquet support is on the roadmap as a future |
I agree, I'll add that |
0a37619 to 00fc2df
/ready |
| - Buffered uploads with configurable file rotation (by size or message count) | ||
| - Multiple output formats: JSON Lines, JSON Array, Raw | ||
| - Configurable path templates with variables for stream, topic, date, hour, partition | ||
| - Deterministic S3 keys based on offset ranges for idempotent crash recovery |
"deterministic S3 keys ... idempotent crash recovery" is false on three grounds:
- path.rs:64 substitutes {timestamp} via Utc::now().timestamp_millis() at render time, so retries produce different keys
- buffer.rs is in-memory only with no WAL, so a process restart cannot reproduce the same batch boundaries
- runtime auto-commits offsets on poll (bug(connectors): PollingMessages auto-commit commits offsets before sink processing #2928) and the FFI return code is discarded (bug(connectors): consume() return value discarded in sink runtime #2927), so replay never happens anyway
either remove the claim, or remove {timestamp} from the template and document the in-memory loss path honestly.
Removed claim, documented honestly instead
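For readers following the thread, here is a minimal sketch of what an offset-based key could look like when nothing in it depends on the wall clock. The function name `offset_key` and the exact layout are assumptions for illustration, not the code in path.rs. It only yields the same key on retry if the batch boundaries themselves are reproducible, which the comment above notes is not the case for an in-memory buffer after a restart.

```rust
/// Hypothetical helper (not from the PR): builds an object key from values
/// that are fixed for a given batch, so rendering it twice gives the same key.
fn offset_key(
    stream: &str,
    topic: &str,
    partition: u32,
    first_offset: u64,
    last_offset: u64,
) -> String {
    // Offsets are zero-padded to 20 digits (the width of u64::MAX) so that
    // keys sort lexicographically in offset order.
    format!("{stream}/{topic}/p{partition}/{first_offset:020}-{last_offset:020}.jsonl")
}
```

A key rendered with `Utc::now()` would differ on every call; this one differs only when the offset range differs.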
| - Deterministic S3 keys based on offset ranges for idempotent crash recovery | ||
| - Optional metadata and header inclusion in output | ||
| - Support for custom endpoints (MinIO, R2) and path-style addressing | ||
| - Retry with exponential backoff on upload failures |
"retry with exponential backoff" - code at sink.rs:260 is retry_delay * attempts, which is linear (1s, 2s, 3s). either update the doc to "linear backoff" or implement retry_delay * 2u32.pow(attempts - 1) with jitter. also AFAIR there is backoff in connectors SDK, please check it.
Now uses SDK exponential_backoff + jitter
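To make the linear versus exponential distinction concrete, the two schedules from the comment can be written side by side. This is an illustrative sketch only: the function names are made up here, and it does not use or reproduce the connectors SDK helper mentioned in the reply.

```rust
use std::time::Duration;

/// Linear schedule, as the review describes the original code: base * attempt.
fn linear_delay(base: Duration, attempt: u32) -> Duration {
    base * attempt
}

/// Exponential schedule suggested in the review: base * 2^(attempt - 1).
/// `attempt` is 1-based; overflow handling and jitter are left out here.
fn exponential_delay(base: Duration, attempt: u32) -> Duration {
    base * 2u32.pow(attempt.saturating_sub(1))
}
```

With a 1s base, attempts 1 to 4 give 1s, 2s, 3s, 4s for the linear form and 1s, 2s, 4s, 8s for the exponential form.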
|
|
||
| ## Data Delivery Guarantees | ||
|
|
||
| This connector provides **at-least-once** delivery under normal operation. However, **data loss can occur** if all upload retries are exhausted (controlled by `max_retries`). When an upload fails after all retry attempts, the affected messages are dropped and an error is logged. Monitor your connector logs for `failed to upload` errors in production. Increase `max_retries` and `retry_delay` if transient S3 failures are common in your environment. |
this paragraph claims at-least-once and then admits "data loss can occur" in the same sentence - that is self-contradictory. given #2927 + #2928, no sink connector can deliver at-least-once today. the canonical in-tree wording is http_sink/README.md:790-800 which honestly documents at-most-once and cites both bugs. recommend copying that section verbatim.
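The difference between the two guarantees comes down to when the offset is committed relative to sink processing. The toy model below is an assumption-laden sketch, not the Iggy runtime: `CommitPoint` and `deliver` are hypothetical names, and the "sink" fails exactly once on one offset to stand in for a transient upload error.

```rust
/// Where the consumer offset is advanced relative to sink processing.
#[derive(Clone, Copy, PartialEq)]
enum CommitPoint {
    BeforeProcessing, // commit on poll: at-most-once
    AfterSuccess,     // commit after the sink succeeds: at-least-once
}

/// Toy model: returns the offsets that actually reached the sink.
/// The sink fails once, the first time it sees `fail_once_at`.
fn deliver(log: &[u64], fail_once_at: u64, commit: CommitPoint) -> Vec<u64> {
    let mut committed = 0usize; // index of the next message to poll
    let mut already_failed = false;
    let mut delivered = Vec::new();
    while committed < log.len() {
        let offset = log[committed];
        if commit == CommitPoint::BeforeProcessing {
            committed += 1; // offset advances before the sink runs
        }
        let ok = if offset == fail_once_at && !already_failed {
            already_failed = true;
            false
        } else {
            true
        };
        if ok {
            delivered.push(offset);
        }
        if commit == CommitPoint::AfterSuccess && ok {
            committed += 1; // on failure the same offset is polled again
        }
    }
    delivered
}
```

With commit-before-processing, the failed offset is skipped for good; with commit-after-success, it is polled again and delivered.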
| chrono = { workspace = true } | ||
| dashmap = { workspace = true } | ||
| humantime = { workspace = true } | ||
| iggy_common = { workspace = true } |
iggy_common is declared under [dependencies] but the only usage in this crate is formatter.rs:233-235 under #[cfg(test)]. move it to [dev-dependencies] so it does not bloat the cdylib build.
Moved to [dev-dependencies]
Correction to my earlier reply: iggy_common remains in [dependencies] (not [dev-dependencies]) because the header-serialization fix (your comment on formatter.rs:75) introduced runtime usage of HeaderKey, HeaderValue, and HeaderKind via serialize_headers at formatter.rs:124-169. The original premise (test-only) was accurate for v1 but is no longer true after the HeaderKind match dispatch was added
| # under the License. | ||
|
|
||
| [package] | ||
| name = "iggy_connector_s3_sink" |
missing publish = false. every other connector sink (postgres_sink, delta_sink, http_sink, elasticsearch_sink, stdout_sink) declares publish = false because they are cdylib plugins not meant for crates.io. without it the crate would publish on the next workspace release.
Added, matches all other sinks
| } | ||
|
|
||
| let mut state = self.state.lock().await; | ||
| state.messages_processed += messages.len() as u64; |
state.messages_processed += messages.len() as u64 runs unconditionally outside the rotate loop, so when a mid-batch flush dropped N messages this counter still claims it processed them. upload_errors increments separately at :209. result: the close-log at :155-156 reports more messages processed than actually landed in S3. either decrement on drop or split into messages_buffered / messages_uploaded / messages_lost.
Split into messages_received / messages_uploaded / messages_lost
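A rough sketch of how the three counters could be kept consistent is shown below. The counter names come from the reply above; the `SinkStats` struct and its methods are assumptions for illustration and may not match the PR.

```rust
/// Hypothetical stats holder using the counter names from the thread.
#[derive(Debug, Default)]
struct SinkStats {
    messages_received: u64,
    messages_uploaded: u64,
    messages_lost: u64,
}

impl SinkStats {
    /// Count messages as soon as they arrive, before any processing.
    fn record_received(&mut self, n: u64) {
        self.messages_received += n;
    }

    /// Attribute a flushed batch to exactly one outcome.
    fn record_flush(&mut self, n: u64, uploaded: bool) {
        if uploaded {
            self.messages_uploaded += n;
        } else {
            self.messages_lost += n;
        }
    }

    /// Messages received but not yet uploaded or lost.
    fn messages_buffered(&self) -> u64 {
        self.messages_received
            .saturating_sub(self.messages_uploaded)
            .saturating_sub(self.messages_lost)
    }
}
```

Because every message ends up in exactly one of uploaded, lost, or still buffered, the close-log can report figures that add up to the received total.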
| // Reset buffer even on failure to prevent unbounded growth. | ||
| // Messages are lost but offsets will be re-delivered by the | ||
| // runtime on next poll since consume() returned Ok. | ||
| buffer.reset(); |
this is the worst data-loss path in the PR. on retry-exhaust the failure branch logs an error and then calls buffer.reset() at :214 to drop the messages, while consume at :126 still returns Ok(()). the comment at :211-213 claims "offsets will be re-delivered by the runtime on next poll since consume() returned Ok" - that is doubly false:
- the runtime auto-commits offsets before consume runs (runtime/sink.rs:421 uses AutoCommitWhen::PollingMessages, see bug(connectors): PollingMessages auto-commit commits offsets before sink processing #2928), so the offset has already advanced past these messages by the time you return.
- the runtime discards the consume FFI return code (runtime/sink.rs:585-593, see bug(connectors): consume() return value discarded in sink runtime #2927), so even returning Err wouldn't trigger retry today.
net result: a single transient S3 hiccup that exhausts max_retries (default 3) permanently loses every buffered message. the README at line 139 acknowledges this but still claims at-least-once on the same line.
minimum: drop the false comment, propagate Err to the runtime, and align the README with http_sink/README.md:790-800 (at-most-once + cite #2927 / #2928).
Removed false re-delivery comment, propagate error, aligned README with known runtime limitations
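The shape of "reset the buffer but still report the failure" can be sketched as follows. Everything here is hypothetical (`Buffer`, `FlushError`, `flush`, and the closure standing in for the S3 upload); it is not the code in sink.rs. As the review notes, the runtime currently discards the return code (#2927), so propagating the error makes the loss visible to the caller but does not by itself cause re-delivery.

```rust
/// Hypothetical error type carrying how many messages were dropped.
#[derive(Debug, PartialEq)]
enum FlushError {
    RetriesExhausted { dropped: usize },
}

/// Hypothetical in-memory buffer of raw message payloads.
struct Buffer {
    messages: Vec<Vec<u8>>,
}

/// Uploads the buffered messages via `upload` (a stand-in for the S3 call
/// including its retries) and reports failure instead of returning Ok.
fn flush<F>(buffer: &mut Buffer, mut upload: F) -> Result<usize, FlushError>
where
    F: FnMut(&[Vec<u8>]) -> Result<(), String>,
{
    let count = buffer.messages.len();
    let result = upload(&buffer.messages);
    // The buffer is cleared either way to bound memory use, but the loss
    // is surfaced to the caller rather than hidden behind Ok(()).
    buffer.messages.clear();
    match result {
        Ok(()) => Ok(count),
        Err(_) => Err(FlushError::RetriesExhausted { dropped: count }),
    }
}
```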
| return Ok(()); | ||
| } | ||
| attempts += 1; | ||
| if attempts >= max_retries { |
attempts starts at 0 and increments before the >= max_retries check, so max_retries = 3 yields 3 total attempts (2 retries past the initial one). this matches the postgres_sink pattern but the field name is misleading. either rename to max_attempts or use attempts > max_retries so the field name lines up with semantics.
Renamed to max_attempts for clarity
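The counting semantics discussed here can be shown with a small generic loop. This is a sketch with a hypothetical helper name, `run_with_attempts`; the operation is a closure that receives the 1-based attempt number and returns whether it succeeded.

```rust
/// Runs `op` up to `max_attempts` times in total (initial try included).
/// Returns Ok(attempt) on the first success, Err(attempts_made) otherwise.
/// Note: with max_attempts = 0 this sketch still makes one attempt.
fn run_with_attempts<F>(max_attempts: u32, mut op: F) -> Result<u32, u32>
where
    F: FnMut(u32) -> bool,
{
    let mut attempts = 0u32;
    loop {
        attempts += 1;
        if op(attempts) {
            return Ok(attempts);
        }
        if attempts >= max_attempts {
            return Err(attempts);
        }
    }
}
```

With `max_attempts = 3` and an operation that always fails, the closure runs exactly three times, which is the behaviour the rename is meant to describe.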
| let mut attempts = 0u32; | ||
|
|
||
| loop { | ||
| match bucket.put_object(s3_key, data).await { |
the retry loop treats every non-2xx status uniformly. AWS permanent failures - AccessDenied (403), NoSuchBucket (404), InvalidBucketName (400), MalformedPolicy - get retried 3 times, wasting retry_delay * (1 + 2) = 3s before the final Err. classify retriable vs not: 5xx + 408 + 429 + 503 SlowDown -> retry; other 4xx -> fail fast.
Added is_retriable_status: only 5xx/408/429 retry, other 4xx fail fast
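The classification described in the reply is small enough to sketch directly. The function name is taken from the reply; the body below is an assumption about how it might be written, not a copy of the PR's implementation.

```rust
/// Sketch of a status classifier: retry on any 5xx, 408 (Request Timeout)
/// and 429 (Too Many Requests); everything else, including other 4xx such
/// as 400, 403 and 404, fails fast.
fn is_retriable_status(status: u16) -> bool {
    matches!(status, 408 | 429 | 500..=599)
}
```

503 SlowDown falls under the 5xx range, so it needs no separate arm.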
| ); | ||
| } | ||
| } | ||
| tokio::time::sleep(retry_delay * attempts).await; |
retry_delay * attempts is linear backoff (1s, 2s, 3s), but the README at line 13 advertises exponential. either implement retry_delay * 2u32.pow(attempts - 1) with jitter, or update the doc to "linear".
separately, Duration::Mul<u32> panics on overflow - default config is safe but pathological values (e.g. retry_delay = "1h" + large max_retries) would panic. saturating_mul is cheap defensive practice.
Implemented true exponential with jitter, capped at 60s via SDK helpers
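For the overflow concern specifically, a capped and saturating variant can be written with the standard library alone. This is a sketch under stated assumptions: `capped_backoff` is a hypothetical name, the 60s cap mirrors the figure in the reply, the jitter is supplied by the caller (for example from an RNG), and none of this is the SDK helper the PR actually uses.

```rust
use std::time::Duration;

/// Exponential backoff that cannot panic on overflow.
/// `attempt` is 1-based; `jitter` is an extra delay chosen by the caller.
fn capped_backoff(base: Duration, attempt: u32, cap: Duration, jitter: Duration) -> Duration {
    // 2^(attempt - 1), saturating at u32::MAX instead of overflowing.
    let factor = 2u32.saturating_pow(attempt.saturating_sub(1));
    // saturating_mul / saturating_add clamp to Duration::MAX rather than
    // panicking, and min(cap) bounds the final sleep.
    base.saturating_mul(factor).saturating_add(jitter).min(cap)
}
```

With a 1s base and a 60s cap, attempts 1, 3 and 10 give 1s, 4s and 60s, and a pathological input such as a 1h base with a large attempt number is clamped to the cap instead of panicking.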
|
/author |
Write Iggy stream messages to Amazon S3 and S3-compatible stores with buffered uploads, configurable rotation, and deterministic offset-based keys.
980d64d to c0a5722
c0a5722 to 099de6f
…ctness DashMap per-partition buffers (no lock held during upload), contiguous buffer Vec<u8>+sidecar, SecretString credentials, HeaderKind-aware serialization, owned_value_to_serde_json, byte-concat JsonArray finalize, 20-digit offset padding, partition in filename, deterministic timestamps, strict config validation, .lost marker on flush failure, error propagation.
f15cc53 to 6cfac12
|
The biggest thing I caught was a credential leak where the derived Debug on S3Sink would dump the full AWS access key and secret key into logs whenever the struct got printed. I replaced that with a manual Debug impl that just shows the bucket name and added a regression test so it never sneaks back in.
On the metrics side, the received count was being incremented after processing the batch, which meant if something failed halfway through you could lose track of how many messages actually got dropped. I moved that counter up front and added logic to correctly mark the unprocessed remainder as lost.
I also cleaned up the dependency situation since The
/ready |
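The credential-leak fix described above follows a common pattern: write `Debug` by hand and leave the secret fields out. The sketch below uses a hypothetical `SinkConfig` struct rather than the PR's `S3Sink`, whose fields are not shown in this thread.

```rust
use std::fmt;

/// Hypothetical config struct holding credentials next to harmless fields.
struct SinkConfig {
    bucket: String,
    access_key_id: String,
    secret_access_key: String,
}

/// Manual Debug impl: prints only the bucket and elides everything else,
/// so logging the struct cannot leak the keys.
impl fmt::Debug for SinkConfig {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SinkConfig")
            .field("bucket", &self.bucket)
            .finish_non_exhaustive()
    }
}
```

A regression test for this kind of fix typically formats the struct with `{:?}` and asserts that the secret values do not appear in the output.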
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #3103 +/- ##
=============================================
- Coverage 74.16% 18.27% -55.89%
Complexity 943 943
=============================================
Files 1237 1241 +4
Lines 112641 98557 -14084
Branches 89201 75149 -14052
=============================================
- Hits 83536 18010 -65526
- Misses 26309 80107 +53798
+ Partials 2796 440 -2356
/author |
Which issue does this PR close?
Closes #2956
Rationale
Iggy lacks a native way to write stream messages to Amazon S3 and S3-compatible stores (MinIO, Cloudflare R2, Backblaze B2, DigitalOcean Spaces). This is a frequently requested capability for data lake ingestion and long-term archival pipelines.
What changed?
There was no connector for persisting Iggy messages to object storage. Users had to build custom consumers and upload logic to get data into S3.
This PR adds a new iggy_connector_s3_sink crate that implements the Sink trait. It buffers messages in-memory per stream/topic/partition, rotates files by size or message count, renders S3 keys from a configurable path template ({stream}/{topic}/{date}/{hour}/...), and uploads with retry + exponential backoff. Supports json_lines, json_array, and raw output formats with optional Iggy metadata and header embedding. Uses rust-s3 (already in workspace) with path-style addressing auto-enabled for custom endpoints.
Key implementation details:
- lib.rs (config + entry point), client.rs (S3 client init + bucket verification), buffer.rs (in-memory accumulation + rotation logic), formatter.rs (JSON/raw output + metadata/header inclusion), path.rs (template engine for S3 keys with offset-based filenames), sink.rs (Sink trait: open/consume/close lifecycle)
- _build_rust_artifacts.yml and edge-release.yml for cdylib plugin builds and release notes
Local Execution
- cargo fmt --check -- pass
- cargo clippy --tests -D warnings -- pass (zero warnings)
- cargo test -p iggy_connector_s3_sink -- 36/36 pass
- markdownlint --check -- pass
- trailing-whitespace -- pass
- trailing-newline -- pass
- license-headers -- pass
AI Usage
-D warnings, and end-to-end testing with MinIO Docker + Iggy server + CLI producer + connector runtime
Here are all the relevant screenshots:
- iggy-test bucket
- application_logs and topic api_requests
- .jsonl file in the correct path structure (application_logs/api_requests/{date}/{hour}/)
- cargo clippy --tests -D warnings passing with zero warnings
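The size-or-count rotation described under "What changed?" can be illustrated with a small buffer type. This is a minimal sketch, assuming JSON Lines output and a single partition; `PartitionBuffer` and its fields are hypothetical names and do not reflect the actual buffer.rs.

```rust
/// Hypothetical per-partition buffer that accumulates newline-delimited
/// payloads and hands back a finished file when a limit is reached.
struct PartitionBuffer {
    bytes: Vec<u8>,
    message_count: usize,
    max_bytes: usize,
    max_messages: usize,
}

impl PartitionBuffer {
    fn new(max_bytes: usize, max_messages: usize) -> Self {
        Self { bytes: Vec::new(), message_count: 0, max_bytes, max_messages }
    }

    /// Appends one payload as a line. Returns Some(file_contents) when the
    /// size or message-count threshold is hit, leaving the buffer empty.
    fn push(&mut self, payload: &[u8]) -> Option<Vec<u8>> {
        self.bytes.extend_from_slice(payload);
        self.bytes.push(b'\n');
        self.message_count += 1;
        if self.bytes.len() >= self.max_bytes || self.message_count >= self.max_messages {
            self.message_count = 0;
            Some(std::mem::take(&mut self.bytes))
        } else {
            None
        }
    }
}
```

The caller would upload the returned bytes as one object; whichever limit is reached first triggers the rotation.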