AIP-82 Handle trigger serialization #45562

Open
wants to merge 12 commits into
base: main

Conversation

vincbeck
Contributor

@vincbeck vincbeck commented Jan 10, 2025

Handle serialization of triggers in DAGs as part of AIP-82. With AIP-82 (External event driven scheduling in Airflow) you can define DAGs as below:

trigger = FileTrigger(....)
asset = Asset("<my_queue>", watchers=[AssetWatcher(name="my_file_watcher", trigger=trigger)])
 
with DAG(
    dag_id=DAG_ID,
    schedule=asset,
):
    empty_task = EmptyOperator(task_id="empty_task")
 
    chain(empty_task)

Triggers can be instantiated as part of a DAG. As such, we need to serialize them.



@vincbeck vincbeck force-pushed the vincbeck/aip-82-serialize branch 4 times, most recently from fa9f975 to 5f2a621 Compare January 11, 2025 00:10
airflow/triggers/base.py Outdated Show resolved Hide resolved
airflow/triggers/base.py Outdated Show resolved Hide resolved
@vincbeck
Contributor Author

vincbeck commented Jan 17, 2025

I addressed your comments, but I also added something new. It was actually a suggestion from @dstandish that I think is a great idea! I introduced the concept of AssetWatcher, a thin wrapper around a trigger. Explanation:

Instead of having:

trigger = FileTrigger(....)
asset = Asset("<my_queue>", watchers=[trigger])

Now it is:

trigger = FileTrigger(....)
asset = Asset("<my_queue>", watchers=[AssetWatcher(name="my_queue_watcher", trigger=trigger)])

AssetWatcher adds little beyond a name, but I think the name matters because, in the future, we might want to surface the relationships between assets and triggers in the graph view in the UI. Representing a trigger in the graph can be tricky because a trigger carries two pieces of information: classpath and kwargs. Neither is well suited for display. classpath is debatable, but in any case it is not representative of the trigger. Example: you might have an asset with 2 watchers, each using FileTrigger to monitor a different file. Displaying only the classpath would not help the user tell which one is which. Hence the name.
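
A minimal sketch of the naming argument, using hypothetical stand-in classes (`FakeFileTrigger` and this `AssetWatcher` are illustrative only, not the classes from the PR). Two watchers built on the same trigger class share a classpath, so only the name tells them apart:

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class FakeFileTrigger:
    """Hypothetical stand-in for a trigger; not Airflow's FileTrigger."""

    filepath: str

    def serialize(self) -> tuple[str, dict[str, Any]]:
        # A trigger is described by a classpath plus its kwargs.
        return ("example.triggers.FakeFileTrigger", {"filepath": self.filepath})


@dataclass
class AssetWatcher:
    """Illustrative thin wrapper: a display name plus the trigger."""

    name: str
    trigger: FakeFileTrigger


watchers = [
    AssetWatcher(name="orders_file_watcher", trigger=FakeFileTrigger("/data/orders.csv")),
    AssetWatcher(name="refunds_file_watcher", trigger=FakeFileTrigger("/data/refunds.csv")),
]

# Both watchers report the same classpath, so it cannot distinguish them.
classpaths = {w.trigger.serialize()[0] for w in watchers}
# The names are what a UI could show instead.
names = [w.name for w in watchers]
```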

@vincbeck vincbeck force-pushed the vincbeck/aip-82-serialize branch from f64e72d to 29b9834 Compare January 17, 2025 14:46
@vincbeck
Contributor Author

@uranusjr @ashb what do you guys think?

@ashb
Member

ashb commented Jan 21, 2025

I'll take another look today or tomorrow.

Member

@ashb ashb left a comment

Looks much better thanks, but a few changes I'd like you to make please.

Comment on lines 748 to 749
cast(dict, watcher.trigger)["classpath"], cast(dict, watcher.trigger)["kwargs"]
): cast(dict, watcher.trigger)
Member

These casts look odd. What's going on here?

At least this looks like it should work?

Suggested change
- cast(dict, watcher.trigger)["classpath"], cast(dict, watcher.trigger)["kwargs"]
- ): cast(dict, watcher.trigger)
+ watcher.trigger["classpath"], watcher.trigger["kwargs"]
+ ): cast(dict, watcher.trigger)

Contributor Author

The trigger property in AssetWatcher can be a BaseTrigger or a dict depending on where the object AssetWatcher is created:

  • In the DAG definition (by users), trigger is a BaseTrigger
  • When deserializing the DAG from the DB, trigger is a dict to avoid creating the AssetWatcher as part of the deserialization process (based on your comment and suggestion by TP here)

Here I cast it to make Mypy happy
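
A minimal sketch of what the casts do, under stated assumptions: `BaseTrigger`, `Watcher`, and the `trigger_key` helper below are hypothetical stand-ins, not the code under review. `typing.cast` only informs the type checker and does nothing at runtime, so the comprehension relies on every watcher having been deserialized already:

```python
import json
from typing import Any, Union, cast


class BaseTrigger:
    """Hypothetical stand-in for the real trigger base class."""


class Watcher:
    """Illustrative watcher whose trigger is an object or a serialized dict."""

    def __init__(self, name: str, trigger: Union[BaseTrigger, dict]) -> None:
        self.name = name
        self.trigger = trigger


def trigger_key(classpath: str, kwargs: dict[str, Any]) -> str:
    # Hypothetical helper: builds a hashable key from classpath and kwargs.
    return f"{classpath}:{json.dumps(kwargs, sort_keys=True)}"


def index_triggers(watchers: list[Watcher]) -> dict[str, dict]:
    # cast() is a no-op at runtime; it only tells the type checker that the
    # Union is a dict here. A non-dict trigger would still fail on indexing.
    return {
        trigger_key(
            cast(dict, w.trigger)["classpath"], cast(dict, w.trigger)["kwargs"]
        ): cast(dict, w.trigger)
        for w in watchers
    }


deserialized = [
    Watcher("a", {"classpath": "pkg.FileTrigger", "kwargs": {"filepath": "/tmp/a"}}),
    Watcher("b", {"classpath": "pkg.FileTrigger", "kwargs": {"filepath": "/tmp/b"}}),
]
indexed = index_triggers(deserialized)
```

Without the casts, indexing `w.trigger` directly would be flagged by a type checker because the stand-in `BaseTrigger` is not subscriptable.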


- file_path = tempfile.NamedTemporaryFile().name
+ file_path = "/tmp/test"
Member

Did we need to change this?

Contributor Author

Yes, to make one serialization/deserialization test happy. One of the tests serializes and deserializes all the example DAGs and expects each actual example DAG to equal its serialized -> deserialized counterpart (round-trip transformation). Using tempfile here made the test fail because file_path was different each time. I could have mocked it, but I also realized that /tmp/ is widely used as a temporary directory in tests, so I went with the easy solution :)

Let me know if you want to keep the tempfile version; I'll have to play with mocks then.
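
The failure mode described in this comment can be sketched as follows. This is a simplified illustration, not the actual test: the two `build_*` functions are hypothetical, and evaluating a definition twice stands in for comparing a freshly parsed example DAG with a round-tripped one.

```python
import tempfile


def build_with_tempfile() -> dict:
    # A new random path is generated each time the definition is evaluated.
    return {"trigger_kwargs": {"filepath": tempfile.NamedTemporaryFile().name}}


def build_with_fixed_path() -> dict:
    # A constant path yields the same definition on every evaluation.
    return {"trigger_kwargs": {"filepath": "/tmp/test"}}


# Two evaluations of the same definition are compared for equality.
random_paths_match = build_with_tempfile() == build_with_tempfile()
fixed_paths_match = build_with_fixed_path() == build_with_fixed_path()
```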

airflow/example_dags/example_asset_with_watchers.py Outdated Show resolved Hide resolved
Comment on lines 310 to 313
name=var["name"],
uri=var["uri"],
group=var["group"],
extra=var["extra"],
Member

Consistency please -- There is another deserialize (why are there two? Separate question I guess) but

Suggested change
- name=var["name"],
- uri=var["uri"],
- group=var["group"],
- extra=var["extra"],
+ **var,

Contributor Author

I don't have the answer for this one. What I know is that one is at the root level of deserialize (deserializing an asset) and the other is in decode_asset_condition (deserializing an asset condition). I can create a private function to factor out the shared code if you prefer.

Member

I don't mind two versions per se, but it'd be nice if they had the same implementation 😁

Contributor Author

I applied this one but reverted it because var contains some keys we do not want to pass to Asset. See error here

Contributor Author

I factored the code out into a new function, decode_asset.
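
A rough sketch of why `**var` fails and what a shared helper might look like. The `Asset` class here is a hypothetical stand-in with only the four fields from the reviewed lines, `some_extra_key` is an assumed example of an unwanted key, and the helper body is a guess at the shape of decode_asset, not the code in the PR:

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Asset:
    """Hypothetical stand-in with the four fields from the reviewed lines."""

    name: str
    uri: str
    group: str
    extra: dict[str, Any] = field(default_factory=dict)


def decode_asset(var: dict[str, Any]) -> Asset:
    # Pick constructor arguments explicitly so unrelated keys are ignored.
    return Asset(name=var["name"], uri=var["uri"], group=var["group"], extra=var["extra"])


# "some_extra_key" is an assumed example of a key the constructor rejects.
var = {
    "name": "a",
    "uri": "s3://bucket/a",
    "group": "asset",
    "extra": {},
    "some_extra_key": 1,
}

try:
    Asset(**var)
    splat_failed = False
except TypeError:
    # Unexpected keyword arguments raise TypeError.
    splat_failed = True

asset = decode_asset(var)
```

Both call sites can then go through the one helper, which gives them the same implementation.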

name: str
# This attribute serves double purpose. For a "normal" asset instance
# loaded from DAG, this holds the trigger used to monitor an external resource.
# For an asset recreated from a serialized DAG, however, this holds the serialized data of the trigger.
Member

I don't think we should make this class do double duty like this -- We already have BaseSerializedOperator etc. and I want to try very hard to avoid putting a run-time dependency on TaskSDK code in the core Scheduler.

i.e. I'd like the scheduler to operate only on a class that is deserialized from the DB that does not inherit from this class.

(I have yet to do that for SerializedDAG and SerializedBaseOperator, but I need to before we can release 3.0)

Contributor Author

@vincbeck vincbeck Jan 22, 2025

Are you suggesting we should not use this class when we deserialize the DAG? Currently this class is used by users when defining their DAGs and by the scheduler when it deserializes the DAG from the DB. Do you want to use it only by users? I am trying to understand the path forward :)

Don't we have the same problem with classes like Asset?

Member

Do you want to use it only by users? I am trying to understand the path forward :)

That is my end goal yes, and Asset would need updating too. Maybe it's best all done at once. For instance on a SchedulerDag object or a SchedulerBaseOperator (not the final names) the interface will be greatly reduced, and contain only the minimal needed to set relationships between tasks, but a lot of the "richness" of API and compat would not be needed in that class.

But it's probably more confusing to have one class be like that but not others.

That said, one thing you might be able to do here is have a SerializedAssetWatcher class which is a subclass of this one with this field as a different type? This pattern is what we have already for SerializedBaseOperator -- i.e. deserialize code should create instances of SerializedAssetWatcher, not AssetWatcher.

If that works with mypy (it might complain about the type of the field not matching the parent) I think this approach is a small change here but makes my life much easier on this bigger refactor when I get to it.

Am I making any sense?

Contributor Author

You are :) A lot! Thanks a lot for the details, that helps a lot to better understand. I created a new class SerializedAssetWatcher and use that class whenever I deserialize. In collection.py, when I fetch the watchers, I cast them as SerializedAssetWatcher because asset.watchers can be AssetWatcher or SerializedAssetWatcher. Let me know if this is what you had in mind :)
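
A minimal sketch of the subclass pattern discussed in this thread, using hypothetical stand-ins throughout (`BaseTrigger`, both watcher classes, and `decode_watcher` are illustrative, not the PR's code). Whether a type checker accepts the narrowed field type is not verified here:

```python
from dataclasses import dataclass
from typing import Any, Union


class BaseTrigger:
    """Hypothetical stand-in for the real trigger base class."""


@dataclass
class AssetWatcher:
    """User-facing watcher (illustrative): normally holds a trigger object."""

    name: str
    trigger: Union[BaseTrigger, dict]


@dataclass
class SerializedAssetWatcher(AssetWatcher):
    """Created only by deserialization: the trigger stays as serialized data."""

    trigger: dict


def decode_watcher(var: dict[str, Any]) -> SerializedAssetWatcher:
    # Hypothetical decode helper: it never instantiates the trigger class.
    return SerializedAssetWatcher(name=var["name"], trigger=var["trigger"])


watcher = decode_watcher(
    {
        "name": "my_file_watcher",
        "trigger": {"classpath": "pkg.FileTrigger", "kwargs": {"filepath": "/tmp/test"}},
    }
)
```

Code that only handles deserialized DAGs can then check for `SerializedAssetWatcher` and treat `trigger` as a plain dict.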

@vincbeck vincbeck force-pushed the vincbeck/aip-82-serialize branch from 3859d7a to 021c863 Compare January 22, 2025 20:25
3 participants