Scheduling with Airflow
The DataRecs Airflow provider (apache-airflow-provider-datarecs) lets you orchestrate reconciliation jobs from Apache Airflow. You can trigger jobs by ID or by name, wait for them to complete, and retrieve results — all as native Airflow tasks.
Prerequisites
- Apache Airflow 2.7+ (or Airflow 3.x).
- A DataRecs API key with permission to trigger jobs. See Managing API Keys.
Install the provider
```shell
pip install apache-airflow-provider-datarecs
```

Or, if you build a custom Airflow image:
```dockerfile
COPY dist/apache_airflow_provider_datarecs-*.whl /tmp/provider/
RUN pip install --no-cache-dir /tmp/provider/*.whl
```

Configure the Airflow connection
- In the Airflow UI, go to Admin → Connections.
- Click + to create a new connection.
- Set Connection Type to `datarecs`.
- Set Host to your DataRecs API URL (e.g. `https://api.datarecs.io`).
- Set API Key (password field) to your DataRecs API key.
- Save as `datarecs_default` (or any name; just pass `conn_id` to the operators).
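As an alternative to the UI, Airflow can read connections from environment variables named `AIRFLOW_CONN_<CONN_ID>`. A sketch for the `datarecs_default` connection above, using Airflow's JSON connection format (supported in Airflow 2.3+; the key value here is a placeholder, not a real credential):

```shell
# Airflow resolves AIRFLOW_CONN_DATARECS_DEFAULT as the connection "datarecs_default".
export AIRFLOW_CONN_DATARECS_DEFAULT='{"conn_type": "datarecs", "host": "https://api.datarecs.io", "password": "your-api-key"}'
```

This is convenient for containerized deployments where connections are injected as secrets rather than created by hand in the UI.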
Available operators and sensors
| Component | Description |
|---|---|
| `TriggerJobRunOperator` | Trigger a job run by job ID and workspace ID. |
| `TriggerJobRunByNameOperator` | Trigger a job run by workspace name and job name (resolves IDs automatically). |
| `TriggerJobGroupRunOperator` | Trigger all jobs in a job group. |
| `ListWorkspacesOperator` | List all workspaces the API key can access. |
| `ListJobsOperator` | List jobs, optionally filtered by workspace ID. |
| `JobRunSensor` | Wait for a job run to reach a terminal status. |
| `JobGroupRunSensor` | Wait for a job group run to complete. |
| `GetJobRunResultOperator` | Retrieve the result of a completed job run. |
Trigger a job by name
This is the simplest way to run a reconciliation from Airflow: you provide the workspace and job by their human-readable names, and the operator resolves the IDs for you.
```python
from airflow.models.dag import DAG
from airflow.models.param import Param
from airflow.providers.datarecs.operators.trigger_job_run_by_name import TriggerJobRunByNameOperator
from airflow.providers.datarecs.operators.get_job_run_result import GetJobRunResultOperator
from airflow.providers.datarecs.sensors.job_run import JobRunSensor

with DAG(
    dag_id="reconcile_daily_balances",
    schedule="@daily",
    catchup=False,
    tags=["datarecs"],
    params={
        "workspace_name": Param(default="Production", type="string"),
        "job_name": Param(default="daily-balance-check", type="string"),
    },
) as dag:
    trigger = TriggerJobRunByNameOperator(
        task_id="trigger",
        workspace_name="{{ params.workspace_name }}",
        job_name="{{ params.job_name }}",
    )

    wait = JobRunSensor(
        task_id="wait",
        job_id="{{ ti.xcom_pull(task_ids='trigger', key='job_id') }}",
        workspace_id="{{ ti.xcom_pull(task_ids='trigger', key='workspace_id') }}",
        run_id="{{ ti.xcom_pull(task_ids='trigger', key='run_id') }}",
        poke_interval=30,
        timeout=7200,
    )

    result = GetJobRunResultOperator(
        task_id="result",
        job_id="{{ ti.xcom_pull(task_ids='trigger', key='job_id') }}",
        workspace_id="{{ ti.xcom_pull(task_ids='trigger', key='workspace_id') }}",
        run_id="{{ ti.xcom_pull(task_ids='trigger', key='run_id') }}",
    )

    trigger >> wait >> result
```

If the workspace or job name doesn't match exactly one resource, the operator raises an error listing the available names.
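Because the DAG above declares `workspace_name` and `job_name` as params, you can override them at trigger time without editing the DAG. For example, with the standard Airflow CLI (the `"Staging"` workspace name here is hypothetical; substitute one of your own):

```shell
airflow dags trigger reconcile_daily_balances \
  --conf '{"workspace_name": "Staging", "job_name": "daily-balance-check"}'
```

The same `conf` JSON can be supplied from the Airflow UI's "Trigger DAG w/ config" dialog or via the REST API.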
Trigger a job by ID
When you already know the UUIDs (e.g. from Terraform outputs or environment variables):
```python
from airflow.providers.datarecs.operators.trigger_job_run import TriggerJobRunOperator

trigger = TriggerJobRunOperator(
    task_id="trigger",
    job_id="b1c2d3e4-...",
    workspace_id="a1b2c3d4-...",
    run_label="nightly-{{ ds }}",
)
```

List workspaces and jobs
Use these operators to discover available resources, or to build dynamic DAGs:
```python
from airflow.providers.datarecs.operators.list_workspaces import ListWorkspacesOperator
from airflow.providers.datarecs.operators.list_jobs import ListJobsOperator

list_ws = ListWorkspacesOperator(task_id="list_workspaces")

list_jobs = ListJobsOperator(
    task_id="list_jobs",
    workspace_id="{{ ti.xcom_pull(task_ids='list_workspaces', key='workspaces')[0]['id'] }}",
)
```

Both push their results to XCom (under the `workspaces` and `jobs` keys respectively), so downstream tasks can use them.
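Downstream tasks can post-process these XCom lists in plain Python, for example to select which jobs to trigger. A minimal sketch, assuming each entry in the `jobs` XCom is a dict with at least `id` and `name` fields (the exact payload shape is an assumption; check it against your provider version). The function is meant to be wrapped in a `PythonOperator` or `@task` that pulls the `jobs` XCom:

```python
def select_job_ids(jobs: list[dict], name_prefix: str) -> list[str]:
    """Return the IDs of jobs whose name starts with the given prefix."""
    return [job["id"] for job in jobs if job["name"].startswith(name_prefix)]


# Example with a hand-written payload shaped like the assumed XCom:
jobs = [
    {"id": "j-1", "name": "daily-balance-check"},
    {"id": "j-2", "name": "weekly-position-check"},
]
print(select_job_ids(jobs, "daily-"))  # ['j-1']
```

The resulting ID list can then feed a loop (or Airflow dynamic task mapping) over `TriggerJobRunOperator` instances.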
Wait for completion
`JobRunSensor` polls the DataRecs API until the run reaches a terminal state:
- `COMPLETED`: the sensor succeeds and pushes `run_result` to XCom.
- `FAILED`: the sensor raises `AirflowException` with the error details.
- `CANCELLED`: the sensor raises `AirflowException`.
```python
from airflow.providers.datarecs.sensors.job_run import JobRunSensor

wait = JobRunSensor(
    task_id="wait_for_completion",
    job_id="{{ ti.xcom_pull(task_ids='trigger', key='job_id') }}",
    workspace_id="{{ ti.xcom_pull(task_ids='trigger', key='workspace_id') }}",
    run_id="{{ ti.xcom_pull(task_ids='trigger', key='run_id') }}",
    poke_interval=30,  # seconds between polls
    timeout=7200,      # max wait time in seconds
    mode="poke",
)
```

Retrieve results
After the sensor confirms completion, use `GetJobRunResultOperator` to get the row counts and status:
```python
from airflow.providers.datarecs.operators.get_job_run_result import GetJobRunResultOperator

result = GetJobRunResultOperator(
    task_id="get_result",
    job_id="{{ ti.xcom_pull(task_ids='trigger', key='job_id') }}",
    workspace_id="{{ ti.xcom_pull(task_ids='trigger', key='workspace_id') }}",
    run_id="{{ ti.xcom_pull(task_ids='trigger', key='run_id') }}",
)
```

The operator pushes `run_result` to XCom with the fields `status`, `rows_processed`, `rows_matched`, `rows_unmatched`, `started_at`, `completed_at`, and `error`.
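A common follow-up is to fail the DAG when a run completes but leaves rows unmatched. A minimal sketch of such a check, using only the `run_result` fields listed above (wrap it in a `PythonOperator` or `@task` that pulls the `run_result` XCom; the `max_unmatched` threshold is a hypothetical knob, not a provider parameter):

```python
def check_run_result(run_result: dict, max_unmatched: int = 0) -> dict:
    """Raise if the run failed or left more than max_unmatched rows unmatched."""
    if run_result["status"] != "COMPLETED":
        raise ValueError(f"Run did not complete: {run_result.get('error')}")
    if run_result["rows_unmatched"] > max_unmatched:
        raise ValueError(
            f"{run_result['rows_unmatched']} of {run_result['rows_processed']} "
            "rows unmatched"
        )
    return run_result


# Example with a hand-written result shaped like the documented XCom:
ok = check_run_result(
    {"status": "COMPLETED", "rows_processed": 100, "rows_matched": 100,
     "rows_unmatched": 0, "started_at": "2024-01-01T00:00:00Z",
     "completed_at": "2024-01-01T00:05:00Z", "error": None}
)
```

Raising from the task marks it failed, which in turn fails the DAG run and fires any configured alerting.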
Troubleshooting
| Issue | Resolution |
|---|---|
| `AirflowConfigException: Invalid or expired API key` | Check the API key in your Airflow connection. Ensure it hasn't been revoked. |
| `AirflowException: Resource not found` | Verify that the job ID / workspace ID (or names) are correct. |
| `AirflowException: Workspace 'X' not found` | The workspace name doesn't match any workspace. Check spelling and case. |
| `AirflowException: Multiple jobs named 'X' found` | Job names must be unique within a workspace when using `TriggerJobRunByNameOperator`. Use `TriggerJobRunOperator` with explicit IDs instead. |
| Sensor times out | Increase the `timeout` parameter, or check the DataRecs console for stuck runs. |