External Task Sensors stop bad data from trickling downstream in a data pipeline. Leverage them to build a reliable data infrastructure.
Orchestrating a data pipeline is a fragile endeavor. In a data pipeline, we can have thousands of tasks running concurrently, and they are often dependent on one another. If we're not careful, a single point of failure can have a domino-like effect that trickles downstream and messes up the whole pipeline.
Apache Airflow introduced the External Task Sensor to put an end to these issues. While it is an extremely powerful feature, it also comes with a degree of complexity.
In this introductory piece, I hope to untangle some of the confusion surrounding the External Task Sensor and show how we can use it to reinforce the reliability of our data pipelines. Making sense of sensors!
Meet Jamie, a rookie chef at Airflow Bakery. She's new. Her only responsibility is to make a new batch of cookie dough every hour.
And then we have Gordon Damnsie, the cookie master. Gordon takes the dough from Jamie and turns it into award-winning cookies.
One fine day, Gordon swoops in to grab the freshest dough he can find and bakes cookies. But when he takes a bite, yuck! "Bad" would have been an understatement. Gordon quickly discovers the root cause: the dough was stale, left over from a week ago.
Gordon, visibly frustrated, tosses the cookies into the bin. After he composes himself, he slowly turns to Jamie and asks, "Why is the dough not fresh?"
"I had to stop making it, Chef. There was a problem with the raw ingredients," Jamie replies, trying to stay calm in the face of Gordon's anger. Unfortunately, the bad cookies had already been served to customers, and they no longer trust the food quality of the bakery.
This slight detour is a cautionary tale on the importance of validating the freshness of data sources. In the story, Gordon's success depends on Jamie, but they work independently without communicating with each other. They "trust" that the other person will do their job flawlessly. But as any data practitioner knows, everything that can go wrong will go wrong in a data pipeline.
Ideally, Gordon should check with Jamie whether she made dough recently. Once he has confirmed it, he knows the dough is fresh and he can proceed to bake his cookies. Otherwise, he should stop baking and figure out what went wrong.
You see, what Gordon needs... is an External Task Sensor.
An External Task Sensor checks whether other people completed their assigned task. It senses the completion of an external task, hence the name.
In the context of Airflow, Jamie and Gordon are DAGs. They have specific tasks that they need to complete.
When we add an External Task Sensor, it becomes the middleman that coordinates between the two independent DAGs. The sensor checks on Jamie at a specific time to see if she has completed her task.
If Jamie successfully completes her task, the sensor informs Gordon so that he can carry on with his downstream tasks.
If Jamie fails to complete her task, the sensor stops Gordon from running any tasks that depend on the failed task.
Having this additional layer of validation essentially stops stale data from trickling further downstream and polluting the rest of our pipeline with dirty, inaccurate data.
Airflow makes it very easy to create an External Task Sensor: just import it. The syntax will look something like this:
from datetime import timedelta
from airflow.sensors.external_task import ExternalTaskSensor

ext_task_sensor = ExternalTaskSensor(
    dag=gordon_tasks,
    task_id='check_dough_freshness',
    external_dag_id='jamie_tasks',
    external_task_id='make_new_dough',
    email=['gordon.damnsie@gmail.com', 'jamie@gmail.com'],
    execution_delta=timedelta(minutes=30),
    # execution_date_fn=my_function,  # mutually exclusive with execution_delta
    timeout=1800,
    poke_interval=300,
    mode='reschedule'
)
Here's what they mean:
- dag is the current DAG object. Since Gordon is the one who wants to check whether Jamie made dough, this should point to Gordon's DAG.
- task_id is the unique name for this External Task Sensor.
- external_dag_id is the name of the DAG you want to check. In this case, Jamie's DAG.
- external_task_id is the name of the specific task you want to check. Ideally, we should always specify this. Otherwise, the sensor will check for the completion of the entire DAG instead of just one specific task. In other words, Gordon will do nothing until Jamie finishes chopping onions, washing dishes, and restocking the pantry, even though we only want to know whether she made dough. Or worse, if any one of these irrelevant tasks fails, the sensor will unnecessarily pause the whole pipeline.
- email is the list of people you want Airflow to notify when the External Task Sensor fails. Keep in mind that for this to work, you must have the SMTP settings properly configured in the Airflow configuration file.
- execution_delta is arguably the most confusing part of External Task Sensors, but also the most important. I'm dedicating a whole section to it below. Keep scrolling!
- execution_date_fn and execution_delta are very similar. We can only use one of them at a time. Sometimes it's easier to use this one rather than execution_delta. I'm also giving it its own section below.
- timeout limits how long a sensor can stay alive. When we create a sensor, it consumes resources by occupying one worker slot. If the target task never completes, the sensor will keep checking indefinitely while hogging that worker slot. Over time, we can run into a sensor deadlock, where all worker slots are occupied by useless sensors and no tasks can run anymore. Therefore, it's best practice to set a maximum time limit for the checks.
- poke_interval is how long the sensor waits before checking again after a failed check. The rationale is that we don't want the sensor to check excessively, since that adds unnecessary load to the server. On the flip side, checking too infrequently means the sensor will wait longer than necessary, delaying the pipeline. The trick is to find the sweet spot based on the expected run time of the external task.
- mode is how we want the sensor to behave. It can be set to "poke" or "reschedule".
When set to "poke", the sensor goes to sleep on failure and wakes up at the next poke interval to try again. It's like being on standby. The sensor is more reactive, but since it's on standby, the worker slot stays occupied throughout the whole process.
When set to "reschedule", the sensor checks once. If the check fails, the sensor schedules another check at a later time but terminates itself for now, freeing up the worker slot. Airflow recommends using "reschedule" if the poke interval is greater than 60 seconds.
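To make the trade-off concrete, here is a minimal sketch (the task_ids and interval values are made up for illustration, and it assumes the gordon_tasks DAG defined later in this article):
from airflow.sensors.external_task import ExternalTaskSensor

# Short waits: "poke" keeps the worker slot but reacts quickly.
check_dough_quickly = ExternalTaskSensor(
    dag=gordon_tasks,
    task_id='check_dough_quickly',
    external_dag_id='jamie_tasks',
    external_task_id='make_new_dough',
    poke_interval=30,
    mode='poke',
)

# Long waits: "reschedule" frees the worker slot between checks.
check_dough_patiently = ExternalTaskSensor(
    dag=gordon_tasks,
    task_id='check_dough_patiently',
    external_dag_id='jamie_tasks',
    external_task_id='make_new_dough',
    poke_interval=300,
    mode='reschedule',
)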
Alright, that's almost every parameter we need to know about the External Task Sensor. Granted, this list is not exhaustive, but understanding these ten parameters is more than enough to set up an External Task Sensor properly for almost all use cases.
For completeness' sake, I'll include Airflow's official documentation for those who are eager to explore it in more detail.
In the section above, I glossed over two parameters because they are arguably the most infamous, annoying, and confusing part of External Task Sensors. But I think it's time we tackle them.
So what are execution_delta and execution_date_fn?
Building on our analogy, external_task_id tells the sensor to check whether Jamie completed the make_dough() task. But she makes a lot of dough: once every hour. Are we checking whether she baked in the past hour, yesterday, or last week?
This ambiguity confuses External Task Sensors, and that's why Airflow came up with two ways for us to communicate this information. Both execution_delta and execution_date_fn are meant to tell the sensor the specific time of the task.
- execution_delta expresses time on a relative basis, e.g.: "Did Jamie bake 30 minutes ago?" It accepts a datetime.timedelta object as its argument, e.g.: datetime.timedelta(minutes=30).
- execution_date_fn expresses time on an absolute basis, e.g.: "Did Jamie bake on the 3rd of May 2023 at 4.30 pm?" It accepts a callable Python function as its argument. The function should return the execution date of the task that we want to check on, e.g.: datetime.datetime(year=2023, month=5, day=3, hour=16, minute=30).
Since both of them convey the same information, Airflow only allows us to use one or the other, but not both at the same time.
I usually use execution_delta as the de facto choice. But there are scenarios where it's too complicated to calculate the execution_delta. In those cases, I'd use execution_date_fn instead.
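To see how the two relate, here is a minimal sketch of the same 30-minute offset expressed both ways (the task_ids are made up for illustration, and it assumes the gordon_tasks DAG defined later in this article):
from datetime import timedelta
from airflow.sensors.external_task import ExternalTaskSensor

# Relative: "the external run happened 30 minutes before mine".
check_dough_relative = ExternalTaskSensor(
    dag=gordon_tasks,
    task_id='check_dough_relative',
    external_dag_id='jamie_tasks',
    external_task_id='make_new_dough',
    execution_delta=timedelta(minutes=30),
)

# Absolute: a callable receives Gordon's execution date and returns Jamie's.
check_dough_absolute = ExternalTaskSensor(
    dag=gordon_tasks,
    task_id='check_dough_absolute',
    external_dag_id='jamie_tasks',
    external_task_id='make_new_dough',
    execution_date_fn=lambda dt: dt - timedelta(minutes=30),
)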
How to calculate execution_delta?
The term execution_delta is short for the delta (a.k.a. the difference) of execution dates (a.k.a. the previous runtimes of our tasks). In other words:
execution_delta = current DAG's execution date - external DAG's execution date
I'd like to highlight the key word here: "previous".
Some of you may be wondering... Why does Airflow want the time difference of the previous runs, and not the current runs? This used to confuse the crap out of me when I first started using Airflow.
It turns out there's a perfectly good reason. However, I don't want to derail from the topic at hand, so I'll cover it in a later section (here). For now, let's just accept the formula as-is and see how we'd apply it.
Suppose that Jamie makes dough every hour (e.g.: 13:00, 14:00, 15:00, …). Gordon also makes cookies every hour, but he makes them on the 30th minute of every hour (e.g.: 13:30, 14:30, 15:30, …).
At 14:30 sharp, Gordon gets ready to bake his cookies. Before he starts, he needs to check whether Jamie made fresh dough recently. The latest run of make_dough() would be the 14:00 run.
Given that both Gordon's and Jamie's tasks are scheduled hourly, their execution dates (a.k.a. previous runs) for the 14:30 run would be…
- Gordon's execution date = 14:30 - 1 hour = 13:30
- Jamie's execution date = 14:00 - 1 hour = 13:00
We can plug these values into the formula, and voilà! The execution_delta comes out to be datetime.timedelta(minutes=30) for this particular run. You can do the same calculation for different runs of the tasks to get their respective execution_delta.
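If you want to double-check the arithmetic, here is the same calculation in plain Python (the calendar date is illustrative; only the 30-minute gap matters):
from datetime import datetime, timedelta

gordon_execution_date = datetime(2023, 5, 3, 13, 30)   # Gordon's previous run
jamie_execution_date = datetime(2023, 5, 3, 13, 0)     # Jamie's previous run

print(gordon_execution_date - jamie_execution_date)     # 0:30:00
assert gordon_execution_date - jamie_execution_date == timedelta(minutes=30)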
In this (cherry-picked) example, every execution_delta turns out to be exactly the same. We can pass this to our External Task Sensor and everything will work.
from datetime import timedelta
from airflow.sensors.external_task import ExternalTaskSensor

ext_task_sensor = ExternalTaskSensor(
    dag=gordon_tasks,
    task_id='check_dough_freshness',
    external_dag_id='jamie_tasks',
    external_task_id='make_new_dough',
    email=['gordon.damnsie@gmail.com', 'jamie@gmail.com'],
    execution_delta=timedelta(minutes=30),  # pass the execution delta here
    timeout=1800,
    poke_interval=300,
    mode='reschedule'
)
But not so fast!
The execution_delta will sometimes be different for every run. This usually happens when the schedule intervals of the two DAGs are different (e.g.: daily vs weekly, daily vs monthly, …).
For example, let's say that Jamie makes her dough weekly on Sunday at 14:00, but Gordon makes his cookies daily at 14:30.
If we do the same calculations, you will see that the execution deltas differ for every run.
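A rough sketch of the arithmetic makes this obvious (the dates are illustrative; Jamie's execution date stays pinned to the previous Sunday while Gordon's execution date moves forward one day at a time):
from datetime import datetime

jamie = datetime(2023, 3, 5, 14, 0)   # Jamie's latest execution date: Sunday, 5 March 2023

print(datetime(2023, 3, 13, 14, 30) - jamie)   # 8 days, 0:30:00  (Gordon's Monday execution date)
print(datetime(2023, 3, 14, 14, 30) - jamie)   # 9 days, 0:30:00  (Tuesday)
print(datetime(2023, 3, 15, 14, 30) - jamie)   # 10 days, 0:30:00 (Wednesday)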
This becomes a problem because execution_delta only accepts a single timedelta object as its argument. We can't pass a different value of execution_delta for every run.
In cases like this, we need execution_date_fn.
How to calculate execution_date_fn?
The execution_date_fn is just a regular Python function. As with all Python functions, it takes some argument(s) and returns some output(s). But the beauty of using a function is the ability to return a different output based on the function's inputs and logic.
In the case of execution_date_fn, Airflow passes the current task's execution date as an argument and expects the function to return the external task's execution date. Note that these execution dates are expressed in UTC time.
def my_exec_date_fn(gordon_exec_date):
    # Add your logic here.
    return jamie_exec_date

ext_task_sensor = ExternalTaskSensor(
    dag=gordon_tasks,
    task_id='check_dough_freshness',
    external_dag_id='jamie_tasks',
    external_task_id='make_new_dough',
    email=['gordon.damnsie@gmail.com', 'jamie@gmail.com'],
    execution_date_fn=my_exec_date_fn,  # pass the function here
    timeout=1800,
    poke_interval=300,
    mode='reschedule'
)
Based on our earlier case study, our execution_date_fn would need to do the following…
One naive way would be to hardcode every single run, until the end of time.
# The naive way (this is bad practice, don't do this)
def my_exec_date_fn(gordon_exec_date):
    if gordon_exec_date == datetime(year=2023, month=3, day=14, hour=6, minute=30):
        jamie_exec_date = datetime(year=2023, month=3, day=5, hour=6, minute=0)
    elif gordon_exec_date == datetime(year=2023, month=3, day=15, hour=6, minute=30):
        jamie_exec_date = datetime(year=2023, month=3, day=5, hour=6, minute=0)
    elif gordon_exec_date == datetime(year=2023, month=3, day=16, hour=6, minute=30):
        jamie_exec_date = datetime(year=2023, month=3, day=5, hour=6, minute=0)
    elif gordon_exec_date == datetime(year=2023, month=3, day=17, hour=6, minute=30):
        jamie_exec_date = datetime(year=2023, month=3, day=5, hour=6, minute=0)
    ...
    return jamie_exec_date
This works, but it's definitely not the most efficient way.
A better approach is to look for consistent patterns and use them to programmatically derive the outputs. Usually, a good place to look for patterns is the execution_delta, since it captures the relationship between the execution dates (we talked about this here).
Additionally, we can also look at datetime attributes, such as the day of the week. If we really think about it, our External Task Sensor will always be pointing at a Sunday, because Jamie only makes dough on Sundays. As we move through the week, Gordon's task date gets further and further away from that Sunday, until it resets again the next Sunday. Then it repeats.
This suggests that the day of the week can also help us come up with our execution_date_fn. So let's add the day of the week to our table. I'll label Monday as 1 and Sunday as 7, as per the ISO 8601 standard.
By labeling them, it becomes immediately clear that…
- The execution_delta starts from 6 days on a Saturday.
- The execution_delta increases by 1 day every day, up to a maximum of 12 days every Friday.
- The execution_delta then resets back to 6 days on a Saturday.
We can re-create that relationship in a Python function and assign this execution_date_fn to our External Task Sensor.
from datetime import timedelta
from airflow.sensors.external_task import ExternalTaskSensor

def my_exec_date_fn(gordon_exec_date):
    day_of_week = gordon_exec_date.isoweekday()
    if day_of_week in (6, 7):
        time_diff = timedelta(days=day_of_week, minutes=30)
        jamie_exec_date = gordon_exec_date - time_diff
    elif day_of_week in (1, 2, 3, 4, 5):
        time_diff = timedelta(days=day_of_week + 7, minutes=30)
        jamie_exec_date = gordon_exec_date - time_diff
    return jamie_exec_date

ext_task_sensor = ExternalTaskSensor(
    dag=gordon_tasks,
    task_id='check_dough_freshness',
    external_dag_id='jamie_tasks',
    external_task_id='make_new_dough',
    email=['gordon.damnsie@gmail.com', 'jamie@gmail.com'],
    execution_date_fn=my_exec_date_fn,
    timeout=1800,
    poke_interval=300,
    mode='reschedule'
)
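If you want to convince yourself the function behaves as described, a quick spot-check might look like this (illustrative dates, following the 06:00/06:30 times used in the hardcoded example above):
from datetime import datetime

# Saturday 11 March 2023, 06:30 -> 6 days and 30 minutes earlier: Sunday 5 March 2023, 06:00.
print(my_exec_date_fn(datetime(2023, 3, 11, 6, 30)))   # 2023-03-05 06:00:00

# Wednesday 15 March 2023, 06:30 -> 10 days and 30 minutes earlier: also Sunday 5 March 2023, 06:00.
print(my_exec_date_fn(datetime(2023, 3, 15, 6, 30)))   # 2023-03-05 06:00:00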
There we have it: our very own execution_date_fn. With a bit of creativity, execution_date_fn can cater to any scenario.
Up until this point, we've covered everything you need to know to get started with the External Task Sensor. In this section, I thought it'd be nice to collate everything we've learned and see how the pieces fit together in our data pipelines.
First of all, we'll create Jamie's DAG, in a file called jamie_dag.py.
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.external_task import ExternalTaskSensor

# Define task 1
def make_dough():
    # include your secret recipe here!
    return dough

# Create DAG
jamie_tasks = DAG(
    dag_id='jamie_tasks',
    description='Jamie to-do list (a.k.a. making dough only).',
    schedule_interval='5 3 * * *',
    ...
)

# Include task 0 in DAG (as a starting point)
start = DummyOperator(
    dag=jamie_tasks,
    task_id='start'
)

# Include task 1 in DAG
make_new_dough = PythonOperator(
    dag=jamie_tasks,
    task_id='make_new_dough',
    python_callable=make_dough,
    ...
)

# Create dependencies (deciding the sequence of tasks to run)
start >> make_new_dough
Then, we'll create Gordon's DAG, in another file called gordon_dag.py.
from datetime import timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.external_task import ExternalTaskSensor

# Define task 1
def bake_cookies():
    # include your secret recipe here!
    return cookies

# Define task 2
def make_money():
    # include your money-making technique step-by-step here.
    return money

# Define execution_date_fn for sensor 1
def my_exec_date_fn(gordon_exec_date):
    day_of_week = gordon_exec_date.isoweekday()
    if day_of_week in (6, 7):
        time_diff = timedelta(days=day_of_week, minutes=30)
        jamie_exec_date = gordon_exec_date - time_diff
    elif day_of_week in (1, 2, 3, 4, 5):
        time_diff = timedelta(days=day_of_week + 7, minutes=30)
        jamie_exec_date = gordon_exec_date - time_diff
    return jamie_exec_date

# Create DAG
gordon_tasks = DAG(
    dag_id='gordon_tasks',
    description='List of things that Gordon needs to do.',
    schedule_interval='5 3 * * *',
    ...
)

# Include task 0 in DAG (as a starting point)
start = DummyOperator(
    dag=gordon_tasks,
    task_id='start'
)

# Include task 1 in DAG
bake_cookies = PythonOperator(
    dag=gordon_tasks,
    task_id='bake_cookies',
    python_callable=bake_cookies,
    ...
)

# Include task 2 in DAG
make_money = PythonOperator(
    dag=gordon_tasks,
    task_id='make_money',
    python_callable=make_money,
    ...
)

# Create sensor 1
check_dough_freshness = ExternalTaskSensor(
    dag=gordon_tasks,
    task_id='check_dough_freshness',
    external_dag_id='jamie_tasks',
    external_task_id='make_new_dough',
    email=['gordon.damnsie@gmail.com', 'jamie@gmail.com'],
    execution_date_fn=my_exec_date_fn,
    timeout=1800,
    poke_interval=300,
    mode='reschedule'
)

# Create dependencies (deciding the sequence of tasks to run)
(start
 >> check_dough_freshness
 >> bake_cookies
 >> make_money)
Note that the External Task Sensor lives in gordon_dag.py and not jamie_dag.py, since we want Gordon to check on Jamie, not the other way around. Gordon's DAG is the current DAG and Jamie's is the external DAG.
And… there we have it!
We've created our very first External Task Sensor, check_dough_freshness. This sensor pokes Jamie's make_new_dough() task until it returns either Success or Failed. If it fails, bake_cookies() and make_money() will not run.
Dates in Apache Airflow are confusing because there are so many date-related terminologies, such as start_date, end_date, schedule_interval, execution_date, and so on. It's a mess, really. But let's try to figure it out with a story.
Suppose that our boss wants to know the sales performance of his company. He wants this data to be refreshed every day at 12 midnight for the next 6 months.
First, we write a complicated SQL query that generates the sales performance data. It takes 6 hours to run the query.
- task_start is the start time of a task.
- task_end is the end time of a task.
- task_duration is the time it takes to run the task.
Every day, we will need to run this task at 12 midnight.
To automate this query, we create an Airflow DAG and specify the start_date and end_date. Airflow will execute the DAG as long as today's date falls within this period.
Then, we put the task into the Airflow DAG.
We need this data refreshed once a day at 12 midnight. So, we set the schedule_interval to "0 0 * * *", which is the CRON equivalent of daily at 12 midnight.
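Put together, a minimal sketch of that DAG declaration might look like this (the dag_id and dates are made up for illustration):
from datetime import datetime
from airflow import DAG

sales_report = DAG(
    dag_id='sales_performance',
    start_date=datetime(2023, 1, 1),   # Airflow only schedules runs on or after this date
    end_date=datetime(2023, 6, 30),    # ...and stops scheduling runs after this date
    schedule_interval='0 0 * * *',     # daily at 12 midnight
)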
The schedule_interval essentially adds a delay between each consecutive schedule, telling Airflow to only run the task at a specific time, since we don't want the task to re-run as soon as it finishes.
- interval_start refers to the start time of a particular schedule interval.
- interval_end refers to the end time of a particular schedule interval.
Here comes the most mind-blowing part: although it seems counterintuitive, the Airflow Scheduler triggers a DAG run at the end of its schedule interval, rather than at the beginning of it.
This means that Airflow will not do anything during the first-ever schedule interval. Our query will run for the first time on 2nd Jan 2023 at 12 am.
This is because Airflow was originally created as an ETL tool. It's built on the idea that data from a period of time gets summarized at the end of that interval.
For example, if we wanted to know the sales of cookies for the 1st of January, we wouldn't create a sales report on the 1st of January at 1 pm, because the day hasn't ended yet and the sales figures would be incomplete. Instead, we'd only process the data when the clock strikes 12 midnight. At that point, we would be processing yesterday's data.
Why is this important?
Since we're summarizing the previous interval's data, the sales report we produce on the 2nd of Jan describes the 1st of Jan sales, not the 2nd of Jan sales.
For that reason, Airflow finds it more meaningful to refer to this run as the 1st of Jan run, even though it's executed on the 2nd. To better differentiate the dates, Airflow gives a special name to the beginning of a schedule interval: execution_date.
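As a small numerical illustration of that labeling (values are illustrative, matching the daily schedule above):
from datetime import datetime, timedelta

interval_end = datetime(2023, 1, 2, 0, 0)          # when the run is actually triggered
schedule_interval = timedelta(days=1)
execution_date = interval_end - schedule_interval  # 2023-01-01 00:00, the label Airflow gives the run
print(execution_date)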
This is why we always take the difference of the "previous" runs when we calculate execution_delta: it is the delta of the execution_dates, which are essentially the "previous" runs.
External Task Sensors are like gatekeepers. They stop bad data from going downstream by making sure that tasks are executed in a specific order and that the necessary dependencies are met before proceeding with subsequent tasks.
For those who have never used External Task Sensors before, I hope this article was able to convey their importance and convince you to start using them. For those who have been using them, I hope some of the insights here help deepen your understanding.
Thank you for your time, and have a wonderful day.