feat(governance): Policy Agent batch settings + per-task manual-grant reason#28746
feat(governance): Policy Agent batch settings + per-task manual-grant reason#28746yan-3005 wants to merge 8 commits into
Conversation
… reason Supporting OpenMetadata changes for the Collate Data Access Request (DAR) concurrency work — clubbing concurrent policy-agent grants into a single ingestion run. - workflowSettings: add policyAgentConfiguration (pollingIntervalSeconds, batchWindowSeconds, batchMaxSize), read by the Collate batch coordinator. - policyAgentTaskDefinition: add pollingIntervalSeconds node config. - CreateTask: optionally carry a workflow-supplied reason into the created task's payload as `manualGrantReason` (its own structured field). Strict no-op (try/catch + instanceof) for workflows that do not set it, so generic approval workflows (Glossary, etc.) are byte-for-byte unaffected. - Tests: CreateTaskManualGrantReasonTest; the 44 generic CreateTaskTest pass. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
✅ PR checks passedThe linked issue has a description and all required Shipping project fields set. Thanks! |
|
Hi there 👋 Thanks for your contribution! The OpenMetadata team will review the PR shortly! Once it has been labeled as Let us know if you need any help! |
There was a problem hiding this comment.
Pull request overview
This PR extends OpenMetadata governance/workflow capabilities to support Policy Agent batching for Data Access Request (DAR) concurrency, and adds support for carrying a workflow-supplied manual-grant reason into created Task payloads as a structured field.
Changes:
- Added
policyAgentConfigurationtoworkflowSettings(polling interval + batching window/size controls) for the Policy Agent batch coordinator. - Extended
PolicyAgentTaskDefinitionconfig withpollingIntervalSeconds(per-node override). - Updated
CreateTaskto optionally merge a workflow-suppliedmanualGrantReasoninto task payload, with new unit tests covering merge behavior.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/policyAgentTaskDefinition.json | Adds per-node polling interval setting for Policy Agent task nodes. |
| openmetadata-spec/src/main/resources/json/schema/configuration/workflowSettings.json | Introduces server-wide Policy Agent batching configuration under workflow settings. |
| openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/userTask/CreateTaskManualGrantReasonTest.java | Adds unit tests validating manualGrantReason payload merge behavior. |
| openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/userTask/CreateTask.java | Merges optional workflow manualGrantReason into task payload during create/update. |
✅ TypeScript Types Auto-UpdatedThe generated TypeScript types have been automatically updated based on JSON schema changes in this PR. |
🟡 Playwright Results — all passed (11 flaky)✅ 4293 passed · ❌ 0 failed · 🟡 11 flaky · ⏭️ 88 skipped
🟡 11 flaky test(s) (passed on retry)
How to debug locally# Download playwright-test-results-<shard> artifact and unzip
npx playwright show-trace path/to/trace.zip # view trace |
- workflowSettings.policyAgentConfiguration: add `minimum: 1` to pollingIntervalSeconds / batchWindowSeconds / batchMaxSize; add `batchWorkerThreads` (default 4) so the coordinator's worker-pool size is configurable instead of a hard-coded cap. - policyAgentTaskDefinition: add `minimum: 1` to pollingIntervalSeconds and correct the fallback doc path to workflowSettings.policyAgentConfiguration.pollingIntervalSeconds. - Regenerate UI TypeScript for WorkflowSettings and PolicyAgentTaskDefinition so the generated types stay in sync with the schema. - Align the new test's license-header year (2026) with neighbouring tests. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
✅ TypeScript Types Auto-UpdatedThe generated TypeScript types have been automatically updated based on JSON schema changes in this PR. |
…er-wide default Removing the per-node default (30) makes the generated field nullable, so an unset node value is now distinguishable from a configured one. The Collate PolicyAgentTask resolves it to the node override → server-wide policyAgentConfiguration.pollingIntervalSeconds → built-in default, which is what the field description already promised. (minimum:1 stays.) Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
mergeManualGrantReason called JsonUtils.getMap (a Jackson convertValue) even when the payload was already a Map, which can re-type nested values and can throw. Copy the existing map entries directly (mirroring withGrantExpirationDate) and only fall back to a guarded conversion for the unexpected non-Map case, so task creation is never broken by a conversion issue. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
| LOG.warn( | ||
| "[CreateTask] Could not merge manualGrantReason into payload of type {}", | ||
| payload.getClass().getSimpleName()); | ||
| return payload; |
Summary.from_step caps the reported failure details to the first 10 (status payload size). That's fine for diagnostics but unsafe when downstream logic needs the COMPLETE per-item failure list — e.g. the Collate Policy Agent batch coordinator, which decides each clubbed request's outcome (grant vs manual) from exactly which policy ids failed; a truncated list silently grants the un-reported failures. Add an opt-in `report_all_failures` flag (default False → every existing connector keeps the 10-cap). The error COUNT was already accurate. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Code Review 👍 Approved with suggestions 4 resolved / 5 findingsImplements Data Access Request concurrency settings and adds manual-grant reason support, addressing issues with interval bounds and task state management. Consider updating the error logging in 💡 Quality: Best-effort catch logs only getMessage(), dropping stack traceThe catch block in Pass the throwable to SLF4J so the full stack trace is logged.✅ 4 resolved✅ Quality: Schema doc references wrong settings path
✅ Edge Case: No minimum bound on new integer interval settings
✅ Edge Case: Concurrent runs can leave two live approvals despite invariant
✅ Edge Case: Superseded Flowable process left running if task read non-terminal
🤖 Prompt for agentsOptionsDisplay: compact → Showing less information. Comment with these commands to change:
Was this helpful? React with 👍 / 👎 | Gitar |
| private static String resolveManualGrantReason(DelegateTask delegateTask) { | ||
| try { | ||
| Object reason = | ||
| new WorkflowVariableHandler(delegateTask) | ||
| .getNamespacedVariable(GLOBAL_NAMESPACE, "manualGrantReason"); | ||
| return reason instanceof String s ? s : null; | ||
| } catch (Exception e) { | ||
| return null; | ||
| } |
There was a problem hiding this comment.
Silent exception swallowing on the DAR path
resolveManualGrantReason swallows all exceptions without any trace. For non-DAR workflows this is intentional (the variable simply isn't set), but on the Policy Agent path a genuine error — e.g., a type mismatch or a deserialization failure — would silently return null, causing manualGrantReason to be dropped from the task payload with no indication in the logs. A LOG.debug or LOG.trace call in the catch block would make this scenario diagnosable without adding noise for the common non-DAR case.
| failures=( | ||
| (step.status.failures if getattr(step, "report_all_failures", False) else step.status.failures[0:10]) | ||
| if step.status.failures | ||
| else None |
There was a problem hiding this comment.
report_all_failures is undeclared on the base Step class
The attribute is consumed via getattr(step, "report_all_failures", False), but Step never declares it (not as a class variable, not in __init__, and there's no Protocol or type annotation). Any step that forgets to set it will silently revert to the 10-failure cap. For the Policy Agent use case, where truncation causes incorrect security decisions (as the comment notes), a missed attribute is indistinguishable from the deliberate opt-out. Declaring report_all_failures: bool = False in Step.__init__ would make the contract explicit and allow static analysis to catch misuse.
|
|



Fixes #28759
What
Supporting OpenMetadata changes for the Collate Data Access Request (DAR) concurrency work — clubbing concurrent policy-agent grants into a single ingestion run per pipeline (Collate PR links to this one).
workflowSettings— addpolicyAgentConfiguration:pollingIntervalSeconds(default 30),batchWindowSeconds(default 60, the scheduler/window),batchMaxSize(default 200),batchWorkerThreads(default 4, the coordinator's worker-pool size — bounds how many distinct pipelines provision concurrently). All boundedminimum: 1.policyAgentTaskDefinition— add an optional per-nodepollingIntervalSeconds. When unset it inherits the server-widepolicyAgentConfiguration.pollingIntervalSeconds(then the built-in default); it has no own default so "unset" stays meaningful.CreateTask— optionally carry a workflow-supplied reason into the created task's payload asmanualGrantReason(its own structured field), instead of overloading the description.Generic approval workflows are unaffected
The
CreateTaskchange is a strict no-op for any workflow that doesn't set themanualGrantReasonworkflow variable (Glossary approval, tag/description suggestions, test-case resolution, …): the read is wrapped intry/catch+instanceof, andmergeManualGrantReason(payload, null)returns the payload unchanged. So those task types are byte-for-byte identical to before.Tests
CreateTaskManualGrantReasonTest— no-op for null/blank reason, setspayload.manualGrantReason, preserves existing payload keys, overwrites on stage re-entry.CreateTaskTest(44 existing) still pass — the generic task-creation path is unchanged.🤖 Generated with Claude Code
ingestion(api/step.py) — add an opt-inreport_all_failuresflag.Summary.from_stepcaps reported failure details to the first 10 (status payload size); the Collate Policy Agent needs the complete per-policy failure list because the batch coordinator decides each clubbed request's outcome (grant vs manual) from exactly which policy ids failed — a truncated list silently granted the un-reported failures. Default staysFalse, so every other connector keeps the 10-cap.Greptile Summary
This PR wires in OpenMetadata-side configuration and task-payload support for the Collate Policy Agent batch concurrency work. It adds
policyAgentConfigurationtoworkflowSettings(four tunable integers with safe defaults), a per-nodepollingIntervalSecondsoverride onpolicyAgentTaskDefinition, and amanualGrantReasonfield stamped into the task payload on the DAR approval path — all generated TypeScript types are updated to match.workflowSettings.json+ TS: NewpolicyAgentConfigurationobject (pollingIntervalSeconds,batchWindowSeconds,batchMaxSize,batchWorkerThreads) — all optional with sensible defaults; read by the Collate batch coordinator.CreateTask.java:resolveManualGrantReason+mergeManualGrantReasonstamp a workflow variable intopayload.manualGrantReason; both are strict no-ops for any workflow that doesn't set the variable, guarded by try/catch and blank-check.step.py: Opt-inreport_all_failuresflag (viagetattrwithFalsedefault) lets Policy Agent steps bypass the 10-failure cap so the coordinator can make accurate per-policy grant/manual decisions without silent truncation.Confidence Score: 4/5
Safe to merge; all changes are additive and backward-compatible — generic approval workflows are byte-for-byte unaffected by the manualGrantReason path.
The two non-generated files with behavioral changes (step.py and CreateTask.java) both have minor observability gaps: the report_all_failures opt-in is undeclared on the base class (a Policy Agent step that forgets to set it silently reverts to truncated failures), and resolveManualGrantReason swallows exceptions without any log trace (making a DAR-path variable resolution failure invisible). Neither gap causes incorrect behavior for existing workflows, but both could complicate debugging if something goes wrong in the batch coordinator integration.
ingestion/src/metadata/ingestion/api/step.py and CreateTask.java are the two files worth a second look — the rest are generated TypeScript and JSON schema additions.
Important Files Changed
Sequence Diagram
%%{init: {'theme': 'neutral'}}%% sequenceDiagram participant WF as Workflow Engine participant CT as CreateTask.java participant PA as Policy Agent Step (Python) participant Coord as Batch Coordinator (Collate) WF->>CT: notify(delegateTask) [DAR path] CT->>CT: resolveManualGrantReason(delegateTask) CT->>CT: mergeManualGrantReason(payload, reason) CT-->>WF: Task saved with payload.manualGrantReason WF->>PA: run pipeline batch PA->>PA: "report_all_failures=True → Summary.from_step() returns full failure list" PA-->>Coord: StepSummary (all failures, not capped at 10) Coord->>Coord: per-policy grant vs manual-review decision Coord-->>WF: batch result (policyAgentConfiguration settings applied)%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%% sequenceDiagram participant WF as Workflow Engine participant CT as CreateTask.java participant PA as Policy Agent Step (Python) participant Coord as Batch Coordinator (Collate) WF->>CT: notify(delegateTask) [DAR path] CT->>CT: resolveManualGrantReason(delegateTask) CT->>CT: mergeManualGrantReason(payload, reason) CT-->>WF: Task saved with payload.manualGrantReason WF->>PA: run pipeline batch PA->>PA: "report_all_failures=True → Summary.from_step() returns full failure list" PA-->>Coord: StepSummary (all failures, not capped at 10) Coord->>Coord: per-policy grant vs manual-review decision Coord-->>WF: batch result (policyAgentConfiguration settings applied)Reviews (1): Last reviewed commit: "feat(ingestion): let a step opt out of t..." | Re-trigger Greptile