Reconciliation Run Lifecycle Events
The reconciliation run lifecycle is built on a layered event architecture that separates infrastructure events (emitted by the Argo exit handler when a workflow finishes) from platform events (emitted by Core-API after the database is updated). This separation eliminates duplicate notifications and guarantees that downstream consumers always see DB-consistent state.
Key Concepts
Infrastructure vs Platform Events
- Infrastructure events (`run.completed`, `run.errored`) are emitted by the Argo exit handler the moment a workflow finishes. They reflect the raw Argo outcome and may arrive before the database is updated.
- Platform events (`run.finalised`) are emitted by Core-API's `RunCompletionController` after the database has been updated, the job event has been recorded, and the WebSocket broadcast has been sent. This guarantees DB consistency for all downstream consumers.
run.finalised — The Single Notification Trigger
The email-notifier subscribes exclusively to `run.finalised`. It does not listen to `run.completed` or `run.errored`. This is a structural fix for duplicate emails: the email-notifier's Knative Trigger filters on the `run.finalised` type only, so infrastructure events from the Argo exit handler are never delivered to it.
Errored vs Unmatched
- Errored means the workflow could not run to completion due to an infrastructure issue (crash, timeout, resource failure). Represented by `run.errored`.
- Unmatched means the reconciliation comparison ran successfully but found data discrepancies outside tolerances. This is a business outcome, not an error. Represented by `run.completed` with `result: UNMATCHED`.
RunResult
The `result` field on `run.completed` and `run.finalised` events uses the `RunResult` enum:
| Value | Meaning |
|---|---|
| `MATCHED` | 100% join match across all sources AND all tolerances passed |
| `UNMATCHED` | Any unmatched groups across sources OR any tolerance failures |
100% join match is required for MATCHED. Any non-zero values in unmatched_by_source result in UNMATCHED.
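The rule above can be sketched as a small function. This is a minimal illustration rather than the platform's implementation: `derive_run_result` is a hypothetical helper, and its inputs are assumed to mirror the `join_stats.unmatched_by_source` and `tolerances[].passed` fields of the `run.comparison.completed` payload.

```python
from enum import Enum


class RunResult(str, Enum):
    MATCHED = "MATCHED"
    UNMATCHED = "UNMATCHED"


def derive_run_result(unmatched_by_source: dict[str, int], tolerances: list[dict]) -> RunResult:
    """Hypothetical helper: MATCHED only on a 100% join match with every tolerance passed."""
    # Any non-zero unmatched count for any source breaks the 100% join match.
    has_unmatched_groups = any(count > 0 for count in unmatched_by_source.values())
    # Any tolerance entry with passed == False is a tolerance failure.
    has_tolerance_failure = any(not t["passed"] for t in tolerances)
    if has_unmatched_groups or has_tolerance_failure:
        return RunResult.UNMATCHED
    return RunResult.MATCHED
```

Because the unmatched counts are keyed by source name, the same check works for two sources or for N sources without modification.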
Event Flow Diagrams
Before (Legacy)
The legacy architecture caused duplicate emails because both the Argo exit handler and Core-API emitted events with the same CloudEvent type strings.

```mermaid
sequenceDiagram
    participant User
    participant CoreAPI as Core-API
    participant Agent as Platform-Agent
    participant Argo as Argo Workflow
    participant Broker as Knative Broker
    participant Email as Email-Notifier
    participant Webhook as Webhook-Service

    User->>CoreAPI: Trigger run
    CoreAPI->>Agent: gRPC submitJob
    Agent->>Argo: Create Workflow
    CoreAPI-->>Broker: JOB_RUN_STARTED (misleading)

    Note over Argo: Workflow executes...

    Argo->>Broker: run.completed OR run.failed (from exit handler)
    Broker->>Email: ❌ Email #1 (raw Argo event, possibly stale DB)
    Broker->>CoreAPI: run.completed/failed
    CoreAPI->>CoreAPI: Update DB, record event, broadcast WS
    CoreAPI-->>Broker: run.completed OR run.failed (RE-EMITTED)
    Broker->>Email: ❌ Email #2 (duplicate, different status string)
    Broker->>Webhook: Both events forwarded
```

After (Redesigned)
The redesigned architecture introduces `run.finalised` as the single authoritative event for downstream consumers.

```mermaid
sequenceDiagram
    participant User
    participant CoreAPI as Core-API
    participant Agent as Platform-Agent
    participant Argo as Argo Workflow
    participant Workers as Recon Workers
    participant Broker as Knative Broker
    participant Email as Email-Notifier
    participant Webhook as Webhook-Service

    User->>CoreAPI: Trigger run
    CoreAPI-->>Broker: run.triggered
    CoreAPI->>Agent: gRPC submitJob
    Agent-->>CoreAPI: Accepted (workflow name)
    CoreAPI-->>Broker: run.queued

    Note over Argo: Workflow executes...
    Workers-->>Broker: run.extraction.started/completed/errored
    Workers-->>Broker: run.comparison.started/completed/errored
    Workers-->>Broker: run.stage.started/completed/errored

    Argo->>Broker: run.completed (result: MATCHED|UNMATCHED)<br/>OR run.errored (error: {code, message})
    Broker->>CoreAPI: run.completed / run.errored
    CoreAPI->>CoreAPI: Update DB, check cancel_requested flag, record event, broadcast WS
    CoreAPI-->>Broker: run.finalised (DB-backed, authoritative)
    Broker->>Email: ✅ Single email from run.finalised
    Broker->>Webhook: All events forwarded (no filter)
```

Cancellation Flow
Cancellation uses a `cancel_requested` flag rather than an immediate status change. This handles the race condition where the Argo workflow may have already completed by the time the cancel request is processed.

```mermaid
sequenceDiagram
    participant User
    participant CoreAPI as Core-API
    participant Agent as Platform-Agent
    participant Argo as Argo Workflow
    participant Broker as Knative Broker

    User->>CoreAPI: Cancel run
    CoreAPI->>CoreAPI: Set cancel_requested=true on run record
    CoreAPI-->>Broker: run.cancel_requested (audit trail)

    alt Run is QUEUED (not yet in Argo)
        CoreAPI->>CoreAPI: Set status=CANCELLED immediately
        alt argo_workflow_name is set (submission already happened)
            CoreAPI->>Agent: gRPC cancelJob (clean up workflow)
        end
        CoreAPI-->>Broker: run.cancelled
    else Run is RUNNING in Argo
        CoreAPI->>Agent: gRPC cancelJob (stop workflow)
        Note over Argo: Workflow stops...
        Argo->>Broker: run.errored (workflow was stopped)
        Broker->>CoreAPI: run.errored
        CoreAPI->>CoreAPI: Check cancel_requested=true → CANCELLED
        CoreAPI-->>Broker: run.cancelled (NOT run.finalised)
        Note over CoreAPI: Email-notifier does NOT receive this
    end

    alt Race condition: workflow finished before cancel took effect
        Argo->>Broker: run.completed (result: MATCHED|UNMATCHED)
        Broker->>CoreAPI: run.completed
        CoreAPI->>CoreAPI: Check cancel_requested=true BUT run.completed → stale cancel
        CoreAPI-->>Broker: run.finalised (normal completion)
        Note over CoreAPI: Email-notifier processes this normally
    end
```

Cancelled runs emit `run.cancelled`, NOT `run.finalised`. Since the email-notifier only subscribes to `run.finalised`, cancelled runs are structurally excluded from email notifications.
Event Type Reference
All event type strings follow the CloudEvents v1.0 specification. The `DataRecsEventType` enum in `data-models-events` is the single source of truth.
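Every CloudEvent type string in this reference is the short event type prefixed with `io.datarecs.reconciliation.`. The sketch below shows how a payload might be wrapped in a CloudEvents v1.0 envelope in structured JSON form. `build_cloud_event` and `TYPE_PREFIX` are hypothetical names used for illustration only; the real emitters presumably use the `DataRecsEventType` enum and a CloudEvents SDK instead.

```python
import uuid
from datetime import datetime, timezone

# Prefix shared by every type string in this reference (derived from the tables).
TYPE_PREFIX = "io.datarecs.reconciliation."


def build_cloud_event(event_type: str, source: str, data: dict) -> dict:
    """Hypothetical helper: wrap a payload in a CloudEvents v1.0 envelope."""
    return {
        # Required CloudEvents v1.0 context attributes.
        "specversion": "1.0",
        "id": str(uuid.uuid4()),
        "type": TYPE_PREFIX + event_type,
        "source": source,
        # Optional attributes.
        "time": datetime.now(timezone.utc).isoformat(),
        "datacontenttype": "application/json",
        "data": data,
    }
```

For example, `build_cloud_event("run.finalised", "/datarecs/core-api", {...})` yields an envelope whose `type` is `io.datarecs.reconciliation.run.finalised`, which is the attribute the Knative Trigger filters match on.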
Run Lifecycle Events
| Event Type | CloudEvent Type String | Source | Emitting Service | Description |
|---|---|---|---|---|
| `run.triggered` | io.datarecs.reconciliation.run.triggered | /datarecs/core-api | Core-API | Run was triggered by a user or schedule |
| `run.queued` | io.datarecs.reconciliation.run.queued | /datarecs/core-api | Core-API | Argo workflow was submitted successfully |
| `run.queue_errored` | io.datarecs.reconciliation.run.queue_errored | /datarecs/core-api | Core-API | Argo workflow submission failed |
| `run.completed` | io.datarecs.reconciliation.run.completed | /datarecs/reconciliation-worker | Argo Exit Handler | Workflow succeeded (infrastructure event) |
| `run.errored` | io.datarecs.reconciliation.run.errored | /datarecs/reconciliation-worker | Argo Exit Handler | Workflow failed (infrastructure event) |
| `run.finalised` | io.datarecs.reconciliation.run.finalised | /datarecs/core-api | Core-API | DB updated, authoritative status (platform event) |
| `run.cancel_requested` | io.datarecs.reconciliation.run.cancel_requested | /datarecs/core-api | Core-API | User requested cancellation (audit trail) |
| `run.cancelled` | io.datarecs.reconciliation.run.cancelled | /datarecs/core-api | Core-API | Run confirmed cancelled |
| `run.rows_processed` | io.datarecs.reconciliation.run.rows_processed | /datarecs/reconciliation-worker | Reconciliation Worker | Progress event (unchanged) |
Extraction Lifecycle Events
| Event Type | CloudEvent Type String | Source | Emitting Service | Description |
|---|---|---|---|---|
| `run.extraction.started` | io.datarecs.reconciliation.run.extraction.started | /datarecs/reconciliation-worker | Reconciliation Worker | Extractor began execution |
| `run.extraction.completed` | io.datarecs.reconciliation.run.extraction.completed | /datarecs/reconciliation-worker | Reconciliation Worker | Extractor finished successfully |
| `run.extraction.errored` | io.datarecs.reconciliation.run.extraction.errored | /datarecs/reconciliation-worker | Reconciliation Worker | Extractor encountered an error |
Comparison Lifecycle Events
| Event Type | CloudEvent Type String | Source | Emitting Service | Description |
|---|---|---|---|---|
| `run.comparison.started` | io.datarecs.reconciliation.run.comparison.started | /datarecs/reconciliation-worker | Reconciliation Worker | Comparator began execution |
| `run.comparison.completed` | io.datarecs.reconciliation.run.comparison.completed | /datarecs/reconciliation-worker | Reconciliation Worker | Comparator finished (rich payload with join stats) |
| `run.comparison.errored` | io.datarecs.reconciliation.run.comparison.errored | /datarecs/reconciliation-worker | Reconciliation Worker | Comparator encountered an error |
Stage Lifecycle Events
| Event Type | CloudEvent Type String | Source | Emitting Service | Description |
|---|---|---|---|---|
| `run.stage.started` | io.datarecs.reconciliation.run.stage.started | /datarecs/reconciliation-worker | Reconciliation Worker | Stage began execution |
| `run.stage.completed` | io.datarecs.reconciliation.run.stage.completed | /datarecs/reconciliation-worker | Reconciliation Worker | Stage finished (includes result and tolerances) |
| `run.stage.errored` | io.datarecs.reconciliation.run.stage.errored | /datarecs/reconciliation-worker | Reconciliation Worker | Stage encountered an error |
Payload Schemas
run.triggered
Emitted by Core-API when a user or schedule triggers a run.

```json
{
  "run_id": "uuid",
  "job_id": "uuid",
  "mode": "IMMEDIATE | SCHEDULED",
  "triggered_by": "user-id | scheduled"
}
```

The `mode` field records the trigger origin: `IMMEDIATE` (user clicked "Run Now") or `SCHEDULED` (Airflow cron schedule fired).
run.queued
Emitted by Core-API after successful Argo workflow submission.

```json
{
  "run_id": "uuid",
  "job_id": "uuid",
  "argo_workflow_name": "recon-<job_id>-<run_id>"
}
```

run.queue_errored
Emitted by Core-API when Argo workflow submission fails.

```json
{
  "run_id": "uuid",
  "job_id": "uuid",
  "reason": "Platform-Agent rejected the workflow: resource quota exceeded"
}
```

run.completed
Emitted by the Argo exit handler when the workflow succeeds. This is an infrastructure event.

```json
{
  "run_id": "uuid",
  "job_id": "uuid",
  "tenant_id": "uuid",
  "workflow_name": "recon-<job_id>-<run_id>",
  "result": "MATCHED | UNMATCHED"
}
```

The `result` field is read from `/tmp/comparator-result.json`, written by the comparator worker before it exits. If the file is missing or unreadable, the exit handler falls through to emitting `run.errored` with code `UNKNOWN`.
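The fall-through described above might look roughly like the following. This is a sketch under assumptions, not the exit handler's actual code: `resolve_completion_event` is a hypothetical name, the file is assumed to be a JSON object with a top-level `result` key, and the error message text is invented for illustration.

```python
import json
from pathlib import Path

RESULT_PATH = Path("/tmp/comparator-result.json")
UNKNOWN_ERROR = {"code": "UNKNOWN", "message": "comparator result unavailable"}  # message is illustrative


def resolve_completion_event(result_path: Path = RESULT_PATH) -> tuple[str, dict]:
    """Hypothetical helper: pick the event type and payload fragment for a succeeded workflow."""
    try:
        # Assumed file shape: {"result": "MATCHED"} or {"result": "UNMATCHED"}.
        result = json.loads(result_path.read_text())["result"]
    except (OSError, ValueError, KeyError, TypeError):
        # Missing, unreadable, or malformed file: fall through to run.errored.
        return "run.errored", {"error": dict(UNKNOWN_ERROR)}
    if result not in ("MATCHED", "UNMATCHED"):
        # A value outside the RunResult enum is treated like an unreadable file.
        return "run.errored", {"error": dict(UNKNOWN_ERROR)}
    return "run.completed", {"result": result}
```

Treating an out-of-enum value the same as an unreadable file is a design choice made for this sketch; the source only specifies the missing and unreadable cases.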
run.errored
Emitted by the Argo exit handler when the workflow fails. This is an infrastructure event.

```json
{
  "run_id": "uuid",
  "job_id": "uuid",
  "tenant_id": "uuid",
  "workflow_name": "recon-<job_id>-<run_id>",
  "error": {
    "code": "TIMED_OUT | CRASHED | QUERY_FAILED | UNKNOWN",
    "message": "Human-readable description of the failure"
  }
}
```

run.finalised
Emitted by Core-API after the database has been updated. This is the single notification trigger for the email-notifier. NOT emitted for cancelled runs.

```json
{
  "run_id": "uuid",
  "job_id": "uuid",
  "tenant_id": "uuid",
  "result": "MATCHED | UNMATCHED"
}
```

```json
{
  "run_id": "uuid",
  "job_id": "uuid",
  "tenant_id": "uuid",
  "error": {
    "code": "TIMED_OUT | CRASHED | QUERY_FAILED | UNKNOWN",
    "message": "Human-readable description of the failure"
  }
}
```

Exactly one of `result` or `error` is present in the payload.
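A consumer of `run.finalised` can lean on the exactly-one-of rule to branch between the business outcome and the error case. The check below is a minimal sketch; `finalised_outcome` is a hypothetical helper and not part of any documented consumer.

```python
def finalised_outcome(payload: dict) -> str:
    """Hypothetical helper: return 'result' or 'error', whichever single key the payload carries."""
    present = [key for key in ("result", "error") if key in payload]
    if len(present) != 1:
        # Both keys or neither key violates the run.finalised contract.
        raise ValueError(f"run.finalised must carry exactly one of result/error, got {present}")
    return present[0]
```

An email consumer could, for instance, render a "matched/unmatched" template when this returns `"result"` and a failure template when it returns `"error"`.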
run.cancel_requested
Emitted by Core-API when a user requests cancellation. This is an audit trail event only: it does not change the run status or trigger downstream processing.

```json
{
  "run_id": "uuid",
  "job_id": "uuid",
  "tenant_id": "uuid",
  "cancelled_by": "user-id",
  "reason": "User requested cancellation"
}
```

run.cancelled
Emitted by Core-API when a run is confirmed cancelled.

```json
{
  "run_id": "uuid",
  "job_id": "uuid",
  "tenant_id": "uuid",
  "cancelled_by": "user-id",
  "reason": "User requested cancellation"
}
```

run.extraction.started

```json
{
  "run_id": "uuid",
  "job_id": "uuid",
  "tenant_id": "uuid",
  "extraction_index": 0,
  "connection_type": "postgres"
}
```

run.extraction.completed

```json
{
  "run_id": "uuid",
  "job_id": "uuid",
  "tenant_id": "uuid",
  "extraction_index": 0,
  "rows_extracted": 15000
}
```

run.extraction.errored

```json
{
  "run_id": "uuid",
  "job_id": "uuid",
  "tenant_id": "uuid",
  "extraction_index": 0,
  "error": {
    "code": "QUERY_FAILED",
    "message": "relation \"orders\" does not exist"
  }
}
```

run.comparison.started

```json
{
  "run_id": "uuid",
  "job_id": "uuid",
  "tenant_id": "uuid",
  "input_source_count": 2
}
```

run.comparison.completed
The richest payload in the lifecycle. Contains full join statistics, tolerance results, and the overall comparison outcome.

```json
{
  "run_id": "uuid",
  "job_id": "uuid",
  "tenant_id": "uuid",
  "source_row_counts": { "source_0": 1000, "source_1": 1005 },
  "join_stats": {
    "matched_groups": 980,
    "unmatched_by_source": { "source_0": 5, "source_1": 20 }
  },
  "tolerances": [
    {
      "measure_name": "total_amount",
      "tolerance_type": "ABSOLUTE",
      "tolerance_value": 0.01,
      "within_tolerance_count": 975,
      "outside_tolerance_count": 5,
      "passed": false
    }
  ],
  "result": "UNMATCHED",
  "rows_compared": 1005,
  "rows_matched": 975,
  "rows_unmatched": 30
}
```

The `join_stats.unmatched_by_source` field is a `Record<string, number>` that scales to N data sources, rather than a two-source `left_only`/`right_only` model.
run.comparison.errored

```json
{
  "run_id": "uuid",
  "job_id": "uuid",
  "tenant_id": "uuid",
  "error": {
    "code": "COMPARISON_FAILED",
    "message": "DuckDB out of memory during full outer join"
  }
}
```

run.stage.started

```json
{
  "run_id": "uuid",
  "job_id": "uuid",
  "tenant_id": "uuid",
  "stage_name": "balance-check"
}
```

run.stage.completed
Each stage runs one comparison across all sources for that stage's dimensions and tolerances. The stage is the unit of pass/fail.

```json
{
  "run_id": "uuid",
  "job_id": "uuid",
  "tenant_id": "uuid",
  "stage_name": "balance-check",
  "result": "MATCHED | UNMATCHED",
  "tolerances": [
    {
      "measure_name": "total_amount",
      "tolerance_type": "ABSOLUTE",
      "tolerance_value": 0.01,
      "within_tolerance_count": 975,
      "outside_tolerance_count": 5,
      "passed": false
    }
  ]
}
```

run.stage.errored

```json
{
  "run_id": "uuid",
  "job_id": "uuid",
  "tenant_id": "uuid",
  "stage_name": "balance-check",
  "error": {
    "code": "COMPARISON_FAILED",
    "message": "Stage failed during comparison phase"
  }
}
```

run.rows_processed
Progress event emitted during execution. Unchanged from the previous design.

```json
{
  "run_id": "uuid",
  "job_id": "uuid",
  "stage_name": "balance-check",
  "row_count": 5000,
  "mismatch_count": 12
}
```

Error Code Taxonomy
When a workflow fails, the Argo exit handler inspects the failure metadata and maps it to one of four well-known error codes. These codes appear in the `error.code` field of `run.errored` and `run.finalised` payloads.
| Error Code | Argo Failure Reason | Description |
|---|---|---|
| `TIMED_OUT` | Workflow exceeded `activeDeadlineSeconds` | The workflow ran longer than the configured deadline (default 3600s). The Argo failure message contains `activeDeadlineSeconds` or `exceeded deadline`. |
| `CRASHED` | Pod OOMKilled or unexpected termination | A worker pod was killed by the kernel OOM killer or terminated unexpectedly. The failure message contains `OOMKilled` or `memory`. |
| `QUERY_FAILED` | Extractor step failed | An extractor step failed, typically due to a SQL query error or connection failure. Detected when the failed step name starts with `extract-`. |
| `UNKNOWN` | Any other failure | The failure reason could not be classified into a known category. This is the default fallback. |

The exit handler reads `{{workflow.failures}}` (an Argo template variable containing a JSON array of failed nodes) and `{{workflow.status}}` to make this determination. The mapping is deterministic: each failure maps to exactly one code.
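A classifier following the table could be sketched as below. Several details are assumptions rather than documented behaviour: the function name `classify_failure`, the use of the `displayName` field as the "step name", and the precedence order (timeout, then crash, then extractor failure), which the source does not spell out.

```python
import json


def classify_failure(failures_json: str) -> str:
    """Hypothetical classifier: map the {{workflow.failures}} JSON array to one error code."""
    try:
        parsed = json.loads(failures_json)
    except ValueError:
        return "UNKNOWN"
    # Keep only well-formed node objects; anything else cannot be classified.
    nodes = [n for n in parsed if isinstance(n, dict)] if isinstance(parsed, list) else []
    messages = [str(n.get("message", "")) for n in nodes]
    step_names = [str(n.get("displayName", "")) for n in nodes]  # assumed step-name field

    # Assumed precedence: first matching rule wins, in table order.
    if any("activeDeadlineSeconds" in m or "exceeded deadline" in m for m in messages):
        return "TIMED_OUT"
    if any("OOMKilled" in m or "memory" in m for m in messages):
        return "CRASHED"
    if any(name.startswith("extract-") for name in step_names):
        return "QUERY_FAILED"
    return "UNKNOWN"
```

Evaluating the rules in a fixed order is one way to get the deterministic one-code-per-failure behaviour the section describes, even when a failure message would satisfy more than one rule.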
Cancellation State Machine
The cancellation flow uses a two-phase approach to handle the race condition between a user's cancel request and the Argo workflow's natural completion.
- User requests cancellation via the API.
- Core-API sets `cancel_requested = true` on the run record (without changing status).
- Core-API emits `run.cancel_requested` (audit trail only).
- If the run is QUEUED: Core-API immediately sets status to CANCELLED and emits `run.cancelled`. If `argo_workflow_name` is set, Core-API also calls Platform-Agent to clean up the workflow.
- If the run is RUNNING: Core-API calls Platform-Agent to stop the Argo workflow. The final status is determined later by `RunCompletionController` when the Argo completion event arrives.
- When the Argo event arrives, `RunCompletionController` checks the `cancel_requested` flag:
  - `cancel_requested = true` + `run.errored` → workflow was stopped by the cancel. Final status: CANCELLED. Emits `run.cancelled`. Does NOT emit `run.finalised`.
  - `cancel_requested = true` + `run.completed` → workflow finished before cancel took effect (stale cancel). Final status: COMPLETED. Emits `run.finalised` normally.
  - `cancel_requested = false` → normal processing. Emits `run.finalised`.
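The final decision step can be summarised as a small pure function. This is an illustrative sketch, not the `RunCompletionController` code: `decide_final_event` is a hypothetical name, and the `ERRORED` status label for a non-cancelled failure is an assumption, since the source only names the CANCELLED and COMPLETED statuses.

```python
def decide_final_event(cancel_requested: bool, argo_event: str) -> tuple[str, str]:
    """Hypothetical sketch: return (final_status, event_to_emit) for an Argo completion event."""
    if argo_event not in ("run.completed", "run.errored"):
        raise ValueError(f"unexpected infrastructure event: {argo_event}")
    if cancel_requested and argo_event == "run.errored":
        # The workflow was stopped by the cancel: no run.finalised, so no email.
        return "CANCELLED", "run.cancelled"
    if argo_event == "run.completed":
        # Covers both normal completion and the stale-cancel race.
        return "COMPLETED", "run.finalised"
    # Genuine failure with no cancel pending; the status name is assumed.
    return "ERRORED", "run.finalised"
```

Note that `cancel_requested` only changes the outcome when paired with `run.errored`; a completed workflow is always finalised normally.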
Knative Trigger Configuration
Core-API
Core-API subscribes to infrastructure events from the Argo exit handler:

```yaml
# Trigger: run.completed
filter:
  attributes:
    type: io.datarecs.reconciliation.run.completed
---
# Trigger: run.errored
filter:
  attributes:
    type: io.datarecs.reconciliation.run.errored
```

Email-Notifier
The email-notifier subscribes to a single platform event. Cancelled runs are structurally excluded because they emit `run.cancelled`, not `run.finalised`.

```yaml
# Single trigger: run.finalised only
filter:
  attributes:
    type: io.datarecs.reconciliation.run.finalised
```

Webhook Service
The webhook service has no type filter, so it receives all events from the broker. No configuration changes are needed.