Hevo Sensor
The HevoSensor is an Airflow sensor that waits for a Hevo Pipeline job to complete. It supports two modes:
- Explicit Job ID: In this mode, the sensor monitors the specific job identified by the given ID.
- Auto-discovery: In this mode, the sensor automatically identifies the relevant job in the Hevo Pipeline and monitors it.
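The choice between the two modes can be sketched in plain Python as follows. This is a minimal illustration, not the provider's code: resolve_job_id and the discover callback are hypothetical names, and treating an empty string the same as "no job ID" is an assumption.

```python
from typing import Callable, Optional


def resolve_job_id(
    job_id: Optional[str],
    job_type: str,
    discover: Callable[[str], str],  # hypothetical auto-discovery callback
) -> str:
    """Hypothetical sketch: pick explicit job ID mode or auto-discovery mode."""
    if job_id:
        # Explicit Job ID mode (None or an empty string falls through; an assumption)
        return job_id
    # Auto-discovery mode: look up the active job of the requested type
    return discover(job_type)


print(resolve_job_id(None, "HISTORICAL", lambda t: f"found-{t}"))  # found-HISTORICAL
```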
Parameters
The HevoSensor accepts the following parameters:
Required
| Name | Type | Default | Description |
|---|---|---|---|
| pipeline_id (Note: Supports Jinja templating.) | INT | NA | The unique identifier of the Hevo Pipeline to be monitored. |
Optional
| Name | Type | Default | Description |
|---|---|---|---|
| job_id (Note: Supports Jinja templating.) | STR | None | The ID of the job to be monitored. If a job ID is specified, the sensor monitors that job. If not, the sensor identifies the active job through auto-discovery. |
| job_type (Note: This parameter is used only when job_id is not specified.) | JobType or STR | INCREMENTAL | The type of job to be monitored when a job ID is not specified. The relevant job is discovered based on this type. Allowable values: INCREMENTAL: A job that syncs new and updated data. HISTORICAL: A job that syncs all existing data from the Source. RESYNC: A job that re-ingests historical data for all active objects in the Pipeline and updates the Destination tables if changes are detected in the re-ingested data. RESYNC_WITH_DROP_AND_LOAD: A job that drops and recreates the Destination tables for all active objects in the Pipeline. During this process, the tables are temporarily unavailable. Hevo then re-ingests all data from the Source objects and loads it into the newly created Destination tables. |
| connection_id | STR | hevo_airflow_conn_id | The ID of the Airflow connection that stores the Hevo API credentials and connection details. |
| poke_interval | INT | 15 | The time, in seconds, between status checks while waiting for the Pipeline job to complete. |
| accept_completed_with_failures | BOOL | False | A flag to treat the Pipeline job's Completed with Failures status as success. |
| deferrable | BOOL | True (Note: It is recommended to set this parameter to True in production environments.) | A flag to determine whether the task should release the worker slot while waiting for the job to complete. Allowable values: True: Releases the worker slot and delegates the monitoring logic to the Triggerer service. Note: The Airflow Triggerer service must be running. False: Retains the worker slot for the entire runtime duration. |
| wait_for_job_max_attempts | INT | 10 | The maximum number of attempts to identify an active job through auto-discovery. |
| wait_for_job_interval | INT | 5 | The time, in seconds, between attempts to discover an active job. |
| wait_for_job_initial_delay | INT | 10 | The time, in seconds, that the task waits before its first attempt to discover the relevant job. This delay gives the Pipeline time to create the job. |
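To illustrate how job_type and the wait_for_job_* parameters might interact during auto-discovery, here is a minimal sketch. It assumes a polling loop that waits once for the initial delay and then retries up to the maximum number of attempts, pausing between attempts. JobType here is a hypothetical stand-in that mirrors the allowable values above, and discover_job_id, find_active_job, and the injectable sleep argument are illustrative names, not part of the Hevo provider.

```python
import time
from enum import Enum
from typing import Callable, Optional


class JobType(str, Enum):
    """Hypothetical local mirror of the job types listed in the table."""
    INCREMENTAL = "INCREMENTAL"
    HISTORICAL = "HISTORICAL"
    RESYNC = "RESYNC"
    RESYNC_WITH_DROP_AND_LOAD = "RESYNC_WITH_DROP_AND_LOAD"


def discover_job_id(
    find_active_job: Callable[[JobType], Optional[str]],  # hypothetical lookup
    job_type: JobType = JobType.INCREMENTAL,
    wait_for_job_initial_delay: int = 10,
    wait_for_job_max_attempts: int = 10,
    wait_for_job_interval: int = 5,
    sleep: Callable[[float], None] = time.sleep,  # injectable for testing
) -> str:
    """Hypothetical sketch of auto-discovery with an initial delay and retries."""
    sleep(wait_for_job_initial_delay)  # give the Pipeline time to create the job
    for attempt in range(1, wait_for_job_max_attempts + 1):
        job_id = find_active_job(job_type)
        if job_id is not None:
            return job_id
        if attempt < wait_for_job_max_attempts:
            sleep(wait_for_job_interval)  # pause before the next attempt
    raise TimeoutError(
        f"No active {job_type.value} job found after {wait_for_job_max_attempts} attempts"
    )
```

Passing a fake lookup and sleep function, as in the test below, exercises the retry logic without calling Hevo or actually waiting.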
The following table lists the methods that the HevoSensor provides:
| Method | Description |
|---|---|
| execute | Runs the monitoring logic for the Pipeline job. The deferrable parameter determines the execution flow. |
| _get_job_id | Obtains the ID of the job to be monitored, either from the job_id parameter or through auto-discovery using the job_type parameter. |
| poke | Checks whether the monitored job, identified by explicit job ID or through auto-discovery, has completed. The method recognizes the following job statuses: JobCompletionStatus.COMPLETED: The job completed successfully. JobCompletionStatus.COMPLETED_WITH_FAILURES: The job finished with some failures. JobCompletionStatus.FAILED: The job failed, was cancelled or skipped, or was deferred with failure. JobCompletionStatus.PENDING: The job is still running. |
| execute_complete | Handles the trigger completion event by validating the event and logging the results. |
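The way poke might turn these statuses into a sensor outcome can be sketched as below. Following the usual Airflow convention, returning True ends the wait and returning False means "check again after poke_interval". The sketch assumes the task fails when a job ends as FAILED, or as COMPLETED_WITH_FAILURES while accept_completed_with_failures is False. JobCompletionStatus here is a hypothetical local mirror of the listed statuses, and evaluate_status and JobFailedError are illustrative names.

```python
from enum import Enum


class JobCompletionStatus(Enum):
    """Hypothetical local mirror of the statuses that poke recognizes."""
    COMPLETED = "COMPLETED"
    COMPLETED_WITH_FAILURES = "COMPLETED_WITH_FAILURES"
    FAILED = "FAILED"
    PENDING = "PENDING"


class JobFailedError(Exception):
    """Hypothetical error signalling that the monitored job did not succeed."""


def evaluate_status(
    status: JobCompletionStatus, accept_completed_with_failures: bool = False
) -> bool:
    """Hypothetical sketch: True = done, False = keep poking, raise = fail the task."""
    if status is JobCompletionStatus.PENDING:
        return False  # job still running; check again later
    if status is JobCompletionStatus.COMPLETED:
        return True
    if status is JobCompletionStatus.COMPLETED_WITH_FAILURES:
        if accept_completed_with_failures:
            return True  # the flag treats this status as success
        raise JobFailedError("Job completed with failures")
    raise JobFailedError(f"Job ended with status {status.value}")
```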
Example
The following example creates two HevoSensor tasks: wait_for_sync and wait_for_historical_sync.
- The wait_for_sync task monitors a specific job. The trigger_sync task generates the job ID, and the sensor retrieves it from Airflow's cross-communication (XCom) storage.
- The wait_for_historical_sync task runs in auto-discovery mode. It identifies the relevant job in the Hevo Pipeline using the job_type parameter.
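```python
from airflow.hevo.sensors.hevo_sensor import HevoSensor
from airflow.hevo.models.job import JobType

# Both sensors are meant to be defined inside a DAG (for example, a `with DAG(...):` block).

# Explicit job ID mode: monitor the job whose ID the trigger_sync task pushed to XCom
wait_for_job = HevoSensor(
    task_id="wait_for_sync",
    pipeline_id=123,
    job_id="{{ ti.xcom_pull(task_ids='trigger_sync') }}",  # rendered by Jinja at runtime
    poke_interval=10,
    accept_completed_with_failures=True,  # treat Completed with Failures as success
    deferrable=True,  # release the worker slot; requires the Triggerer service
    timeout=3600,  # standard Airflow sensor parameter, in seconds
)

# Auto-discovery mode: find the active historical job in the Pipeline
auto_discover_sensor = HevoSensor(
    task_id="wait_for_historical_sync",
    pipeline_id=456,
    job_type=JobType.HISTORICAL,
    wait_for_job_initial_delay=15,  # wait 15 seconds before the first discovery attempt
    wait_for_job_max_attempts=20,
    wait_for_job_interval=5,
    deferrable=True,
)
```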
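As a rough planning aid, the time the wait_for_historical_sync task can spend looking for a job is bounded by the initial delay plus one interval per attempt. This sketch assumes at most one wait_for_job_interval pause per discovery attempt and ignores the latency of the Hevo API calls themselves; worst_case_discovery_seconds is a hypothetical helper, not part of the provider.

```python
def worst_case_discovery_seconds(
    initial_delay: int, max_attempts: int, interval: int
) -> int:
    """Hypothetical upper bound on auto-discovery time, ignoring API latency."""
    return initial_delay + max_attempts * interval


# Values used by the wait_for_historical_sync task
print(worst_case_discovery_seconds(initial_delay=15, max_attempts=20, interval=5))  # 115
```

With the default values (10, 10, and 5), the same bound works out to 60 seconds.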