Skip to content

test(connectors): add source liveness helper#3377

Open
arunsingh wants to merge 5 commits into
apache:masterfrom
arunsingh:feat/2892-pareto-source-suite
Open

test(connectors): add source liveness helper#3377
arunsingh wants to merge 5 commits into
apache:masterfrom
arunsingh:feat/2892-pareto-source-suite

Conversation

@arunsingh
Copy link
Copy Markdown
Contributor

@arunsingh arunsingh commented May 29, 2026

Summary

Part of #2892.

Adds a small source connector liveness helper and wires the random source through it as smoke coverage only.

Changes:

  • Add connectors::source_suite for bounded, accumulating source message polling.
  • Register the helper under connector integration tests.
  • Replace random source polling with the shared helper.
  • Drop random restart coverage because random has no stable id/cursor to prove resume.

The broader behavior suite should start from sources with observable cursors/ids, such as postgres or influxdb, with #2940 scope kept separate for failed-send semantics.

Local validation

  • cargo fmt --all --check
  • git diff --check

Local blockers:

  • cargo test -p integration --no-run is blocked on Windows before reaching these tests by existing Unix-only message_bus imports and missing pkg-config/hwloc discovery.
  • WSL validation is blocked because cargo is not installed in WSL.

@arunsingh arunsingh changed the title test(connectors): add source connector suite helper test(connectors): add source connector pareto suite May 29, 2026
@codecov
Copy link
Copy Markdown

codecov Bot commented May 29, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 46.18%. Comparing base (e7b2d7e) to head (0d9c13b).
⚠️ Report is 12 commits behind head on master.

Additional details and impacted files
@@              Coverage Diff              @@
##             master    #3377       +/-   ##
=============================================
- Coverage     73.71%   46.18%   -27.54%     
  Complexity      943      943               
=============================================
  Files          1223     1229        +6     
  Lines        115755   105623    -10132     
  Branches      92491    82359    -10132     
=============================================
- Hits          85329    48777    -36552     
- Misses        27556    54206    +26650     
+ Partials       2870     2640      -230     
Components Coverage Δ
Rust Core 39.11% <ø> (-35.62%) ⬇️
Java SDK 58.44% <ø> (ø)
C# SDK 69.94% <ø> (ø)
Python SDK 81.06% <ø> (ø)
Node SDK 91.53% <ø> (ø)
Go SDK 40.20% <ø> (ø)
see 358 files with indirect coverage changes
🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@arunsingh arunsingh marked this pull request as ready for review May 29, 2026 20:18
@github-actions github-actions Bot added the S-waiting-on-review PR is waiting on a reviewer label May 29, 2026
Copy link
Copy Markdown
Contributor

@hubcio hubcio Jun 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this file; source_suite.rs code should be self-documenting

Copy link
Copy Markdown
Contributor Author

@arunsingh arunsingh Jun 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have removed source_suite.md; the helper now stands on the Rust code only.

@github-actions github-actions Bot added S-waiting-on-author PR is waiting on author response and removed S-waiting-on-review PR is waiting on a reviewer labels Jun 1, 2026
@hubcio
Copy link
Copy Markdown
Contributor

hubcio commented Jun 1, 2026

overall change is OK, but what about the rest of connectors? you marked that #2892 will be closed, yet this issue it only touching the random source connector which is essentially a pure-test connector type.

@arunsingh
Copy link
Copy Markdown
Contributor Author

arunsingh commented Jun 1, 2026

overall change is OK, but what about the rest of connectors? you marked that #2892 will be closed, yet this issue it only touching the random source connector which is essentially a pure-test connector type.

@hubcio I have changed this to Part of #2892; this PR won't close the issue, and real source connectors can be wired in follow-ups. Let me know if you want this merged as the first step or want one real source connector wired before merge, likely, wire one Docker-backed source connector, probably Postgres or Elasticsearch, into source_suite.

Copy link
Copy Markdown
Contributor

@hubcio hubcio left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for PR, I finally had some time to properly review it.

two blockers inline (the restart test can't actually fail, and poll_until_min_messages drops messages when min_messages is above one batch), then a note on direction.

on direction: #2892 is really about behaviour - progress-before-send, retry-on-fail, restart-resume, mark/delete-after-commit, replay window. a poll-until-non-empty helper proves liveness, which already works everywhere; it doesn't give a seam for any of those, and two structural things stop it growing into the suite as-is.

first, the random source can't host 4 of the 5 checks - no seed, the produced ProducedMessage goes out with envelope id: None (only a random, non-monotonic Uuid lives inside the json payload as Record.id), no consumer-visible cursor (just an in-process messages_produced counter that's already unit-tested in the crate), and no external system to mark/delete. so it can only ever be a liveness smoke. the sources that already have an observable, gated cursor are postgres and influxdb - those are the honest first wires for a real behaviour test, and the postgres source/restart tests from #2579 are the pattern to copy.

second, heads up that behaviours 1 and 2 aren't guaranteed by the SDK today: the plugin advances its cursor inside poll() before returning (random does messages_produced +=, postgres marks/deletes rows and bumps tracking offsets), and the send path is one-way - SendCallback returns (), and the runtime's failed-send branch only logs/metrics/sets-error, nothing re-offers the batch. so on a failed send the un-sent batch is dropped (at-most-once on the runtime->iggy hop), and #2892 item 2 can't pass against any source without an SDK/FFI change. this is the same contract question already open in #2940 (partial writes / replay-safe progress / deterministic write identity) - worth linking the suite scope to that and rescoping, rather than writing a test that asserts behaviour the code doesn't have. the one invariant the runtime does enforce is on-disk state only advances on send success (the state save sits entirely in the success arm), which is testable now.

suggested shape if you want this to become the real suite: a small SourceUnderTest trait the runner is generic over (config_toml, seed(n) with a monotonic stamped id, expected()), plus one accumulating poll_until_n that replaces the inline loops and unifies the retry budget - today that's two independent per-module copies of POLL_ATTEMPTS/POLL_INTERVAL_MS (postgres + elasticsearch) plus hardcoded literals in influxdb, so there's no single shared constant to point at yet. that's basically the existing postgres fixture made generic. with the connector roadmap (#2753) and new sources still landing (mongodb #3285, meilisearch #3404), a real per-source baseline pays off. random then stays as an explicit liveness smoke and the restart check lives on postgres.

server(connectors_runtime(config_path = "tests/connectors/random/source.toml")),
seed = seeds::connector_stream
)]
async fn state_persists_across_connector_restart(harness: &mut TestHarness) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this test can't fail. it polls two different fresh consumers (source_suite_before_restart, then source_suite_after_restart) against a source that emits forever, and only checks !after.is_empty(). a fresh consumer gets new messages whether or not connector state survived the restart, so a state-loss regression still passes green - which is worse than no test under this name.

the old current_offset check was the partition head too (it's documented as the partition's offset, not the consumer's), so it never proved cursor survival either. the pattern to copy is the postgres source restart test from #2579: one consumer across the restart, seed a second batch with strictly-greater ids, assert every post-restart id is greater than the pre-restart max. random can't supply that discriminator, so either move this test to postgres or drop it and keep random as a liveness smoke (and rename it).

Copy link
Copy Markdown
Contributor Author

@arunsingh arunsingh Jun 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks @hubcio for detailed review, I appreciate your efforts, I have dropped the random restart test; random now stays as liveness smoke only.

.expect("Failed to poll messages");

let messages = source_suite::assert_source_produces_messages(harness).await;
assert!(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

redundant - assert_source_produces_messages already asserts non-empty before returning. drop this, or call poll_until_min_messages directly if you want the explicit check here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the duplicate assertion and call the helper directly.

.await
.expect("poll source messages");

if polled.messages.len() >= config.min_messages {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this returns a single poll iteration's messages, but with auto_commit=true above each poll commits its batch. when min_messages is larger than one server batch, the earlier batches are committed and then dropped here - never accumulated - so the loop can spin to the timeout while silently discarding everything it read. latent now since the default min_messages is 1, but it bites the moment a real source is wired with a higher count. accumulate into a Vec across iterations like the postgres and influxdb source loops do.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed polling to accumulate batches until min_messages is reached.

use integration::harness::{TestHarness, seeds};
use tokio::time::{sleep, timeout};

pub(crate) struct SourceSuiteConfig {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as-is this can only assert liveness - there's no notion of message identity, exact count, ordering, or the connector cursor, so it can't host the checks #2892 is about (restart-resume needs an id discriminator, replay/dup needs == N). if this is meant to grow into the suite, the shape wants to be a fixture/trait the runner is generic over (a source-under-test that seeds n stamped records and states its expected set), not a free-function poll. sketch is in the review summary.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I narrowed this PR to liveness only; the SourceUnderTest behavior suite belongs in the postgres/influxdb follow-up.

pub(crate) consumer_name: &'static str,
pub(crate) min_messages: usize,
pub(crate) poll_batch: u32,
pub(crate) warmup: Duration,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this hardcodes its own retry budget (warmup/retry_interval/timeout). worth noting there isn't a single shared constant to reuse yet - postgres and elasticsearch each carry their own POLL_ATTEMPTS=100/POLL_INTERVAL_MS=50 copy and influxdb just hardcodes 0..100/from_millis(100). a helper whose point is to absorb duplication should unify all of those into one place rather than add a fourth timing style.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed warmup; I’ll keep cross-source retry-budget unification for the behavior-suite follow-up.

harness: &TestHarness,
config: &SourceSuiteConfig,
) -> Vec<IggyMessage> {
sleep(config.warmup).await;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this fixed 1s sleep is dead time - it's before and outside the timeout below, so worst case is warmup + timeout. the harness already gates runtime readiness with a health-poll on startup, and the retry loop below covers not-ready, which is why the existing postgres/elasticsearch/influxdb source loops poll immediately with no warmup. drop it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the fixed warmup; polling starts immediately and retries inside the timeout.

true,
)
.await
.expect("poll source messages");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.expect here panics the test on any transient poll error instead of retrying. the postgres and elasticsearch source loops use if let Ok(...) and keep looping until their timeout, which is the more robust pattern. (the influxdb source loops already use .expect too, so they share this fragility - worth fixing there rather than copying it here.)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Poll errors now retry within the timeout budget.


timeout(config.timeout, poll)
.await
.expect("source suite timed out waiting for messages")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

on timeout this doesn't say which consumer or config timed out - include consumer_name so a failure points at the right test.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Timeout failures now include the consumer name.

@arunsingh arunsingh changed the title test(connectors): add source connector pareto suite test(connectors): add source liveness helper Jun 2, 2026
@arunsingh
Copy link
Copy Markdown
Contributor Author

Updated this to liveness-only: random restart is dropped, polling now accumulates/retries, and postgres/influxdb behavior coverage can follow with #2940 scope in mind.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

S-waiting-on-author PR is waiting on author response

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants