
Mastering ExternalTaskSensor in Apache Airflow: Calculate Execution Delta | by Casey Cheng | May 2023


External Task Sensors stop bad data from trickling downstream in a data pipeline. Leverage them to create a reliable data infrastructure.

External Task Sensors are like gatekeepers: they stop bad data from trickling downstream. Image by Freepik.

Orchestrating a data pipeline is a delicate endeavor. In a data pipeline, we can have thousands of tasks running concurrently, and they are often dependent on one another. If we're not careful, a single point of failure can have a domino-like effect that trickles downstream and messes up the whole pipeline.

Apache Airflow introduced the External Task Sensor to put an end to these issues. While it's an extremely powerful feature, it also comes with a degree of complexity.

In this introductory piece, I hope to untangle some of the confusion surrounding the External Task Sensor and show how we can use it to reinforce the reliability of our data pipelines. Making sense of sensors!

Meet Jamie, a rookie chef at Airflow Bakery. She's new. Her only responsibility is to make a fresh batch of cookie dough every hour.

Jamie's responsibilities shown in a "DAG" format. Chef (F) icon by Freepik.

And then we have Gordon Damnsie, the cookie master. Gordon takes the dough from Jamie and turns it into award-winning cookies.

Gordon's responsibilities shown in a "DAG" format. Chef (M) icon by Freepik.

One fine day, Gordon swoops in to grab the freshest dough he can find and bakes cookies. But when he takes a bite, yuck! "Bad" would've been an understatement. Gordon quickly discovers the root cause was the stale dough, which was left over from a week ago.

Gordon, visibly frustrated, tosses the cookies into the bin. After he composes himself, he slowly turns to Jamie and asks, "Why is the dough not fresh?"

"I had to stop making them, Chef. There was a problem with the raw ingredients," Jamie replies, trying to stay calm in the face of Gordon's anger. Unfortunately, the bad cookies had already been served to customers, and they no longer trust the food quality of the bakery.

This slight detour is a cautionary tale on the importance of validating the freshness of data sources. In the story, Gordon's success depends on Jamie, but they are working independently without communicating with each other. They "trust" that the other person will do their job flawlessly. But as any data practitioner will know, everything that can go wrong will go wrong in a data pipeline.

Ideally, Gordon should check with Jamie whether she made dough recently. Once he has confirmed it, he knows the dough is fresh and he can proceed to bake his cookies. Otherwise, he should stop baking and figure out what went wrong.

You see, what Gordon needs… is an External Task Sensor.

An External Task Sensor checks whether other people completed their assigned task. It senses the completion of an external task, hence the name.

In the context of Airflow, Jamie and Gordon are DAGs. They have specific tasks that they need to complete.

When we add an External Task Sensor, it becomes the middleman that coordinates between the two independent DAGs. The sensor will check on Jamie at a specific time to see if she has completed her task.

If Jamie successfully completes her task, the sensor will inform Gordon so that he can carry on with his downstream tasks.

The External Task Sensor check_dough() returns a success after verifying that make_dough() ran successfully. Chef (F) and Chef (M) icons by Freepik.

If Jamie fails to complete her task, the sensor stops Gordon from doing any tasks that have a dependency on the failed task.

The External Task Sensor check_dough() returns a failure after verifying that make_dough() did not run successfully. Chef (F) and Chef (M) icons by Freepik.

Having this extra layer of validation essentially stops stale data from trickling further downstream and polluting the rest of our pipeline with dirty, inaccurate data.

Airflow makes it very easy to create an External Task Sensor: just import it. The syntax will look something like this:

from datetime import timedelta

from airflow.sensors.external_task import ExternalTaskSensor

ext_task_sensor = ExternalTaskSensor(
    dag=gordon_tasks,
    task_id='check_dough_freshness',
    external_dag_id='jamie_tasks',
    external_task_id='make_new_dough',
    email=['gordon.damnsie@gmail.com', 'jamie@gmail.com'],
    execution_delta=timedelta(minutes=30),
    # execution_date_fn=my_function,
    timeout=1800,
    poke_interval=300,
    mode='reschedule'
)

Here's what they mean:

  1. dag is the current DAG object. Since Gordon is the one who wants to check whether Jamie made dough, this should point to Gordon's DAG.
  2. task_id is the unique name for this External Task Sensor.
  3. external_dag_id is the name of the DAG you want to check. In this case, Jamie's DAG.
  4. external_task_id is the name of the specific task you want to check. Ideally, we should always specify this. Otherwise, the sensor will check for the completion of the entire DAG instead of just one specific task. In other words, Gordon will do nothing until Jamie finishes chopping onions, washing dishes, and restocking the pantry, even though we only want to know whether she made dough. Or worse, if any one of these irrelevant tasks fails, the sensor will unnecessarily pause the entire pipeline.
  5. email is the list of people you want Airflow to notify when the External Task Sensor fails. Keep in mind that for this to work, you must have the SMTP settings properly configured in the Airflow configuration file.
  6. execution_delta is arguably the most confusing part of External Task Sensors, but also the most important. So, I'm dedicating a whole section to it below. Keep scrolling!
  7. execution_date_fn and execution_delta are very similar. We can only use one of them at a time. Sometimes it's easier to use this rather than execution_delta. I'm also giving this its own section below.
  8. timeout limits how long a sensor can stay alive. When we create a sensor, it consumes resources by occupying one worker slot. If the target task never completes, these sensors will keep checking indefinitely while hogging the worker slot. Over time, we can run into a Sensor Deadlock, where all worker slots become occupied by useless sensors and no tasks can run anymore. Therefore, it's best practice to set a maximum time limit for the checks.
  9. poke_interval is the duration the sensor waits before checking again if the previous check fails. The rationale is that we don't want the sensor to check excessively like a madman, since that adds unnecessary load to the server. On the flip side, checking too infrequently means the sensor will wait longer than necessary, delaying the pipeline. The trick is to find the sweet spot based on the expected run time of the external task (there's a quick back-of-the-envelope calculation after this list).
  10. mode is how we want the sensor to behave. It can be set to "poke" or "reschedule".
    When set to "poke", the sensor goes to sleep on failure and wakes up at the next poke interval to try again. It's like being on standby mode. The sensor will be more reactive, but since it's on standby, the worker slot stays occupied throughout the whole process.
    When set to "reschedule", the sensor will check once. If the check fails, the sensor will schedule another check at a later time but terminate itself for now, freeing up the worker slot. Airflow recommends using "reschedule" if the poke interval is greater than 60 seconds.
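To make the last three parameters concrete with the illustrative values from the snippet above: timeout=1800 and poke_interval=300 means the sensor gives up after roughly 1800 / 300 = 6 checks, and with mode='reschedule' the worker slot is released between those checks instead of being held for the full 30 minutes.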

Alright, that's almost every parameter we need to know about the External Task Sensor. Granted, this list is not exhaustive, but understanding these 10 parameters will be more than enough for us to set up our External Task Sensor properly for almost all use cases.

For completeness' sake, I'll include Airflow's official documentation for those who are eager to explore it in more detail.

In the section above, I glossed over these two parameters because they are arguably the most infamous, annoying, and confusing part of External Task Sensors. But I think it's time we tackle them.

So what are execution_delta and execution_date_fn?

Building on our analogy, external_task_id tells the sensor to check if Jamie completed the make_dough() task. But she makes a lot of dough, once every hour. Are we checking whether she baked in the past hour, yesterday, or last week?

This ambiguity confuses External Task Sensors, and that's why Airflow came up with two ways for us to communicate this information. Both execution_delta and execution_date_fn are meant to tell sensors the specific time of the task.

  1. execution_delta expresses time on a relative basis, e.g.: "Did Jamie bake 30 minutes ago?" It accepts a datetime.timedelta object as its argument, e.g.: datetime.timedelta(minutes=30).
  2. execution_date_fn expresses time on an absolute basis, e.g.: "Did Jamie bake on the 3rd of May 2023 at 4:30 pm?" It accepts a callable Python function as its argument. This function should return the execution date of the task that we want to check on, e.g.: datetime.datetime(year=2023, month=5, day=3, hour=16, minute=30).

Since both of them convey the same information, Airflow only allows us to use one or the other, but not both at the same time.

I typically use execution_delta as the de facto choice. However, there are situations where it's too complicated to calculate the execution_delta. In that case, I'd use execution_date_fn instead.
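As a quick side-by-side sketch of the two options (the values are just the illustrative ones from above, not a recommendation):

from datetime import datetime, timedelta

# Option 1 (relative): the external task runs 30 minutes before the sensing task
execution_delta = timedelta(minutes=30)

# Option 2 (absolute): a callable that receives the current execution date
# and returns the external task's execution date (purely illustrative date)
def my_function(current_execution_date):
    return datetime(year=2023, month=5, day=3, hour=16, minute=30)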

How to calculate execution_delta?

The phrase execution_delta is short for the delta (a.k.a. difference) of execution dates (a.k.a. the previous runtimes of our tasks). In other words, execution_delta = the current DAG's execution date minus the external DAG's execution date.

The formula for execution_delta. Image by author.

I'd like to highlight the key word here: "previous".

Some of you might be wondering… why does Airflow want the time difference of the previous runs, and not the current runs? This used to confuse the heck out of me when I first started using Airflow.

It turns out there's a perfectly good reason. However, I don't want to derail from the topic at hand, so I'll cover it in a later section (here). For now, let's just accept the formula as-is and see how we'd apply it.

Suppose that Jamie makes dough every hour (e.g.: 13:00, 14:00, 15:00, …). Gordon also makes cookies every hour, but he makes them on the 30th minute of each hour (e.g.: 13:30, 14:30, 15:30, …).

At 14:30 sharp, Gordon gets ready to bake his cookies. Before he starts, he needs to check whether Jamie made fresh dough recently. The latest run of make_dough() would be at 14:00.

This time series shows the task dependencies between Jamie and Gordon. Gordon always checks whether Jamie completed her task half an hour ago. Chef (F) and Chef (M) icons by Freepik.

Given that both Gordon's and Jamie's tasks are scheduled hourly, their execution dates (a.k.a. previous runs) for the 14:30 run would be…

  • Gordon's execution date = 14:30 - 1 hour = 13:30
  • Jamie's execution date = 14:00 - 1 hour = 13:00

We can plug these values into the formula, and voilà!

The execution_delta comes out to be datetime.timedelta(minutes=30) for this particular run. Image by author.
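If you want to sanity-check the arithmetic in code, here is a minimal sketch (the calendar date is made up; only the times matter):

from datetime import datetime

# Execution dates (i.e. previous runs) for the 14:30 run of each hourly task
gordon_exec_date = datetime(2023, 5, 3, 13, 30)
jamie_exec_date = datetime(2023, 5, 3, 13, 0)

execution_delta = gordon_exec_date - jamie_exec_date
print(execution_delta)  # 0:30:00, i.e. timedelta(minutes=30)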

You can do the same calculation for different runs of the tasks to get their respective execution_delta.

When calculating execution_delta, it's helpful to lay the runs out in a format like this. We want to calculate the execution deltas for multiple runs, not just one, to make sure that they are all the same! Image by author.

In this (cherry-picked) example, all the execution_delta values turn out to be exactly the same. We can pass this to our External Task Sensor and everything will work.

from datetime import timedelta

from airflow.sensors.external_task import ExternalTaskSensor

ext_task_sensor = ExternalTaskSensor(
    dag=gordon_tasks,
    task_id='check_dough_freshness',
    external_dag_id='jamie_tasks',
    external_task_id='make_new_dough',
    email=['gordon.damnsie@gmail.com', 'jamie@gmail.com'],
    execution_delta=timedelta(minutes=30),  # Pass the execution delta here
    timeout=1800,
    poke_interval=300,
    mode='reschedule'
)

But!

The execution_delta will sometimes be different for every run. This usually happens when the schedule intervals of the two DAGs are different (e.g.: daily vs. weekly, daily vs. monthly, …).

For example, let's say that Jamie makes her dough weekly on Sunday at 14:00, but Gordon makes his cookies daily at 14:30.

The arrow between Jamie's task and Gordon's sensor represents the execution delta. The execution delta gets longer over the week until it resets again on Sunday. Chef (F) and Chef (M) icons by Freepik.

If we do the same calculations, you will see that the execution deltas differ for every run.

Note that execution deltas can differ for different runs. Image by author.

This becomes a problem because execution_delta only accepts a single timedelta object as its argument. We can't pass a different value of execution_delta for every run.
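A small sketch makes the drift obvious. Using hypothetical dates in local time for one week of Gordon's daily runs (execution dates, so one day behind his run time) against Jamie's fixed weekly execution date:

from datetime import datetime, timedelta

jamie_exec_date = datetime(2023, 3, 5, 14, 0)  # Jamie's execution date stays pinned to a Sunday
for day in range(12, 19):  # Gordon's runs from Sunday the 12th to Saturday the 18th
    gordon_exec_date = datetime(2023, 3, day, 14, 30) - timedelta(days=1)
    print(gordon_exec_date - jamie_exec_date)
# 6 days 0:30:00, 7 days 0:30:00, ... up to 12 days 0:30:00: no single timedelta fits every run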

In cases like this, we need execution_date_fn.

How to calculate execution_date_fn?

The execution_date_fn is just a regular Python function. As with all Python functions, it takes some argument(s) and returns some output(s). But the beauty of using a function is the ability to return a different output based on the function's inputs and logic.

In the case of execution_date_fn, Airflow passes the current task's execution date as an argument and expects the function to return the external task's execution date. Note that these execution dates must be expressed in UTC time.

def my_exec_date_fn(gordon_exec_date):
    # Add your logic here.
    return jamie_exec_date

ext_task_sensor = ExternalTaskSensor(
    dag=gordon_tasks,
    task_id='check_dough_freshness',
    external_dag_id='jamie_tasks',
    external_task_id='make_new_dough',
    email=['gordon.damnsie@gmail.com', 'jamie@gmail.com'],
    execution_date_fn=my_exec_date_fn,  # Pass the function here.
    timeout=1800,
    poke_interval=300,
    mode='reschedule'
)

Based on our earlier case study, our execution_date_fn would need to do the following…

My Airflow is configured to local time (GMT+8), so I need to deduct 8 hours to get the UTC time. Image by author.

One naive way would be to hardcode every single run, until the end of time.

# The naive way (This is bad practice. Don't do this.)
def my_exec_date_fn(gordon_exec_date):
    if gordon_exec_date == datetime(year=2023, month=3, day=14, hour=6, minute=30):
        jamie_exec_date = datetime(year=2023, month=3, day=5, hour=6, minute=0)
    elif gordon_exec_date == datetime(year=2023, month=3, day=15, hour=6, minute=30):
        jamie_exec_date = datetime(year=2023, month=3, day=5, hour=6, minute=0)
    elif gordon_exec_date == datetime(year=2023, month=3, day=16, hour=6, minute=30):
        jamie_exec_date = datetime(year=2023, month=3, day=5, hour=6, minute=0)
    elif gordon_exec_date == datetime(year=2023, month=3, day=17, hour=6, minute=30):
        jamie_exec_date = datetime(year=2023, month=3, day=5, hour=6, minute=0)
    ...

    return jamie_exec_date

This works, but it's definitely not the most efficient way.

A better approach is to look for consistent patterns and use them to programmatically derive the outputs. Usually, a good place to look for patterns is the execution_delta, since it contains the relationship between the execution dates (we talked about this here).

Additionally, we can also look at datetime attributes, such as the day of the week. If we really think about it, our External Task Sensor will always be pointing to a Sunday because Jamie only makes dough on Sunday. As we move through the week, Gordon's task date drifts further and further away from that Sunday until it resets again the next Sunday. Then, it repeats.

This shows the time difference between the current runs for simplicity's sake. execution_date_fn looks at previous runs, but we'll see the same patterns there too. Chef (F) and Chef (M) icons by Freepik.

This suggests that the day of the week can also be useful in coming up with our execution_date_fn. So let's add the day of the week to our table. I'll label Monday as 1 and Sunday as 7, as per the ISO 8601 standard.

The numbers in brackets are the day of the week, where Monday is 1 and Sunday is 7. Image by author.

By labeling them, it becomes immediately clear that…

  • The execution_delta starts at 6 days on a Saturday.
  • The execution_delta increases by 1 day each day, up to a maximum of 12 days every Friday.
  • The execution_delta then resets back to 6 days on a Saturday.

We can re-create that relationship in a Python function and assign this execution_date_fn to our External Task Sensor.

from datetime import timedelta

def my_exec_date_fn(gordon_exec_date):
    day_of_week = gordon_exec_date.isoweekday()

    if day_of_week in (6, 7):
        time_diff = timedelta(days=day_of_week, minutes=30)
        jamie_exec_date = gordon_exec_date - time_diff
    elif day_of_week in (1, 2, 3, 4, 5):
        time_diff = timedelta(days=day_of_week + 7, minutes=30)
        jamie_exec_date = gordon_exec_date - time_diff

    return jamie_exec_date

ext_task_sensor = ExternalTaskSensor(
    dag=gordon_tasks,
    task_id='check_dough_freshness',
    external_dag_id='jamie_tasks',
    external_task_id='make_new_dough',
    email=['gordon.damnsie@gmail.com', 'jamie@gmail.com'],
    execution_date_fn=my_exec_date_fn,
    timeout=1800,
    poke_interval=300,
    mode='reschedule'
)

There we have it: our very own execution_date_fn. With a bit of creativity, execution_date_fn can cater to any scenario.
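As a quick sanity check, we can call the function with a couple of hypothetical execution dates (in UTC, after the GMT+8 adjustment described earlier) and confirm that they both resolve to the same Sunday:

from datetime import datetime

print(my_exec_date_fn(datetime(2023, 3, 11, 6, 30)))  # Saturday -> 2023-03-05 06:00:00 (Sunday)
print(my_exec_date_fn(datetime(2023, 3, 13, 6, 30)))  # Monday   -> 2023-03-05 06:00:00 (Sunday)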

Up until this point, we've covered everything you need to know to get started with the External Task Sensor. In this section, I thought it'd be nice to collate everything we've learned and see how the pieces fit together in our data pipelines.

First of all, we'll create Jamie's DAG, in a file called jamie_dag.py.

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

# Define task 1
def make_dough():
    # include your secret recipe here!
    return dough

# Create DAG
jamie_tasks = DAG(
    dag_id='jamie_tasks',
    description='Jamie to-do list. (a.k.a. making dough only)',
    schedule_interval='0 6 * * 0',  # weekly on Sunday at 06:00 UTC (14:00 local time, GMT+8)
    ...
)

# Include task 0 in DAG (as a starting point)
start = DummyOperator(
    dag=jamie_tasks,
    task_id='start'
)

# Include task 1 in DAG
make_new_dough = PythonOperator(
    dag=jamie_tasks,
    task_id='make_new_dough',  # must match the sensor's external_task_id in gordon_dag.py
    python_callable=make_dough,
    ...
)

# Create dependencies (deciding the sequence of tasks to run)
start >> make_new_dough

Then, we'll create Gordon's DAG, in another file called gordon_dag.py.

from datetime import timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.external_task import ExternalTaskSensor

# Define task 1
def bake_cookies():
    # include your secret recipe here!
    return cookies

# Define task 2
def make_money():
    # include your money-making technique step by step here.
    return money

# Define execution_date_fn for sensor 1
def my_exec_date_fn(gordon_exec_date):
    day_of_week = gordon_exec_date.isoweekday()

    if day_of_week in (6, 7):
        time_diff = timedelta(days=day_of_week, minutes=30)
        jamie_exec_date = gordon_exec_date - time_diff
    elif day_of_week in (1, 2, 3, 4, 5):
        time_diff = timedelta(days=day_of_week + 7, minutes=30)
        jamie_exec_date = gordon_exec_date - time_diff

    return jamie_exec_date

# Create DAG
gordon_tasks = DAG(
    dag_id='gordon_tasks',
    description='List of things that Gordon needs to do.',
    schedule_interval='30 6 * * *',  # daily at 06:30 UTC (14:30 local time, GMT+8)
    ...
)

# Include task 0 in DAG (as a starting point)
start = DummyOperator(
    dag=gordon_tasks,
    task_id='start'
)

# Include task 1 in DAG
bake_cookies = PythonOperator(
    dag=gordon_tasks,
    task_id='bake_cookies',
    python_callable=bake_cookies,
    ...
)

# Include task 2 in DAG
make_money = PythonOperator(
    dag=gordon_tasks,
    task_id='make_money',
    python_callable=make_money,
    ...
)

# Create sensor 1
check_dough_freshness = ExternalTaskSensor(
    dag=gordon_tasks,
    task_id='check_dough_freshness',
    external_dag_id='jamie_tasks',
    external_task_id='make_new_dough',
    email=['gordon.damnsie@gmail.com', 'jamie@gmail.com'],
    execution_date_fn=my_exec_date_fn,
    timeout=1800,
    poke_interval=300,
    mode='reschedule'
)

# Create dependencies (deciding the sequence of tasks to run)
(start
 >> check_dough_freshness
 >> bake_cookies
 >> make_money)

Note that the External Task Sensor lives in gordon_dag.py and not jamie_dag.py, since we want Gordon to be checking on Jamie, not the other way around. Gordon's DAG is the current DAG and Jamie's is the external DAG.

And… there we have it!

We've created our very first External Task Sensor, check_dough_freshness. This sensor will poke Jamie's make_new_dough() until it returns either Success or Fail. If it fails, bake_cookies() and make_money() will not run.

Dates in Apache Airflow are confusing because there are so many date-related terminologies, such as start_date, end_date, schedule_interval, execution_date, and so on. It's a mess, really. But let's try to figure it out with a story.

Suppose that our boss wants to know the sales performance of his company. He wants this data to be refreshed every day at 12 midnight for the next 6 months.

First, we write a complicated SQL query that generates the sales performance data. It takes 6 hours to run the query.

  • task_start is the start time of a task.
  • task_end is the end time of a task.
  • task_duration is the time it takes to run the task.
A single task. Image by author.

Every day, we need to run this task at 12 midnight.

A single task, scheduled at 12 am and running for 6 hours. Image by author.

To automate this query, we create an Airflow DAG and specify the start_date and end_date. Airflow will execute the DAG as long as today's date falls within this period.

An Airflow DAG. Image by author.

Then, we put the task into the Airflow DAG.

We need this data refreshed once a day at 12 midnight. So, we set the schedule_interval to "0 0 * * *", which is the CRON equivalent of daily at 12 midnight.

The schedule_interval essentially adds a delay between each consecutive schedule, telling Airflow to only run the task at a specific time, since we don't want the task to re-run again as soon as it finishes.
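As a rough sketch of what we have described so far (the DAG id, dates, and the placeholder callable are hypothetical):

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

sales_dag = DAG(
    dag_id='sales_performance',
    start_date=datetime(2023, 1, 1),   # Airflow runs the DAG while "today" falls in this window
    end_date=datetime(2023, 6, 30),
    schedule_interval='0 0 * * *',     # daily at 12 midnight
)

refresh_sales = PythonOperator(
    dag=sales_dag,
    task_id='refresh_sales_report',
    python_callable=lambda: None,      # stand-in for the 6-hour SQL query
)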

  • interval_start refers to the start time of a particular schedule interval.
  • interval_end refers to the end time of a particular schedule interval.
Note that interval_start and interval_end can overlap. The interval_end of the previous schedule interval is the same as the interval_start of the next schedule interval. Image by author.

Here comes the most mind-blowing part: although seemingly counterintuitive, the Airflow Scheduler triggers a DAG run at the end of its schedule interval, rather than at the beginning of it.

This means Airflow will not do anything during the first-ever schedule interval. Our query will run for the first time on 2nd Jan 2023 at 12 am.

The colored bars are like data. All of the "yellow" data only gets summarized on 2nd Jan. Image by author.

This is because Airflow was originally created as an ETL tool. It's built on the idea that data from a period of time gets summarized at the end of the interval.

For example, if we wanted to know the sales of cookies for the 1st of January, we wouldn't create a sales report on the 1st of January at 1 pm, because the day hasn't ended yet and the sales numbers would be incomplete. Instead, we'd only process the data when the clock strikes 12 midnight. At that point, we would be processing yesterday's data.

Why is this important?

Since we're summarizing the previous interval's data, the sales report we produce on the 2nd of Jan describes the 1st of Jan sales, not the 2nd of Jan sales.

For that reason, Airflow finds it more meaningful to refer to this run as the 1st of Jan run even though it's executed on the 2nd. To better differentiate the dates, Airflow gives a special name to the beginning of a schedule interval: the execution_date.

Although we run the "yellow" task on 2nd Jan, its execution date is actually 1st Jan. Image by author.
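In code terms, for a daily schedule the relationship is simply (hypothetical dates):

from datetime import datetime, timedelta

schedule_interval = timedelta(days=1)

run_triggered_at = datetime(2023, 1, 2, 0, 0)           # when Airflow actually starts the run
execution_date = run_triggered_at - schedule_interval   # 2023-01-01 00:00, the interval it describes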

This is why we always take the difference of the "previous" runs when we calculate execution_delta: it is the delta of the execution_dates, which essentially refer to the "previous" runs.

External Task Sensors are like gatekeepers. They stop bad data from going downstream by making sure that tasks are executed in a specific order and that the necessary dependencies are met before proceeding with subsequent tasks.

For those who have never used External Task Sensors before, I hope this article was able to convey their importance and convince you to start using them. For those who have been using them, I hope some of the insights here help deepen your understanding.

Thank you for your time, and have a great day.
