
Reconciliation Run Lifecycle Events

The reconciliation run lifecycle is built on a layered event architecture that separates infrastructure events (emitted by the Argo exit handler when a workflow finishes) from platform events (emitted by Core-API after the database is updated). This separation eliminates duplicate notifications and guarantees that consumers of platform events always see DB-consistent state.

  • Infrastructure events (run.completed, run.errored) are emitted by the Argo exit handler the moment a workflow finishes. They reflect the raw Argo outcome and may arrive before the database is updated.
  • Platform events (run.finalised) are emitted by Core-API’s RunCompletionController after the database has been updated, the job event has been recorded, and the WebSocket broadcast has been sent. This guarantees DB consistency for all downstream consumers.

run.finalised — The Single Notification Trigger


The email-notifier subscribes exclusively to run.finalised. It does not listen to run.completed or run.errored. This is a structural fix for duplicate emails — the email-notifier’s Knative Trigger physically cannot receive infrastructure events from the Argo exit handler.

The lifecycle distinguishes an errored run from an unmatched one:

  • Errored means the workflow could not run to completion because of an infrastructure issue (crash, timeout, resource failure). It is represented by run.errored.
  • Unmatched means the reconciliation comparison ran successfully but found data discrepancies outside tolerances. This is a business outcome, not an error. It is represented by run.completed with result: UNMATCHED.

The result field on run.completed and run.finalised events uses the RunResult enum:

| Value | Meaning |
|---|---|
| MATCHED | 100% join match across all sources AND all tolerances passed |
| UNMATCHED | Any unmatched groups across sources OR any tolerance failures |

100% join match is required for MATCHED. Any non-zero values in unmatched_by_source result in UNMATCHED.
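The following minimal Python sketch illustrates the rule above. It derives a RunResult from the unmatched_by_source counts and the per-tolerance passed flags that appear in the run.comparison.completed payload. The helper name derive_run_result is hypothetical. Treating an empty tolerance list as passed is an assumption.

```python
from enum import Enum
from typing import Iterable, Mapping


class RunResult(str, Enum):
    MATCHED = "MATCHED"
    UNMATCHED = "UNMATCHED"


def derive_run_result(
    unmatched_by_source: Mapping[str, int],
    tolerance_passed: Iterable[bool],
) -> RunResult:
    """Hypothetical helper implementing the MATCHED/UNMATCHED rule."""
    # 100% join match: every source must report zero unmatched groups.
    full_join_match = all(count == 0 for count in unmatched_by_source.values())
    # Every tolerance must pass; all() on an empty list is True (assumption:
    # no configured tolerances means the result depends on the join alone).
    all_tolerances_passed = all(tolerance_passed)
    if full_join_match and all_tolerances_passed:
        return RunResult.MATCHED
    return RunResult.UNMATCHED


# Values taken from the run.comparison.completed example payload on this page.
print(derive_run_result({"source_0": 5, "source_1": 20}, [False]).value)  # UNMATCHED
```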

The legacy architecture caused duplicate emails because both the Argo exit handler and Core-API emitted events with the same CloudEvent type strings.

```mermaid
sequenceDiagram
    participant User
    participant CoreAPI as Core-API
    participant Agent as Platform-Agent
    participant Argo as Argo Workflow
    participant Broker as Knative Broker
    participant Email as Email-Notifier
    participant Webhook as Webhook-Service
    User->>CoreAPI: Trigger run
    CoreAPI->>Agent: gRPC submitJob
    Agent->>Argo: Create Workflow
    CoreAPI-->>Broker: JOB_RUN_STARTED (misleading)
    Note over Argo: Workflow executes...
    Argo->>Broker: run.completed OR run.failed (from exit handler)
    Broker->>Email: ❌ Email #1 (raw Argo event, possibly stale DB)
    Broker->>CoreAPI: run.completed/failed
    CoreAPI->>CoreAPI: Update DB, record event, broadcast WS
    CoreAPI-->>Broker: run.completed OR run.failed (RE-EMITTED)
    Broker->>Email: ❌ Email #2 (duplicate, different status string)
    Broker->>Webhook: Both events forwarded
```

The redesigned architecture introduces run.finalised as the single authoritative event for downstream consumers.

```mermaid
sequenceDiagram
    participant User
    participant CoreAPI as Core-API
    participant Agent as Platform-Agent
    participant Argo as Argo Workflow
    participant Workers as Recon Workers
    participant Broker as Knative Broker
    participant Email as Email-Notifier
    participant Webhook as Webhook-Service
    User->>CoreAPI: Trigger run
    CoreAPI-->>Broker: run.triggered
    CoreAPI->>Agent: gRPC submitJob
    Agent-->>CoreAPI: Accepted (workflow name)
    CoreAPI-->>Broker: run.queued
    Note over Argo: Workflow executes...
    Workers-->>Broker: run.extraction.started/completed/errored
    Workers-->>Broker: run.comparison.started/completed/errored
    Workers-->>Broker: run.stage.started/completed/errored
    Argo->>Broker: run.completed (result: MATCHED|UNMATCHED)<br/>OR run.errored (error: {code, message})
    Broker->>CoreAPI: run.completed / run.errored
    CoreAPI->>CoreAPI: Update DB, check cancel_requested flag, record event, broadcast WS
    CoreAPI-->>Broker: run.finalised (DB-backed, authoritative)
    Broker->>Email: ✅ Single email from run.finalised
    Broker->>Webhook: All events forwarded (no filter)
```

Cancellation uses a cancel_requested flag rather than an immediate status change. This handles the race condition where the Argo workflow may have already completed by the time the cancel request is processed.

```mermaid
sequenceDiagram
    participant User
    participant CoreAPI as Core-API
    participant Agent as Platform-Agent
    participant Argo as Argo Workflow
    participant Broker as Knative Broker
    User->>CoreAPI: Cancel run
    CoreAPI->>CoreAPI: Set cancel_requested=true on run record
    CoreAPI-->>Broker: run.cancel_requested (audit trail)
    alt Run is QUEUED (not yet in Argo)
        CoreAPI->>CoreAPI: Set status=CANCELLED immediately
        alt argo_workflow_name is set (submission already happened)
            CoreAPI->>Agent: gRPC cancelJob (clean up workflow)
        end
        CoreAPI-->>Broker: run.cancelled
    else Run is RUNNING in Argo
        CoreAPI->>Agent: gRPC cancelJob (stop workflow)
        Note over Argo: Workflow stops...
        Argo->>Broker: run.errored (workflow was stopped)
        Broker->>CoreAPI: run.errored
        CoreAPI->>CoreAPI: Check cancel_requested=true → CANCELLED
        CoreAPI-->>Broker: run.cancelled (NOT run.finalised)
        Note over CoreAPI: Email-notifier does NOT receive this
    end
    alt Race condition: workflow finished before cancel took effect
        Argo->>Broker: run.completed (result: MATCHED|UNMATCHED)
        Broker->>CoreAPI: run.completed
        CoreAPI->>CoreAPI: Check cancel_requested=true BUT run.completed → stale cancel
        CoreAPI-->>Broker: run.finalised (normal completion)
        Note over CoreAPI: Email-notifier processes this normally
    end
```

Cancelled runs emit run.cancelled, NOT run.finalised. Since the email-notifier only subscribes to run.finalised, cancelled runs are structurally excluded from email notifications.

All event type strings follow the CloudEvents v1.0 specification. The DataRecsEventType enum in data-models-events is the single source of truth.

Run events

| Event Type | CloudEvent Type String | Source | Emitting Service | Description |
|---|---|---|---|---|
| run.triggered | io.datarecs.reconciliation.run.triggered | /datarecs/core-api | Core-API | Run was triggered by a user or schedule |
| run.queued | io.datarecs.reconciliation.run.queued | /datarecs/core-api | Core-API | Argo workflow was submitted successfully |
| run.queue_errored | io.datarecs.reconciliation.run.queue_errored | /datarecs/core-api | Core-API | Argo workflow submission failed |
| run.completed | io.datarecs.reconciliation.run.completed | /datarecs/reconciliation-worker | Argo Exit Handler | Workflow succeeded (infrastructure event) |
| run.errored | io.datarecs.reconciliation.run.errored | /datarecs/reconciliation-worker | Argo Exit Handler | Workflow failed (infrastructure event) |
| run.finalised | io.datarecs.reconciliation.run.finalised | /datarecs/core-api | Core-API | DB updated, authoritative status (platform event) |
| run.cancel_requested | io.datarecs.reconciliation.run.cancel_requested | /datarecs/core-api | Core-API | User requested cancellation (audit trail) |
| run.cancelled | io.datarecs.reconciliation.run.cancelled | /datarecs/core-api | Core-API | Run confirmed cancelled |
| run.rows_processed | io.datarecs.reconciliation.run.rows_processed | /datarecs/reconciliation-worker | Reconciliation Worker | Progress event (unchanged) |

Extraction events

| Event Type | CloudEvent Type String | Source | Emitting Service | Description |
|---|---|---|---|---|
| run.extraction.started | io.datarecs.reconciliation.run.extraction.started | /datarecs/reconciliation-worker | Reconciliation Worker | Extractor began execution |
| run.extraction.completed | io.datarecs.reconciliation.run.extraction.completed | /datarecs/reconciliation-worker | Reconciliation Worker | Extractor finished successfully |
| run.extraction.errored | io.datarecs.reconciliation.run.extraction.errored | /datarecs/reconciliation-worker | Reconciliation Worker | Extractor encountered an error |

Comparison events

| Event Type | CloudEvent Type String | Source | Emitting Service | Description |
|---|---|---|---|---|
| run.comparison.started | io.datarecs.reconciliation.run.comparison.started | /datarecs/reconciliation-worker | Reconciliation Worker | Comparator began execution |
| run.comparison.completed | io.datarecs.reconciliation.run.comparison.completed | /datarecs/reconciliation-worker | Reconciliation Worker | Comparator finished (rich payload with join stats) |
| run.comparison.errored | io.datarecs.reconciliation.run.comparison.errored | /datarecs/reconciliation-worker | Reconciliation Worker | Comparator encountered an error |

Stage events

| Event Type | CloudEvent Type String | Source | Emitting Service | Description |
|---|---|---|---|---|
| run.stage.started | io.datarecs.reconciliation.run.stage.started | /datarecs/reconciliation-worker | Reconciliation Worker | Stage began execution |
| run.stage.completed | io.datarecs.reconciliation.run.stage.completed | /datarecs/reconciliation-worker | Reconciliation Worker | Stage finished (includes result and tolerances) |
| run.stage.errored | io.datarecs.reconciliation.run.stage.errored | /datarecs/reconciliation-worker | Reconciliation Worker | Stage encountered an error |
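Below is a hypothetical Python mirror of the tables above. It shows how a single enum can carry the full CloudEvent type strings, and how the source attribute follows from the emitting service. The real DataRecsEventType lives in data-models-events. The member names and the helpers short_name and source_for are illustrative assumptions, not that package's API.

```python
from enum import Enum

_PREFIX = "io.datarecs.reconciliation."
CORE_API_SOURCE = "/datarecs/core-api"
WORKER_SOURCE = "/datarecs/reconciliation-worker"


class DataRecsEventType(str, Enum):
    """Illustrative mirror of the event tables; not the data-models-events enum."""

    RUN_TRIGGERED = _PREFIX + "run.triggered"
    RUN_QUEUED = _PREFIX + "run.queued"
    RUN_QUEUE_ERRORED = _PREFIX + "run.queue_errored"
    RUN_COMPLETED = _PREFIX + "run.completed"
    RUN_ERRORED = _PREFIX + "run.errored"
    RUN_FINALISED = _PREFIX + "run.finalised"
    RUN_CANCEL_REQUESTED = _PREFIX + "run.cancel_requested"
    RUN_CANCELLED = _PREFIX + "run.cancelled"
    RUN_ROWS_PROCESSED = _PREFIX + "run.rows_processed"
    RUN_EXTRACTION_STARTED = _PREFIX + "run.extraction.started"
    RUN_EXTRACTION_COMPLETED = _PREFIX + "run.extraction.completed"
    RUN_EXTRACTION_ERRORED = _PREFIX + "run.extraction.errored"
    RUN_COMPARISON_STARTED = _PREFIX + "run.comparison.started"
    RUN_COMPARISON_COMPLETED = _PREFIX + "run.comparison.completed"
    RUN_COMPARISON_ERRORED = _PREFIX + "run.comparison.errored"
    RUN_STAGE_STARTED = _PREFIX + "run.stage.started"
    RUN_STAGE_COMPLETED = _PREFIX + "run.stage.completed"
    RUN_STAGE_ERRORED = _PREFIX + "run.stage.errored"


# Events emitted with the Core-API source; every other event (including the
# Argo exit handler's run.completed/run.errored) uses the worker source.
_CORE_API_EVENTS = frozenset({
    DataRecsEventType.RUN_TRIGGERED,
    DataRecsEventType.RUN_QUEUED,
    DataRecsEventType.RUN_QUEUE_ERRORED,
    DataRecsEventType.RUN_FINALISED,
    DataRecsEventType.RUN_CANCEL_REQUESTED,
    DataRecsEventType.RUN_CANCELLED,
})


def source_for(event_type: DataRecsEventType) -> str:
    return CORE_API_SOURCE if event_type in _CORE_API_EVENTS else WORKER_SOURCE


def short_name(event_type: DataRecsEventType) -> str:
    # Strip the reverse-DNS prefix, e.g. "run.stage.errored".
    return event_type.value[len(_PREFIX):]
```

Because the enum subclasses str, a received type attribute can be parsed back with DataRecsEventType(type_string). An unknown string raises ValueError.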

run.triggered

Emitted by Core-API when a user or schedule triggers a run.

```json
{
  "run_id": "uuid",
  "job_id": "uuid",
  "mode": "IMMEDIATE | SCHEDULED",
  "triggered_by": "user-id | scheduled"
}
```

The mode field records the trigger origin: IMMEDIATE (user clicked “Run Now”) or SCHEDULED (Airflow cron schedule fired).
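This sketch shows how Core-API might wrap the run.triggered payload in a structured-mode CloudEvents v1.0 envelope. CloudEvents v1.0 requires the specversion, id, source and type attributes. Three details are assumptions:

- the function name build_run_triggered_event;
- using a random UUID for id;
- the rule that SCHEDULED runs carry triggered_by = "scheduled", which is inferred from the payload's "user-id | scheduled" placeholder.

```python
import json
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def build_run_triggered_event(
    run_id: str, job_id: str, mode: str, user_id: Optional[str] = None
) -> Dict[str, Any]:
    """Hypothetical helper: wrap the run.triggered payload in a CloudEvent."""
    if mode == "SCHEDULED":
        triggered_by = "scheduled"  # assumption: schedule-fired runs use this literal
    elif mode == "IMMEDIATE" and user_id:
        triggered_by = user_id
    else:
        raise ValueError("mode must be IMMEDIATE (with user_id) or SCHEDULED")
    return {
        # Required CloudEvents v1.0 context attributes.
        "specversion": "1.0",
        "id": str(uuid.uuid4()),
        "source": "/datarecs/core-api",
        "type": "io.datarecs.reconciliation.run.triggered",
        # Optional attributes.
        "time": datetime.now(timezone.utc).isoformat(),
        "datacontenttype": "application/json",
        "data": {
            "run_id": run_id,
            "job_id": job_id,
            "mode": mode,
            "triggered_by": triggered_by,
        },
    }


event = build_run_triggered_event("run-123", "job-456", "IMMEDIATE", user_id="user-789")
print(json.dumps(event, indent=2))
```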

run.queued

Emitted by Core-API after the Argo workflow has been submitted successfully.

```json
{
  "run_id": "uuid",
  "job_id": "uuid",
  "argo_workflow_name": "recon-<job_id>-<run_id>"
}
```

run.queue_errored

Emitted by Core-API when Argo workflow submission fails.

```json
{
  "run_id": "uuid",
  "job_id": "uuid",
  "reason": "Platform-Agent rejected the workflow: resource quota exceeded"
}
```

run.completed

Emitted by the Argo exit handler when the workflow succeeds. This is an infrastructure event.

```json
{
  "run_id": "uuid",
  "job_id": "uuid",
  "tenant_id": "uuid",
  "workflow_name": "recon-<job_id>-<run_id>",
  "result": "MATCHED | UNMATCHED"
}
```

The result field is read from /tmp/comparator-result.json, written by the comparator worker before it exits. If the file is missing or unreadable, the exit handler falls through to emitting run.errored with code UNKNOWN.
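The fallback described above can be sketched as follows. The sketch assumes the comparator writes a JSON object with a top-level result key. The helper names are assumptions. So is the choice to treat an unrecognised result value the same way as an unreadable file. This is not the exit handler's actual code.

```python
import json
from pathlib import Path
from typing import Any, Dict, Optional, Tuple

RESULT_FILE = Path("/tmp/comparator-result.json")
VALID_RESULTS = {"MATCHED", "UNMATCHED"}


def read_result_file(path: Path = RESULT_FILE) -> Optional[str]:
    """Return the raw file contents, or None if the file is missing or unreadable."""
    try:
        return path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError):
        return None


def exit_handler_event(base: Dict[str, str], raw: Optional[str]) -> Tuple[str, Dict[str, Any]]:
    """Hypothetical: choose run.completed or run.errored from the comparator output."""
    result = None
    if raw is not None:
        try:
            result = json.loads(raw).get("result")
        except (ValueError, AttributeError):  # bad JSON, or JSON that is not an object
            result = None
    if isinstance(result, str) and result in VALID_RESULTS:
        return "run.completed", {**base, "result": result}
    # Missing/unreadable file (or unexpected value): fall through to UNKNOWN.
    return "run.errored", {
        **base,
        "error": {"code": "UNKNOWN", "message": "comparator result missing or unreadable"},
    }
```

In the exit handler this would be called as exit_handler_event(base, read_result_file()). Here base holds the run_id, job_id, tenant_id and workflow_name fields.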

run.errored

Emitted by the Argo exit handler when the workflow fails. This is an infrastructure event.

```json
{
  "run_id": "uuid",
  "job_id": "uuid",
  "tenant_id": "uuid",
  "workflow_name": "recon-<job_id>-<run_id>",
  "error": {
    "code": "TIMED_OUT | CRASHED | QUERY_FAILED | UNKNOWN",
    "message": "Human-readable description of the failure"
  }
}
```

run.finalised

Emitted by Core-API after the database has been updated. This is the single notification trigger for the email-notifier. It is NOT emitted for cancelled runs.

```json
{
  "run_id": "uuid",
  "job_id": "uuid",
  "tenant_id": "uuid",
  "result": "MATCHED | UNMATCHED"
}
```

Exactly one of result or error is present in the payload. Completed runs carry result. Errored runs carry error, with the same code and message fields as run.errored.
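A minimal validation sketch for the run.finalised payload follows. It returns a list of problems, which is empty when the payload is valid. Restricting error.code to the four exit-handler codes (TIMED_OUT, CRASHED, QUERY_FAILED, UNKNOWN) is an assumption drawn from this page. The function name is hypothetical.

```python
from typing import Any, List, Mapping

REQUIRED_KEYS = ("run_id", "job_id", "tenant_id")
VALID_RESULTS = {"MATCHED", "UNMATCHED"}
VALID_ERROR_CODES = {"TIMED_OUT", "CRASHED", "QUERY_FAILED", "UNKNOWN"}  # assumption


def finalised_payload_problems(data: Mapping[str, Any]) -> List[str]:
    """Hypothetical check of the run.finalised payload; [] means valid."""
    problems = [f"missing field: {key}" for key in REQUIRED_KEYS if key not in data]
    has_result, has_error = "result" in data, "error" in data
    # Exactly one of result / error: both present or both absent is invalid.
    if has_result == has_error:
        problems.append("exactly one of 'result' or 'error' must be present")
    if has_result:
        result = data["result"]
        if not (isinstance(result, str) and result in VALID_RESULTS):
            problems.append(f"invalid result: {result!r}")
    if has_error:
        error = data["error"]
        code = error.get("code") if isinstance(error, Mapping) else None
        if not (isinstance(code, str) and code in VALID_ERROR_CODES):
            problems.append(f"invalid error.code: {code!r}")
    return problems
```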

run.cancel_requested

Emitted by Core-API when a user requests cancellation. This is an audit-trail event only: it does not change the run status or trigger downstream processing.

```json
{
  "run_id": "uuid",
  "job_id": "uuid",
  "tenant_id": "uuid",
  "cancelled_by": "user-id",
  "reason": "User requested cancellation"
}
```

run.cancelled

Emitted by Core-API when a run is confirmed cancelled.

```json
{
  "run_id": "uuid",
  "job_id": "uuid",
  "tenant_id": "uuid",
  "cancelled_by": "user-id",
  "reason": "User requested cancellation"
}
```
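
run.extraction.started

Emitted by a reconciliation worker when an extractor begins execution.

```json
{
  "run_id": "uuid",
  "job_id": "uuid",
  "tenant_id": "uuid",
  "extraction_index": 0,
  "connection_type": "postgres"
}
```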

run.extraction.completed

Emitted when an extractor finishes successfully.

```json
{
  "run_id": "uuid",
  "job_id": "uuid",
  "tenant_id": "uuid",
  "extraction_index": 0,
  "rows_extracted": 15000
}
```

run.extraction.errored

Emitted when an extractor encounters an error.

```json
{
  "run_id": "uuid",
  "job_id": "uuid",
  "tenant_id": "uuid",
  "extraction_index": 0,
  "error": {
    "code": "QUERY_FAILED",
    "message": "relation \"orders\" does not exist"
  }
}
```

run.comparison.started

Emitted when the comparator begins execution.

```json
{
  "run_id": "uuid",
  "job_id": "uuid",
  "tenant_id": "uuid",
  "input_source_count": 2
}
```

run.comparison.completed

This is the richest payload in the lifecycle. It contains the full join statistics, the tolerance results, and the overall comparison outcome.

```json
{
  "run_id": "uuid",
  "job_id": "uuid",
  "tenant_id": "uuid",
  "source_row_counts": {
    "source_0": 1000,
    "source_1": 1005
  },
  "join_stats": {
    "matched_groups": 980,
    "unmatched_by_source": {
      "source_0": 5,
      "source_1": 20
    }
  },
  "tolerances": [
    {
      "measure_name": "total_amount",
      "tolerance_type": "ABSOLUTE",
      "tolerance_value": 0.01,
      "within_tolerance_count": 975,
      "outside_tolerance_count": 5,
      "passed": false
    }
  ],
  "result": "UNMATCHED",
  "rows_compared": 1005,
  "rows_matched": 975,
  "rows_unmatched": 30
}
```

The join_stats.unmatched_by_source field is a Record<string, number> that scales to N data sources, rather than a two-source left_only/right_only model.
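The sketch below illustrates why a per-source map generalises beyond left_only/right_only. It computes join_stats from the set of group keys seen in each source, under two assumptions:

- a group is matched only when every source contains it;
- an unmatched group is counted once for each source that does contain it.

With two sources this reduces to left-only and right-only counts. The function name is hypothetical.

```python
from typing import Any, Dict, Hashable, Mapping, Set


def join_stats(keys_by_source: Mapping[str, Set[Hashable]]) -> Dict[str, Any]:
    """Hypothetical N-source join summary shaped like the join_stats payload."""
    if not keys_by_source:
        return {"matched_groups": 0, "unmatched_by_source": {}}
    key_sets = [set(keys) for keys in keys_by_source.values()]
    # A group is matched only when every source contains it.
    matched = set.intersection(*key_sets)
    return {
        "matched_groups": len(matched),
        # Any other group a source holds is unmatched from that source's side.
        "unmatched_by_source": {
            source: len(set(keys) - matched)
            for source, keys in keys_by_source.items()
        },
    }


print(join_stats({"source_0": {"a", "b", "c"}, "source_1": {"b", "c", "d", "e"}}))
# {'matched_groups': 2, 'unmatched_by_source': {'source_0': 1, 'source_1': 2}}
```

Adding a third source only adds a dictionary entry. Under these assumptions the payload shape does not change as N grows.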

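run.comparison.errored

Emitted when the comparator encounters an error.

```json
{
  "run_id": "uuid",
  "job_id": "uuid",
  "tenant_id": "uuid",
  "error": {
    "code": "COMPARISON_FAILED",
    "message": "DuckDB out of memory during full outer join"
  }
}
```

run.stage.started

Emitted when a stage begins execution.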

```json
{
  "run_id": "uuid",
  "job_id": "uuid",
  "tenant_id": "uuid",
  "stage_name": "balance-check"
}
```

Each stage runs one comparison across all sources for that stage’s dimensions and tolerances. The stage is the unit of pass/fail.
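A stage passes or fails on its tolerance entries, so the sketch below shows how one ABSOLUTE entry could be computed. It assumes:

- each matched group supplies one value per source;
- a group is within tolerance when the spread (max minus min) across sources is at most tolerance_value;
- passed means outside_tolerance_count == 0, which is consistent with the example payloads.

None of this is confirmed as the comparator's actual logic, and the function name is hypothetical.

```python
from decimal import Decimal
from typing import Any, Dict, Sequence, Union

Number = Union[int, float, str, Decimal]


def evaluate_absolute_tolerance(
    measure_name: str,
    tolerance_value: Number,
    groups: Sequence[Sequence[Number]],
) -> Dict[str, Any]:
    """Hypothetical: build one tolerance entry for a stage.

    Each inner sequence holds one matched group's measure value from every
    source; the group is within tolerance when max - min <= tolerance_value.
    """
    limit = Decimal(str(tolerance_value))
    within = 0
    for values in groups:
        # Decimal(str(v)) avoids binary float error (100.01 - 100.0 > 0.01 as floats).
        decimals = [Decimal(str(v)) for v in values]
        if max(decimals) - min(decimals) <= limit:
            within += 1
    outside = len(groups) - within
    return {
        "measure_name": measure_name,
        "tolerance_type": "ABSOLUTE",
        "tolerance_value": float(limit),
        "within_tolerance_count": within,
        "outside_tolerance_count": outside,
        "passed": outside == 0,  # assumption: any out-of-tolerance group fails the check
    }


entry = evaluate_absolute_tolerance("total_amount", 0.01, [[100.0, 100.01], [50, 50], [10.0, 10.5]])
print(entry["within_tolerance_count"], entry["outside_tolerance_count"], entry["passed"])
# 2 1 False
```

Converting through Decimal(str(v)) matters here. With plain floats, 100.01 - 100.0 evaluates to slightly more than 0.01, so that group would be wrongly counted as outside tolerance.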

run.stage.completed

Emitted when a stage finishes. The payload includes the stage result and its tolerance results.

```json
{
  "run_id": "uuid",
  "job_id": "uuid",
  "tenant_id": "uuid",
  "stage_name": "balance-check",
  "result": "MATCHED | UNMATCHED",
  "tolerances": [
    {
      "measure_name": "total_amount",
      "tolerance_type": "ABSOLUTE",
      "tolerance_value": 0.01,
      "within_tolerance_count": 975,
      "outside_tolerance_count": 5,
      "passed": false
    }
  ]
}
```

run.stage.errored

Emitted when a stage encounters an error.

```json
{
  "run_id": "uuid",
  "job_id": "uuid",
  "tenant_id": "uuid",
  "stage_name": "balance-check",
  "error": {
    "code": "COMPARISON_FAILED",
    "message": "Stage failed during comparison phase"
  }
}
```

run.rows_processed

A progress event emitted during execution. Its payload is unchanged from the previous design.

```json
{
  "run_id": "uuid",
  "job_id": "uuid",
  "stage_name": "balance-check",
  "row_count": 5000,
  "mismatch_count": 12
}
```

When a workflow fails, the Argo exit handler inspects the failure metadata and maps it to one of four well-known error codes. These codes appear in the error.code field of run.errored and run.finalised payloads.

| Error Code | Argo Failure Reason | Description |
|---|---|---|
| TIMED_OUT | Workflow exceeded activeDeadlineSeconds | The workflow ran longer than the configured deadline (default 3600s). The Argo failure message contains activeDeadlineSeconds or exceeded deadline. |
| CRASHED | Pod OOMKilled or unexpected termination | A worker pod was killed by the kernel OOM killer or terminated unexpectedly. The failure message contains OOMKilled or memory. |
| QUERY_FAILED | Extractor step failed | An extractor step failed, typically due to a SQL query error or connection failure. Detected when the failed step name starts with extract-. |
| UNKNOWN | Any other failure | The failure reason could not be classified into a known category. This is the default fallback. |

The exit handler reads {{workflow.failures}} (Argo template variable containing a JSON array of failed nodes) and {{workflow.status}} to make this determination. The mapping is deterministic — each failure maps to exactly one code.
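Below is a sketch of the classification. It assumes:

- the exit handler receives {{workflow.status}} and {{workflow.failures}} as strings;
- the failed step name is each entry's displayName;
- the checks run in the order the table lists them and the first match wins, which is one way to make the mapping deterministic.

The function name is hypothetical.

```python
import json
from typing import Any, List, Optional


def classify_failure(workflow_status: str, failures_json: str) -> Optional[str]:
    """Hypothetical exit-handler helper mapping Argo failure data to an error code.

    Returns None when the workflow succeeded (the run.completed path).
    """
    if workflow_status == "Succeeded":
        return None
    try:
        parsed: Any = json.loads(failures_json) if failures_json else []
    except ValueError:
        parsed = []
    failures: List[dict] = (
        [f for f in parsed if isinstance(f, dict)] if isinstance(parsed, list) else []
    )
    messages = " ".join(str(f.get("message", "")) for f in failures)
    step_names = [str(f.get("displayName", "")) for f in failures]

    # Checks run in table order; the first match wins.
    if "activeDeadlineSeconds" in messages or "exceeded deadline" in messages:
        return "TIMED_OUT"
    if "OOMKilled" in messages or "memory" in messages:
        return "CRASHED"
    if any(name.startswith("extract-") for name in step_names):
        return "QUERY_FAILED"
    return "UNKNOWN"
```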

The cancellation flow uses a two-phase approach to handle the race condition between a user’s cancel request and the Argo workflow’s natural completion.

  1. User requests cancellation via the API.
  2. Core-API sets cancel_requested = true on the run record (without changing status).
  3. Core-API emits run.cancel_requested (audit trail only).
  4. If the run is QUEUED: Core-API immediately sets status to CANCELLED and emits run.cancelled. If argo_workflow_name is set, Core-API also calls Platform-Agent to clean up the workflow.
  5. If the run is RUNNING: Core-API calls Platform-Agent to stop the Argo workflow. The final status is determined later by RunCompletionController when the Argo completion event arrives.
  6. When the Argo event arrives, RunCompletionController checks the cancel_requested flag:
    • cancel_requested = true + run.errored → workflow was stopped by the cancel. Final status: CANCELLED. Emits run.cancelled. Does NOT emit run.finalised.
    • cancel_requested = true + run.completed → workflow finished before cancel took effect (stale cancel). Final status: COMPLETED. Emits run.finalised normally.
    • cancel_requested = false → normal processing. Emits run.finalised.
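The two-phase flow above can be sketched as two small functions. The first handles the moment the cancel request arrives. The second handles RunCompletionController receiving the Argo event. The function names and action strings are assumptions. So is the ERRORED status name for non-cancelled failures: this page names only the QUEUED, RUNNING, COMPLETED and CANCELLED statuses.

```python
from typing import List, Optional, Tuple


def on_cancel_request(status: str, argo_workflow_name: Optional[str]) -> List[str]:
    """Hypothetical: actions Core-API takes when a cancel request arrives."""
    # Phase 1: record intent without touching the status, and audit it.
    actions = ["set cancel_requested=true", "emit run.cancel_requested"]
    if status == "QUEUED":
        actions.append("set status=CANCELLED")
        if argo_workflow_name:  # submission already happened; clean up the workflow
            actions.append("gRPC cancelJob")
        actions.append("emit run.cancelled")
    elif status == "RUNNING":
        # Final status is decided later by RunCompletionController.
        actions.append("gRPC cancelJob")
    return actions


def resolve_completion(incoming_event: str, cancel_requested: bool) -> Tuple[str, str]:
    """Hypothetical: (final status, event to emit) when an Argo event arrives."""
    if incoming_event == "run.completed":
        # Normal completion, or a stale cancel: the workflow finished first.
        return "COMPLETED", "run.finalised"
    if incoming_event == "run.errored":
        if cancel_requested:
            # The workflow was stopped by the cancel: no run.finalised, no email.
            return "CANCELLED", "run.cancelled"
        return "ERRORED", "run.finalised"  # "ERRORED" is an assumed status name
    raise ValueError(f"unexpected event: {incoming_event}")
```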

Core-API subscribes to infrastructure events from the Argo exit handler:

```yaml
# Trigger: run.completed
filter:
  attributes:
    type: io.datarecs.reconciliation.run.completed
---
# Trigger: run.errored
filter:
  attributes:
    type: io.datarecs.reconciliation.run.errored
```

The email-notifier subscribes to a single platform event. Cancelled runs are structurally excluded because they emit run.cancelled, not run.finalised.

```yaml
# Single trigger: run.finalised only
filter:
  attributes:
    type: io.datarecs.reconciliation.run.finalised
```

The webhook service has no type filter — it receives all events from the broker. No configuration changes needed.
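The sketch below shows why cancelled runs never reach the email-notifier. It models each Knative Trigger as an exact-match filter on the CloudEvent type attribute, with None standing for the webhook service's unfiltered trigger. It simulates the routing described on this page and is not Knative code. The TRIGGERS table and subscribers_for are hypothetical.

```python
from typing import Dict, List, Optional

PREFIX = "io.datarecs.reconciliation."

# Hypothetical routing table: each subscriber's triggers as exact-match values
# for the CloudEvent "type" attribute; None models a trigger with no filter.
TRIGGERS: Dict[str, List[Optional[str]]] = {
    "core-api": [PREFIX + "run.completed", PREFIX + "run.errored"],
    "email-notifier": [PREFIX + "run.finalised"],
    "webhook-service": [None],
}


def subscribers_for(event_type: str) -> List[str]:
    """Return the subscribers whose triggers would deliver this event type."""
    return sorted(
        name
        for name, filters in TRIGGERS.items()
        if any(f is None or f == event_type for f in filters)
    )


for short in ("run.completed", "run.finalised", "run.cancelled"):
    print(short, "->", subscribers_for(PREFIX + short))
# run.completed -> ['core-api', 'webhook-service']
# run.finalised -> ['email-notifier', 'webhook-service']
# run.cancelled -> ['webhook-service']
```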