Scheduling with Airflow

The DataRecs Airflow provider (apache-airflow-provider-datarecs) lets you orchestrate reconciliation jobs from Apache Airflow. You can trigger jobs by ID or by name, wait for them to complete, and retrieve results — all as native Airflow tasks.

Prerequisites:

  • Apache Airflow 2.7+ (or Airflow 3.x).
  • A DataRecs API key with permission to trigger jobs. See Managing API Keys.
Install the provider with pip:

pip install apache-airflow-provider-datarecs

Or, if you build a custom Airflow image:

COPY dist/apache_airflow_provider_datarecs-*.whl /tmp/provider/
RUN pip install --no-cache-dir /tmp/provider/*.whl

Next, create an Airflow connection so the operators can authenticate:

  1. In the Airflow UI, go to Admin → Connections.
  2. Click + to create a new connection.
  3. Set Connection Type to datarecs.
  4. Set Host to your DataRecs API URL (e.g. https://api.datarecs.io).
  5. Set API Key (password field) to your DataRecs API key.
  6. Save as datarecs_default (or any name — just pass conn_id to operators).
The provider ships the following components:

  • TriggerJobRunOperator: trigger a job run by job ID and workspace ID.
  • TriggerJobRunByNameOperator: trigger a job run by workspace name and job name (resolves IDs automatically).
  • TriggerJobGroupRunOperator: trigger all jobs in a job group.
  • ListWorkspacesOperator: list all workspaces the API key can access.
  • ListJobsOperator: list jobs, optionally filtered by workspace ID.
  • JobRunSensor: wait for a job run to reach a terminal status.
  • JobGroupRunSensor: wait for a job group run to complete.
  • GetJobRunResultOperator: retrieve the result of a completed job run.

The simplest way to run a reconciliation from Airflow. You provide the workspace and job by their human-readable names — the operator resolves the IDs for you.

from airflow.models.dag import DAG
from airflow.models.param import Param
from airflow.providers.datarecs.operators.trigger_job_run_by_name import TriggerJobRunByNameOperator
from airflow.providers.datarecs.operators.get_job_run_result import GetJobRunResultOperator
from airflow.providers.datarecs.sensors.job_run import JobRunSensor
with DAG(
    dag_id="reconcile_daily_balances",
    schedule="@daily",
    catchup=False,
    tags=["datarecs"],
    params={
        "workspace_name": Param(default="Production", type="string"),
        "job_name": Param(default="daily-balance-check", type="string"),
    },
) as dag:
    trigger = TriggerJobRunByNameOperator(
        task_id="trigger",
        workspace_name="{{ params.workspace_name }}",
        job_name="{{ params.job_name }}",
    )

    wait = JobRunSensor(
        task_id="wait",
        job_id="{{ ti.xcom_pull(task_ids='trigger', key='job_id') }}",
        workspace_id="{{ ti.xcom_pull(task_ids='trigger', key='workspace_id') }}",
        run_id="{{ ti.xcom_pull(task_ids='trigger', key='run_id') }}",
        poke_interval=30,
        timeout=7200,
    )

    result = GetJobRunResultOperator(
        task_id="result",
        job_id="{{ ti.xcom_pull(task_ids='trigger', key='job_id') }}",
        workspace_id="{{ ti.xcom_pull(task_ids='trigger', key='workspace_id') }}",
        run_id="{{ ti.xcom_pull(task_ids='trigger', key='run_id') }}",
    )

    trigger >> wait >> result

If the workspace or job name doesn’t match exactly one resource, the operator raises an error with the available names.
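That resolution behavior can be sketched in plain Python. This is a hypothetical helper, not the provider's actual code: the function name, payload shape, and error type (ValueError standing in for AirflowException) are illustrative.

```python
def resolve_by_name(name: str, resources: list[dict]) -> str:
    """Return the id of the resource whose 'name' matches exactly.
    Raises ValueError listing the available names when zero or
    multiple resources match, mirroring the documented behavior."""
    matches = [r for r in resources if r["name"] == name]
    if len(matches) != 1:
        available = ", ".join(sorted(r["name"] for r in resources))
        raise ValueError(
            f"Expected exactly one resource named {name!r}, "
            f"found {len(matches)}. Available: {available}"
        )
    return matches[0]["id"]

jobs = [
    {"name": "daily-balance-check", "id": "b1c2"},
    {"name": "weekly-gl-recon", "id": "c3d4"},
]
print(resolve_by_name("daily-balance-check", jobs))  # b1c2
```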

When you already know the UUIDs (e.g. from Terraform outputs or environment variables):

from airflow.providers.datarecs.operators.trigger_job_run import TriggerJobRunOperator
trigger = TriggerJobRunOperator(
task_id="trigger",
job_id="b1c2d3e4-...",
workspace_id="a1b2c3d4-...",
run_label="nightly-{{ ds }}",
)

Use these operators to discover available resources, or to build dynamic DAGs:

from airflow.providers.datarecs.operators.list_workspaces import ListWorkspacesOperator
from airflow.providers.datarecs.operators.list_jobs import ListJobsOperator
list_ws = ListWorkspacesOperator(task_id="list_workspaces")

list_jobs = ListJobsOperator(
    task_id="list_jobs",
    workspace_id="{{ ti.xcom_pull(task_ids='list_workspaces', key='workspaces')[0]['id'] }}",
)

Both push their results to XCom (workspaces and jobs keys respectively), so downstream tasks can use them.
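For instance, a downstream Python task could select a workspace from the workspaces payload by name rather than by position, which is safer than the [0]['id'] indexing shown above when ordering isn't guaranteed. The payload shape here is assumed from that example; check the actual XCom value in your environment.

```python
def workspace_id_by_name(workspaces: list[dict], name: str) -> str:
    """Pick a workspace id out of the 'workspaces' XCom payload by
    exact name match, failing loudly if the name is absent."""
    for ws in workspaces:
        if ws.get("name") == name:
            return ws["id"]
    raise KeyError(f"No workspace named {name!r}")

# Hypothetical payload, shaped like the list_workspaces XCom value
payload = [
    {"id": "a1b2", "name": "Production"},
    {"id": "c3d4", "name": "Staging"},
]
print(workspace_id_by_name(payload, "Staging"))  # c3d4
```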

JobRunSensor polls the DataRecs API until the run reaches a terminal state:

  • COMPLETED — sensor succeeds, pushes run_result to XCom.
  • FAILED — sensor raises AirflowException with the error details.
  • CANCELLED — sensor raises AirflowException.
from airflow.providers.datarecs.sensors.job_run import JobRunSensor
wait = JobRunSensor(
    task_id="wait_for_completion",
    job_id="{{ ti.xcom_pull(task_ids='trigger', key='job_id') }}",
    workspace_id="{{ ti.xcom_pull(task_ids='trigger', key='workspace_id') }}",
    run_id="{{ ti.xcom_pull(task_ids='trigger', key='run_id') }}",
    poke_interval=30,  # seconds between polls
    timeout=7200,      # max wait time in seconds
    mode="poke",
)
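The terminal-state handling described above amounts to a small status check each poll. A minimal sketch, with RuntimeError standing in for AirflowException and the function name purely illustrative:

```python
TERMINAL = {"COMPLETED", "FAILED", "CANCELLED"}

def poke(status: str) -> bool:
    """Return True (done) on COMPLETED, raise on FAILED/CANCELLED,
    and return False (poll again) on any non-terminal status,
    mirroring the documented JobRunSensor behavior."""
    if status == "COMPLETED":
        return True
    if status in TERMINAL:  # FAILED or CANCELLED
        raise RuntimeError(f"Job run ended with status {status}")
    return False  # e.g. PENDING or RUNNING: sensor keeps polling

print(poke("RUNNING"))    # False
print(poke("COMPLETED"))  # True
```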

After the sensor confirms completion, use GetJobRunResultOperator to get the row counts and status:

from airflow.providers.datarecs.operators.get_job_run_result import GetJobRunResultOperator
result = GetJobRunResultOperator(
    task_id="get_result",
    job_id="{{ ti.xcom_pull(task_ids='trigger', key='job_id') }}",
    workspace_id="{{ ti.xcom_pull(task_ids='trigger', key='workspace_id') }}",
    run_id="{{ ti.xcom_pull(task_ids='trigger', key='run_id') }}",
)

The operator pushes run_result to XCom with fields: status, rows_processed, rows_matched, rows_unmatched, started_at, completed_at, and error.
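A downstream task can pull run_result from XCom and act on those fields, for example to fail the DAG when the match rate drops below a threshold. The helper name, sample values, and 95% threshold below are illustrative, not part of the provider.

```python
def match_rate(run_result: dict) -> float:
    """Fraction of processed rows that matched; an empty run
    (0 rows processed) counts as fully matched."""
    processed = run_result["rows_processed"]
    return run_result["rows_matched"] / processed if processed else 1.0

# Hypothetical run_result payload with the documented fields
result = {
    "status": "COMPLETED",
    "rows_processed": 1000,
    "rows_matched": 990,
    "rows_unmatched": 10,
    "started_at": "2024-06-01T02:00:00Z",
    "completed_at": "2024-06-01T02:03:12Z",
    "error": None,
}
rate = match_rate(result)
print(f"{rate:.1%}")  # 99.0%
assert rate >= 0.95, "match rate below threshold"
```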

Common issues and how to resolve them:

  • AirflowConfigException: Invalid or expired API key
    Check the API key in your Airflow connection. Ensure it hasn’t been revoked.
  • AirflowException: Resource not found
    Verify the job ID / workspace ID (or names) are correct.
  • AirflowException: Workspace 'X' not found
    The workspace name doesn’t match any workspace. Check spelling and case.
  • AirflowException: Multiple jobs named 'X' found
    Job names must be unique within a workspace when using TriggerJobRunByNameOperator. Use TriggerJobRunOperator with explicit IDs instead.
  • Sensor times out
    Increase the timeout parameter, or check the DataRecs console for stuck runs.