UN-3435 [FIX] Bind request_id and trace context to worker logs #1932
muhammad-ali-e wants to merge 7 commits into main
Conversation
The new workers service was emitting `request_id:- trace_id:- span_id:-`
in every log line, blocking ops debugging. The plumbing existed
(RequestIDFilter, OTelFieldFilter, LogContext.request_id) but was
never wired up to actual values.
Changes in workers/shared/infrastructure/logging/logger.py:
* RequestIDFilter now falls back to LogContext.request_id from the
thread-local context when record.request_id is unset, so log_context()
and signal-based binding actually populate the structured field.
* Added _extract_request_id() with priority order
request_id > file_execution_id > execution_id > run_id, preserving
the legacy structure-tool convention of using file_execution_id as
the per-tool correlation ID. Migration to true HTTP X-Request-ID
later is purely additive on the backend side.
* Added Celery task_prerun/task_postrun signals that bind request_id
onto LogContext for every task and clear it on completion. Falls
back to Celery task_id when the payload has none, so non-execution
tasks (scheduler, log-consumer) still get a usable correlation ID.
* Signal install is idempotent and silently no-ops if Celery is not
importable (unit tests). A sketch of the overall wiring follows.
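A minimal, runnable sketch of that wiring, with `LogContext` reduced to a bare thread-local and the helper bodies assumed (the real logger.py carries more fields plus the OTel filters):

```python
import logging
import threading

_ctx = threading.local()  # stand-in for the PR's LogContext thread-local

# Priority order from the PR description; file_execution_id preserves the
# legacy structure-tool convention.
_REQUEST_ID_KEYS = ("request_id", "file_execution_id", "execution_id", "run_id")


class RequestIDFilter(logging.Filter):
    """Populate record.request_id: explicit extra= wins, then the
    thread-local context, then the legacy "-" placeholder."""

    def filter(self, record: logging.LogRecord) -> bool:
        if not getattr(record, "request_id", None):
            record.request_id = getattr(_ctx, "request_id", None) or "-"
        return True


def _extract_request_id(payload: dict) -> str | None:
    """Return the first key present in priority order, or None."""
    for key in _REQUEST_ID_KEYS:
        value = payload.get(key)
        if value:
            return str(value)
    return None


def _bind_task_context(task_id=None, kwargs=None, **_):
    """task_prerun handler: every task gets a usable correlation ID,
    with the Celery task_id as the last resort."""
    _ctx.request_id = _extract_request_id(kwargs or {}) or task_id


def _clear_task_context(**_):
    """task_postrun handler: clear the task-scoped field."""
    _ctx.request_id = None
```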
Trace ID / span ID for the executor pod is fixed by a paired helm
change in unstract-cloud (workerExecutorV2 OTel auto-instrumentation
wrapper in otel.values.yaml).
Verified with sanity tests covering filter fallback, explicit-extra
precedence, payload extraction, prerun/postrun lifecycle, and full
Celery signal round-trip.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
| Filename | Overview |
|---|---|
| workers/shared/infrastructure/logging/logger.py | Adds Celery signal handlers for request_id binding, extends RequestIDFilter fallback chain, and introduces payload extraction helpers — all correctly implemented with prior review feedback incorporated. |
Sequence Diagram
```mermaid
sequenceDiagram
    participant Celery
    participant BindCtx as _bind_task_context
    participant Extract as _extract_request_id
    participant WL as WorkerLogger
    participant LC as LogContext
    participant RIF as RequestIDFilter
    Celery->>BindCtx: task_prerun(task_id, task, args, kwargs)
    BindCtx->>Extract: args, kwargs, task
    Extract-->>BindCtx: request_id or None
    BindCtx->>WL: update_context(request_id, task_id)
    WL->>LC: setattr request_id and task_id
    Note over LC: Thread-local context populated
    Celery->>RIF: filter(record) during task log emit
    alt record.request_id set via extra=
        RIF-->>RIF: use record.request_id
    else fallback
        RIF->>LC: getattr(ctx, request_id, None)
        RIF-->>RIF: use ctx.request_id or "-"
    end
    Celery->>BindCtx: task_postrun signal fires
    BindCtx->>WL: update_context(request_id=None, task_id=None)
    Note over LC: worker_name preserved, task fields cleared
```
Reviews (5): Last reviewed commit: "UN-3435 [FIX] Address coderabbit nitpick..."
Walkthrough
Request ID logging and Celery task-scoped context handling were changed: RequestIDFilter now applies explicit precedence for request IDs; WorkerLogger.configure installs idempotent Celery hooks that extract request IDs from diverse task payload shapes and clear only task-scoped fields after a run.
Sequence Diagram(s)
```mermaid
sequenceDiagram
    actor Task as Celery Task
    participant Prerun as task_prerun Hook
    participant LogContext as LogContext
    participant Filter as RequestIDFilter
    participant Postrun as task_postrun Hook
    Task->>Prerun: start(task payload)
    Prerun->>Prerun: scan payload for keys<br/>(request_id, file_execution_id, execution_id, run_id)
    Prerun->>Prerun: coerce/validate type (no bool)
    Prerun->>LogContext: set request_id & task_id (task-scoped)
    Task->>Filter: emit log record
    Filter->>LogContext: read request_id
    Filter->>Filter: choose request_id (record → context → "-")
    Task->>Postrun: finish
    Postrun->>LogContext: clear task-scoped fields (request_id, task_id)
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 4 passed, ❌ 1 failed (warning)
🧹 Nitpick comments (1)
workers/shared/infrastructure/logging/logger.py (1)
622-625: Consider preserving baseline context when clearing post-task state. Line 624 clears the entire thread-local `LogContext`, which also drops static fields (for example, `worker_name` set at Line 134). If you need those to persist outside task execution, prefer clearing only task-scoped fields or snapshotting/restoring the baseline context.
Possible minimal adjustment:
```diff
 def _clear_task_context(**_):
     """Celery ``task_postrun`` handler: clear context to avoid bleed-through."""
-    WorkerLogger.clear_context()
+    ctx = WorkerLogger.get_context()
+    worker_name = ctx.worker_name if ctx else None
+    WorkerLogger.clear_context()
+    if worker_name:
+        WorkerLogger.set_context(LogContext(worker_name=worker_name))
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@workers/shared/infrastructure/logging/logger.py` around lines 622 - 625, The _clear_task_context handler currently calls WorkerLogger.clear_context(), which removes the entire thread-local LogContext (dropping static fields such as worker_name set earlier); change it to preserve baseline context by either (a) snapshotting the baseline context when the worker initializes (e.g., after worker_name is set) and restoring that snapshot in _clear_task_context, or (b) modify WorkerLogger to offer a clear_task_fields() method and call that from _clear_task_context to remove only task-scoped keys; reference _clear_task_context, WorkerLogger.clear_context, WorkerLogger (or LogContext) snapshot/restore or clear_task_fields to implement the minimal fix.
Three review fixes from greptile bot in #1932:

* `_clear_task_context` now resets only task-scoped fields (`request_id`, `task_id`) instead of deleting the entire `LogContext`. This preserves baseline fields like `worker_name` set at `WorkerLogger.configure()` -- previously the first task_postrun would wipe `worker_name` permanently and subsequent tasks ran without it.
* Signal install is now thread-safe via double-checked locking with `threading.Lock` (see the sketch below). The previous bare-flag check could allow two threads racing through `_install_celery_request_id_signals` to both pass the guard and register handlers twice -- Celery's `Signal.connect()` does not deduplicate, so handlers would fire twice per task.
* `_extract_request_id` now also scans dict values inside `kwargs` (e.g. `task(context={"execution_id": "abc"})`). The previous version only looked at top-level kwargs and dict args; dict-valued kwargs were silently missed and fell back to `task_id`. Search order now: top-level kwargs > nested dict in kwargs > dict args.

Verified:
* 20 concurrent installs register the signal exactly once.
* `worker_name` survives `_clear_task_context`; `request_id` and `task_id` are nulled.
* `kwargs={'context': {'execution_id': 'X'}}` resolves to 'X'.
* Original 9 sanity tests still pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
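A sketch of the double-checked locking pattern this commit describes; `connect_handlers` is a placeholder for the actual `celery.signals` connect calls:

```python
import threading

_signals_installed = False
_signals_lock = threading.Lock()


def _install_celery_request_id_signals(connect_handlers) -> None:
    """Install signal handlers exactly once per process."""
    global _signals_installed
    if _signals_installed:  # cheap unlocked fast path
        return
    with _signals_lock:
        if _signals_installed:  # re-check under the lock: a racing thread
            return              # may have installed between check and acquire
        connect_handlers()      # Signal.connect() does not deduplicate
        _signals_installed = True
```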
muhammad-ali-e
left a comment
Multi-agent PR review summary (PR #1932)
Review ran six agents (Code Reviewer, Silent-Failure Hunter, Comment Analyzer, Test Analyzer, Type Design, Simplifier) over the single changed file workers/shared/infrastructure/logging/logger.py. Findings prioritised below; inline comments follow.
P0 — Behavioural gaps
- Positional string args go unrecognised. `_extract_request_id` only scans `dict` args, but the main worker entry — `async_execute_bin` dispatched from `backend/workflow_manager/workflow_v2/workflow_helper.py` — uses `args=[org_schema, workflow_id, execution_id, file_hash_in_str]` (all strings) plus a kwargs dict that doesn't contain `execution_id`. For this task family the new logic falls back to the Celery `task_id` UUID, defeating the stated per-execution granularity. (logger.py:625-629)
- `ExecutionContext` docstring example is wrong. The example at logger.py:615 advertises `ExecutionContext` as a dict-args case, but `ExecutionContext` (workers/shared/workflow/destination_connector.py:77) is a `@dataclass` and is constructed inside the task body — it never reaches `task_prerun`. Misleading.
- Nested-dict scan order is insertion-order, not priority-order. When two nested kwarg dicts each contain a different `_REQUEST_ID_KEYS` member, the result depends on which kwarg was added first, not on the priority list. (logger.py:620-624)
P1 — Robustness / correctness
- `task_postrun` is not guaranteed to fire on hard failure / SIGTERM in gevent/eventlet pools — the next thread reuse can carry a stale `request_id` for any logging emitted between tasks. Hook `task_failure`/`task_revoked`, or always reset before binding (sketched after this list). (logger.py:644-651)
- Silent `ImportError` swallow. Fine for unit tests, but in a worker a real import break disables the whole feature with zero diagnostics. At minimum log at debug level. (logger.py:671-674)
- `task_id` over-clear breaks nested-task contexts: `_TASK_SCOPED_FIELDS` is unconditionally nulled at every postrun, but `with_execution_context` (logger.py:486-492) also writes `task_id`. Nested/chained `task_postrun` clobbers the outer task's id. Save/restore prior values instead.
- `RequestIDFilter` docstring inaccuracy — claims `LogContext.request_id` is set by `_bind_task_context`, but `log_context()` and `update_context()` are also writers. Drop the parenthetical. (logger.py:38-43)
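A sketch of the suggested extra wiring (hypothetical: the PR as written only connects `task_prerun`/`task_postrun`; the thread-local here stands in for the PR's `LogContext`):

```python
import threading

from celery.signals import task_failure, task_postrun, task_revoked

_ctx = threading.local()  # stand-in for the PR's LogContext


def _reset_task_context(**_):
    # Clear only task-scoped fields so thread reuse in gevent/eventlet
    # pools cannot carry a stale request_id into between-task logging.
    _ctx.request_id = None
    _ctx.task_id = None


# Resetting on failure/revocation as well as normal completion closes the
# gap described above; task_postrun alone misses hard-failure paths.
for signal in (task_postrun, task_failure, task_revoked):
    signal.connect(_reset_task_context, weak=False)
```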
P2 — Tests / typing / style
- No tests committed despite the PR description claiming "Verified locally with 9 sanity tests". Add the tests or trim the description.
- Bare `dict`/`tuple` annotations on `_scan_for_id`, `_extract_request_id`. Use `Mapping[str, Any]`/`Sequence[Any]`.
- Double-checked locking is overengineered for a once-per-process install — a single `with _signals_lock:` plus inner check is enough.
- `_clear_task_context`'s `dict.fromkeys` trick is less readable than `update_context(request_id=None, task_id=None)` for two fields.
- `str(value)` may stringify opaque objects (UUIDs, dataclass `__str__` defaults). Guard with an `isinstance(value, (str, int, UUID))` check before stringifying.
Generated by /pr-review-toolkit:review-pr orchestrating six review agents.
Addresses 13 review comments on #1932. Highlights:

* P0 -- Positional-string args now resolve. `async_execute_bin` is dispatched as `send_task("async_execute_bin", args=[schema, workflow_id, execution_id, file_hash])` -- all positional strings. The previous implementation only inspected dict args and dict kwargs values, so it returned None and fell back to the Celery task UUID, defeating per-execution log granularity for the main workflow path. `_gather_containers` now uses `inspect.signature(task.run).bind_partial(*args, **kwargs)` to map positional args to parameter names before scanning (sketched below).
* Priority order is now by KEY, not by container. The outer loop is over `_REQUEST_ID_KEYS`, the inner over containers, so a payload with file_execution_id in one nested dict and execution_id in another deterministically picks the higher-priority key regardless of insertion order.
* `_coerce_id` rejects non-id types. The previous `str(value)` could stringify dataclass instances or arbitrary objects into `"<X object at 0x...>"` log lines and OTel attributes. Only `str`, `int`, and `UUID` are accepted now.
* Dataclass positional args are supported via `dataclasses.is_dataclass` + `dataclasses.asdict`.
* `_bind_task_context` wraps extraction in try/except so a misbehaving payload falls back to `task_id` instead of leaving the previous task's id bound on the thread.
* Signal install simplified to `@functools.lru_cache(maxsize=1)` -- idempotent and thread-safe by construction; replaces the explicit flag + Lock pattern.
* `ImportError` for `celery.signals` now logs at debug level instead of disappearing silently, so a broken deployment is diagnosable rather than mysteriously missing request_ids.
* Type hints use `Mapping[str, Any]` / `Sequence[Any]` instead of bare `dict` / `tuple`.
* Docstring on `RequestIDFilter` no longer name-checks a single helper as the sole writer of `LogContext.request_id`.

Tests added at `workers/shared/tests/test_logger_request_id.py` (22 tests covering the filter fallback chain, coerce rejection, key-priority scan, signature-bound positional args, dataclass arg, nested dict priority, baseline preservation, and concurrent install).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
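A sketch of the two mechanisms this commit describes: `inspect.signature(...).bind_partial(...)` to name positional args, and a key-major scan so priority beats insertion order. Helper names here are illustrative, not the PR's exact ones, and type coercion is elided (the real code routes values through `_coerce_id`):

```python
import inspect
from collections.abc import Mapping, Sequence
from typing import Any

_REQUEST_ID_KEYS = ("request_id", "file_execution_id", "execution_id", "run_id")


def _bound_arguments(func, args: Sequence[Any], kwargs: Mapping[str, Any]) -> Mapping[str, Any]:
    """Map positional args to parameter names so a payload like
    args=[schema, workflow_id, execution_id, file_hash] becomes scannable."""
    try:
        return inspect.signature(func).bind_partial(*args, **kwargs).arguments
    except (TypeError, ValueError):
        return dict(kwargs)


def _pick_request_id(containers: Sequence[Mapping[str, Any]]) -> str | None:
    """Outer loop over KEYS, inner over containers: a higher-priority key in
    a later container still beats a lower-priority key in an earlier one."""
    for key in _REQUEST_ID_KEYS:
        for container in containers:
            value = container.get(key)
            if value:
                return str(value)
    return None


def async_execute_bin(schema, workflow_id, execution_id, file_hash):
    """Stand-in for the real task body; only the signature matters here."""


# All-positional-string dispatch now resolves to the execution id,
# not the Celery task UUID:
bound = _bound_arguments(async_execute_bin, ("org1", "wf1", "exec-42", "h"), {})
assert _pick_request_id([bound]) == "exec-42"
```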
Tests at workers/shared/tests/test_logger_request_id.py have been
moved out of this PR. Rationale:
* Keeps this PR scoped to the P0 ops fix (logger.py only).
* Test conventions for workers/shared/tests/ deserve their own
review (only one existing test file there today).
* pytest project config integration (coverage flags etc.) needs
separate verification.
The 22 tests have been verified locally against the current
implementation; a follow-up PR will add them once test layout
conventions are confirmed.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@workers/shared/infrastructure/logging/logger.py`:
- Around line 601-604: The _coerce_id function currently treats bools as ints
(since bool is a subclass of int) and will coerce True/False to "True"/"False";
update _coerce_id to explicitly exclude booleans before the integer check—either
add a boolean guard (if isinstance(value, bool): return None) above the int/UUID
branch or change the int test to "if isinstance(value, int) and not
isinstance(value, bool)" so only real integers (and UUIDs) are stringified; keep
the str(value) return for valid ints/UUIDs and preserve the existing string
handling (a sketch follows these comments).
- Around line 706-722: Replace the `@functools.lru_cache` guard on
_install_celery_request_id_signals with a double-checked locking pattern: add a
module-level threading.Lock (e.g. _signals_install_lock) and a boolean flag
(e.g. _signals_installed = False), then in _install_celery_request_id_signals
first check _signals_installed and return if True, otherwise acquire the lock,
re-check _signals_installed, perform the celery import and connect calls to
_bind_task_context and _clear_task_context (with weak=False) inside the locked
section, set _signals_installed = True on success, and keep the ImportError
logging behavior unchanged.
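A sketch of the bool-safe coercion the first comment asks for (`_coerce_id` is the PR's helper name; this body is assumed):

```python
from uuid import UUID


def _coerce_id(value) -> str | None:
    """Stringify only id-like values; reject everything else."""
    if isinstance(value, bool):
        # bool is a subclass of int, so without this guard True/False
        # would pass the int check and coerce to "True"/"False".
        return None
    if isinstance(value, str):
        return value or None
    if isinstance(value, (int, UUID)):
        return str(value)
    return None


assert _coerce_id(True) is None
assert _coerce_id(42) == "42"
assert _coerce_id("") is None
```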
Sonar flagged _gather_containers at cognitive complexity 19 (max 15).
Extracted two single-purpose helpers:
* _bind_positional_args_to_names -- the inspect.signature path that
catches send_task("async_execute_bin", args=[schema, wf, exec, ...])
where ids are passed as positional strings.
* _arg_as_mapping -- coerces a single positional arg into a Mapping
(Mapping pass-through, dataclass via dataclasses.asdict, else None); sketched below.
_gather_containers itself is now a flat ordered append: bound names,
kwargs + nested mappings, then per-arg coercion. Behaviour is
unchanged; verified with sanity scenarios for each input shape.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
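A sketch of the `_arg_as_mapping` helper this commit extracts (body assumed from the description):

```python
import dataclasses
from collections.abc import Mapping
from typing import Any


def _arg_as_mapping(arg: Any) -> Mapping[str, Any] | None:
    """Coerce one positional arg into a Mapping, or None if it isn't one."""
    if isinstance(arg, Mapping):
        return arg
    # is_dataclass() is also true for the class object itself; require an
    # instance before calling asdict().
    if dataclasses.is_dataclass(arg) and not isinstance(arg, type):
        return dataclasses.asdict(arg)
    return None
```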
🧹 Nitpick comments (1)
workers/shared/infrastructure/logging/logger.py (1)
707-711: Consider logging suppressed extraction failures. The broad `except Exception` is intentional defensive code, but silently swallowing errors makes debugging malformed payloads difficult. Logging at DEBUG level would aid observability without affecting production noise.
Suggested improvement:
```diff
     try:
         request_id = _extract_request_id(args or (), kwargs or {}, task) or task_id
     except Exception:
+        logging.getLogger(__name__).debug(
+            "request_id extraction failed for task %s; falling back to task_id",
+            task_id,
+            exc_info=True,
+        )
         request_id = task_id
     WorkerLogger.update_context(request_id=request_id, task_id=task_id)
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@workers/shared/infrastructure/logging/logger.py` around lines 707 - 711, The try/except around _extract_request_id currently swallows all exceptions silently; modify it to catch Exception as e and log the suppressed failure at DEBUG (including exception details and relevant context like task, args, kwargs, and task_id) before falling back to request_id = task_id so you retain current behavior; update the block that calls _extract_request_id and WorkerLogger.update_context to log the caught exception (e) at debug level for observability.
Two valid nitpicks from coderabbit on the previous commits:
* ``_coerce_id`` now explicitly rejects ``bool`` before the ``int``
check. Without the guard, ``True``/``False`` would coerce to the
literal strings ``"True"``/``"False"`` because ``bool`` is a
subclass of ``int`` in Python.
* ``_bind_task_context`` now logs at DEBUG with ``exc_info=True``
when ``_extract_request_id`` raises, instead of swallowing the
exception silently. Behaviour is unchanged in production
(request_id still falls back to task_id); the log line lets
operators diagnose malformed payloads from the executor pod.
Skipped coderabbit's third suggestion (replace ``@functools.lru_cache``
with explicit ``threading.Lock`` + flag) -- contradicts the explicit
preference in PR review #11 for the simpler lru_cache pattern.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Test Results Summary: Runner Tests - Full Report | SDK1 Tests - Full Report
@muhammad-ali-e so earlier was this not done in a standardised way? Also if the execution id is not available will we still autogenerate request id? If not that should be done
cc: @greptileai
On standardization: No, it wasn't standardized before. The infrastructure existed (RequestIDFilter, LogContext.request_id, the format string request_id:%(request_id)s) but was never wired up. Workers calling log_context() only set task_id/execution_id/workflow_id etc., and the filter only checked record.request_id (set via extra=) — it never fell back to LogContext. So every worker was emitting request_id:- unconditionally.
On auto-generation when no execution ID is available: It's already handled. In _bind_task_context:
```python
request_id = _extract_request_id(args or (), kwargs or {}, task) or task_id
```
If `_extract_request_id` finds none of the keys (`request_id`, `file_execution_id`, `execution_id`, `run_id`) in the payload, it returns None, and the `or task_id` fallback kicks in — so every task always gets a non-empty request_id bound, using Celery's auto-generated task UUID as the last resort. The error path in `_bind_task_context` also falls back to `task_id`. So there's no scenario where request_id is left empty after prerun.



What
- `RequestIDFilter` now falls back to `LogContext.request_id` from the thread-local context when `record.request_id` isn't set explicitly via `extra=`.
- `_extract_request_id()` resolves a usable correlation ID from a Celery task's payload using the priority `request_id` > `file_execution_id` > `execution_id` > `run_id`. Looks at top-level kwargs and inside dict args (e.g. `ExecutionContext` / batch payloads).
- `task_prerun`/`task_postrun` signal handlers (`_bind_task_context`/`_clear_task_context`) bind `request_id` onto `LogContext` for every task and clear it on completion. Falls back to Celery `task_id` when the payload has none.
- Signal install (`_install_celery_request_id_signals`) is idempotent and silently no-ops if Celery is not importable (unit tests).

Why
The new workers service was emitting `request_id:- trace_id:- span_id:-` in every log line, blocking ops debugging — flagged as P0 in UN-3435.

The plumbing already existed (`RequestIDFilter`, `OTelFieldFilter`, `LogContext.request_id`, format string `request_id:%(request_id)s`) but no code path actually populated the `request_id` field on log records. Even workers that called `log_context()` only set `task_id`/`execution_id`/`workflow_id`/`pipeline_id`/`organization_id` — never `request_id`. And the filter only read from `record.request_id`, not from `LogContext`, so even if context was set, the filter wouldn't see it.

How
- The `task_prerun` signal fires for every task in every worker (executor, file_processing, callback, api-deployment, general, notification, scheduler, ide_callback, log_consumer, plus cloud's `bulk_download` and `agentic_callback`). Binding once at the framework level avoids touching every task entry-point and keeps cloud workers free of changes.
- `file_execution_id` is the value bound to `request_id` (legacy structure-tool behaviour) — `structure_tool_task.py:461,490` already sets `ExecutionContext.request_id = file_execution_id`. This gives per-file granularity in logs across the multi-tool execution chain (structure tool → executor → platform-service). Cross-service correlation is handled separately by OpenTelemetry `trace_id`, not by `request_id`.
- `_extract_request_id` is also the migration path. Callers may start passing a real HTTP `request_id` in the payload at any time and it will take precedence over `file_execution_id` automatically — no worker change required.
- Trace ID / span ID for the executor pod is fixed by a paired helm change in `unstract-cloud` (`workerExecutorV2` OTel auto-instrumentation wrapper in `otel.values.yaml`).

The idempotent signal install is sketched below.
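A sketch of the `@functools.lru_cache(maxsize=1)` install, with the handler bodies stubbed (the real handlers live in logger.py):

```python
import functools
import logging

logger = logging.getLogger(__name__)


def _bind_task_context(task_id=None, task=None, args=None, kwargs=None, **_):
    """Stub: bind request_id onto LogContext (real body in logger.py)."""


def _clear_task_context(**_):
    """Stub: reset only request_id/task_id (real body in logger.py)."""


@functools.lru_cache(maxsize=1)
def _install_celery_request_id_signals() -> bool:
    """lru_cache dedupes the install: repeated configure() calls hit the
    cache instead of re-connecting handlers."""
    try:
        from celery.signals import task_postrun, task_prerun
    except ImportError:
        # Diagnosable rather than silent: unit-test environments without
        # Celery simply skip the wiring.
        logger.debug("celery.signals not importable; request_id binding disabled")
        return False
    task_prerun.connect(_bind_task_context, weak=False)
    task_postrun.connect(_clear_task_context, weak=False)
    return True
```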
Can this PR break any existing features? If yes, please list possible items. If no, please explain why.
Safe. Changes are additive:
- `RequestIDFilter` still returns `"-"` when no value is found anywhere — same observable output as before for code paths that don't set `request_id`.
- Only new context fields are introduced (`LogContext.request_id` and `LogContext.task_id`), and `task_postrun` clears the context to avoid bleed-through across tasks.
- Existing `log_context()` callers continue to work — verified that prerun-bound `request_id` survives nested `log_context(...)` blocks.
Database Migrations
None.
Env Config
None.
Relevant Docs
Related Issues or PRs
- Paired helm change in `unstract-cloud` for executor V2 OTel auto-instrumentation (forthcoming PR).
Dependencies Versions
None.
Notes on Testing
Verified locally with manual scenarios covering:
- `RequestIDFilter` fallback chain: `record.request_id` (extra=) > `LogContext.request_id` > `"-"`.
- `_coerce_id` rejects non-id types (objects, lists) and accepts only `str`/`int`/`UUID`.
- `_extract_request_id` priority by KEY (`request_id` > `file_execution_id` > `execution_id` > `run_id`) across containers.
- `_extract_request_id` resolves positional-string args via `inspect.signature(task.run).bind_partial(...)` — covers `async_execute_bin(schema, workflow_id, execution_id, ...)`.
- `_extract_request_id` searches dict args, dict-valued kwargs, and dataclass args.
- `_bind_task_context` swallows extraction errors and falls back to `task_id` (no stale state on a misbehaving payload).
- `_clear_task_context` resets only `request_id`/`task_id`; baseline `worker_name` is preserved.
- `@functools.lru_cache(maxsize=1)` install is idempotent under concurrent calls (verified `cache_info()` reports 1 miss / 19 hits across 20 threads).

A follow-up PR will commit a regression test file (`workers/shared/tests/test_logger_request_id.py`, 22 tests) once test layout conventions for `workers/shared/tests/` are agreed.

After deploy, ops should see `request_id:<file_execution_id>` populated in worker logs (replaces `request_id:-`). Once the paired `unstract-cloud` helm change rolls out, `trace_id`/`span_id` will also be populated.
Screenshots
Checklist