
Conversation

@pierrejeambrun
Member

closes: #29317

@boring-cyborg boring-cyborg bot added the area:Scheduler including HA (high availability) scheduler label Feb 13, 2023
@pierrejeambrun pierrejeambrun force-pushed the 29317-migrate-check-and-change-state-before-execution-to-rpc branch from d530689 to 7a3c9d3 Compare February 13, 2023 21:06
@pierrejeambrun pierrejeambrun force-pushed the 29317-migrate-check-and-change-state-before-execution-to-rpc branch from 7a3c9d3 to 80058ff Compare February 13, 2023 21:46
@pierrejeambrun pierrejeambrun changed the title Migrate TaskInstance.check_and_change_state_before_execution to rpc AIP-44 Migrate TaskInstance.check_and_change_state_before_execution to Internal API Feb 13, 2023
@pierrejeambrun pierrejeambrun added the AIP-44 Airflow Internal API label Feb 13, 2023
@pierrejeambrun pierrejeambrun requested a review from mhenc February 13, 2023 22:27
@pierrejeambrun pierrejeambrun force-pushed the 29317-migrate-check-and-change-state-before-execution-to-rpc branch from 80058ff to 9d5250d Compare February 14, 2023 00:09
@pierrejeambrun
Member Author

I would have to check here as well. There might be parts of this function that shouldn't run on the internal API server.

@mhenc
Contributor

mhenc commented Feb 14, 2023

Thanks for looking into that!
I don't see any place that could access user code there. This function seems to be the very last set of checks before running the task.
But it's possible that one of the sub-methods does it.

@pierrejeambrun
Member Author

pierrejeambrun commented Feb 14, 2023

I took a look, and overall I think you are right about user code here. Most of the function performs checks and DB updates for the task instance.

I just have one question about the gethostname() call within check_and_change_state_before_execution. That would now be called on the internal API server, and I am wondering if this is expected. It would be overridden by the following call to self._run_raw_task, which also updates the TI hostname, but could this still change the task instance hostname value in an unexpected way?

@potiuk
Member

potiuk commented Feb 15, 2023

Nice, but conflict :(

Member

@potiuk potiuk left a comment

Changing my +1 (It was too quick). The get_hostname() question from @pierrejeambrun made me think 💭

I looked more closely and I think we have a bit of a problem here that we have not thought about (and possibly we should re-review other internal_api_call implementations; I looked quickly and I think this one is the first, but it would be worth checking).

The internal_api_call has an implicit assumption, which we have not formulated yet: parameters passed should be immutable. There is currently no mechanism in our solution to pass any mutation that happens inside the call back to the caller.

UPDATE: This missed the fact that TI is also a SQLAlchemy model. I proposed a different solution to handle it (basically pass the primary key of the TI in the method and retrieve the TaskInstance internally). The general note below, however, remains valid in case we find a non-ORM model that we need to mutate.

And in MOST cases we don't have to, because most of those @internal_api_call methods will mutate the state in the DB (which is what we care about), but some might mutate local in-memory object state as well. This might especially be the case when the original method was an object method, with self, like in this case, where the TaskInstance gets modified.

I think there are a few possible approaches to solve it, but the one that I can think of is to refactor those methods so that they do not mutate the object in place but instead return the mutated object(s). The function would return the mutated object and the caller would look like:

ti = the_method(ti)

In case of a local call that assignment would be a no-op. In case of a server call we would serialize the object, mutate it, serialize it back, and replace the original object. It has some possible side effects (because it will be a not-in-place modification), but I think this is the only "easy" way, and we need to review those "mutation" places looking at the possible side effects it might have.
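The pattern described above could be sketched roughly like this. All names here (internal_api_call, set_state, the dict standing in for a TaskInstance) are illustrative only, not Airflow's actual implementation; the deep copy simulates the serialize/deserialize round-trip of a remote call:

```python
# Sketch of the "return the mutated object" pattern discussed above.
# Hypothetical names; a deepcopy stands in for serialization over the wire.
import copy
import functools

USE_INTERNAL_API = True  # pretend we are in "db isolation" mode


def internal_api_call(func):
    @functools.wraps(func)
    def wrapper(obj, *args, **kwargs):
        if not USE_INTERNAL_API:
            # In-process call: mutation happens in place, so the
            # rebinding at the call site is effectively a no-op.
            return func(obj, *args, **kwargs)
        # "Remote" call: the server works on a deserialized copy, so the
        # caller's object is NOT mutated unless it rebinds the result.
        remote_copy = copy.deepcopy(obj)
        return func(remote_copy, *args, **kwargs)

    return wrapper


@internal_api_call
def set_state(ti, state):
    ti["state"] = state
    return ti  # return the mutated object instead of relying on in-place mutation


ti = {"task_id": "t1", "state": "queued"}
ti = set_state(ti, "running")  # caller rebinds, as in: ti = the_method(ti)
print(ti["state"])  # running
```

Note that if the caller forgets to rebind (`set_state(ti, ...)` without the assignment), the local object silently stays stale in "remote" mode, which is exactly the side effect the comment warns about.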

@potiuk
Member

potiuk commented Feb 15, 2023

cc: @mhenc ^^

Member

All right, after looking at #29355 I have a better proposal for how to solve this one. The fact that this method uses a TaskInstance does not mean that we should pass the TaskInstance as a parameter. On the server side of the Internal API call we can just retrieve the TaskInstance object from the database. Trying to pass it as a parameter is a bad idea (see the mutation case explained above). What we really want to do is pass the primary key of the TaskInstance and retrieve it as the first step.

For an in-process call this will be a no-op (the TaskInstance will already be the same as self.task; we need to double-check whether we need to do any kind of refresh on the SQLAlchemy object we update this way, but it should be easy).
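A minimal sketch of the "pass the primary key, fetch the row server-side" idea. The dict-backed store and function body are purely illustrative (not Airflow code); the key shape mirrors TaskInstance's composite primary key (dag_id, run_id, task_id, map_index):

```python
# Sketch: the client sends only the composite primary key; the server
# retrieves the TaskInstance itself, so no ORM object crosses the wire.
SIMULATED_DB = {
    ("dag1", "run1", "task1", -1): {"state": "queued", "hostname": ""},
}


def check_and_change_state_before_execution(dag_id, run_id, task_id, map_index):
    # Server side: look up the row by primary key first, instead of
    # deserializing a TaskInstance passed as a parameter.
    ti = SIMULATED_DB[(dag_id, run_id, task_id, map_index)]
    ti["state"] = "running"  # the mutation we care about lands in the DB
    return True


ok = check_and_change_state_before_execution("dag1", "run1", "task1", -1)
```

The caller never holds a stale copy: any later read goes back through the same primary-key lookup, which sidesteps the mutation problem described earlier in the thread.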

Contributor

Agreed on this one. I tried to do this while working on #28900: basically call TaskInstance.get_task_instance in the method itself instead of passing the whole TaskInstance object. Though, I confess, I got lazy and did not do it for all methods since #29355 was coming.

However, it will add some additional database calls when DB isolation is turned off, and I am just not sure of the consequences (whether it is sustainable or not). Since these are simple queries on the primary key, it seems okay to me.

Also, while working on #28900, I found it hard to know whether an object (e.g. DAG, DagRun, ...) could be passed as a parameter (and serialized), or whether to use the technique suggested in the proposal: pass the primary key and then retrieve the object. It would be great/helpful if we could make an inventory of the objects that can be serialized and those that cannot.

Member Author

@pierrejeambrun pierrejeambrun Feb 16, 2023

Thanks for the pointers. This approach seems fair to me.

@vincbeck I believe the custom objects that are serializable can be found in BaseSerialization.serialize. But maybe this list is not exhaustive, and an inventory would definitely help.

Contributor

Definitely there are some missing parts in serialization (that's why I am working on #29355).
Just a note: even with my PR merged, we should rather avoid passing a TaskInstance from the client to the Internal API.
As I said in the description, the SQLAlchemy state is not serializable, so the TaskInstance object that we get in the Internal API will not be fully functional with SQLAlchemy (e.g. for session.merge).
I am not sure there is any solution other than introducing a new DTO for it, but that would require a lot of changes across the whole codebase.

Member

Agreed. Having TaskInstance ORM objects as plain data objects without even having a session/connection to the DB sounds like a bad idea. I will take a closer look at that during the next week and try to come up with a better solution; maybe it will be enough to extract some new methods wrapping a bit more of the code.

Member

Just a comment: I am working on a small POC to see the scope of refactoring we would have to do to avoid serializing TaskInstance over the internal API call. Stay tuned. It's not a "few lines" change after a closer look, so it might take a moment before I figure out the extent of the modifications needed (I want to make the change to Airflow's internal code as small as possible to avoid a huge set of changes). I am not sure yet if it will be as simple as I thought, but... stay tuned please :)

Contributor

Thanks, staying tuned :)

Member

Hey everyone (mostly @mhenc @vincbeck @pierrejeambrun, but also others: @kaxil @ashb @bolkedebruin): #29776 is the first part of the POC showing how we can slightly refactor our code for AIP-44 (the rest will follow if we agree that what I proposed makes sense).

I tried a few approaches for using the "DB-bound" models in the internal API, and I think the one I came up with is by far the simplest: it has almost no boilerplate code, it seems easy to maintain, and it harnesses the power of Pydantic to convert ORM models to serializable classes that we will be able to send over the internal_api calls.

We discussed using Pydantic before for such tasks, and after seeing how easy it is to convert virtually any of our ORM models to fully serializable and validatable models, I find this approach best.

Some of the properties:

  • as little code duplication as possible: the only duplication is that we have to define fields and their types in a separate "pydantic" class. This is a very small overhead (those classes change rather infrequently)

  • the mapping is done automatically by Pydantic; it has an "orm_mode" that handles mapping ORM objects to Pydantic models, and SQLAlchemy is a first-class citizen for it

  • I did not map all the relations and fields yet, only those that were easy and likely needed, but if need be, Pydantic can also handle relations in orm_mode and serialize linked objects (for example DagRun for TaskInstance); it's just a matter of defining the field in the Pydantic model

  • in the future we can add more validation for those objects (also powered by Pydantic). For now I restricted the Pydantic class fields to "stdlib" types, but with Pydantic we could use more constructs to add much more detailed validation (string lengths, min/max values, etc.). This is an internal API, so we do not have to, but maybe we will decide it is worth it

  • Pydantic is used and recommended by a lot of serious Python projects, and it has a bright future (https://docs.pydantic.dev/blog/pydantic-v2/), with the Rust implementation being many times faster


Absolutely brilliant. And I mean that.

Member Author

@pierrejeambrun pierrejeambrun left a comment

@potiuk @vincbeck Taking into account your suggestions, this is how it could look (it's a quick implementation to make it easier to reason about/discuss).

I'll be out of office for a week, feel free to update as needed

Member Author

The primary key for a task instance is composite: dag_id, run_id, task_id, map_index. So the call is a little more verbose.

Member Author

Sometimes this function is called before there is a DB record for the task instance; cf. test_check_and_change_state_before_execution_dep_not_met, which highlights this case.

Member Author

The task instance is expired if the session was not passed down from the parent caller but was provided by @provide_session. We need make_transient to be able to do:

ti = TI.check_and_change_state_before_execution()

We have only one case where there is no session in the parent: LocalTaskJob._execute.
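For context, SQLAlchemy's make_transient detaches an instance from its session so its already-loaded attributes stay readable after the session is gone. A minimal sketch on a toy model (not Airflow's TaskInstance):

```python
# Sketch of the make_transient trick: refresh the loaded attributes, then
# detach the instance so it can be returned/used outside the session.
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base, make_transient

Base = declarative_base()


class TI(Base):
    __tablename__ = "ti"
    id = Column(Integer, primary_key=True)
    state = Column(String)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    ti = TI(id=1, state="running")
    session.add(ti)
    session.commit()      # commit expires instance attributes by default
    session.refresh(ti)   # load attributes back before detaching
    make_transient(ti)    # detach: no lazy loads or expiry errors later

print(ti.state)  # still readable after the session is closed
```

Without the refresh, expired attributes would be lost on make_transient; without make_transient, accessing them after the @provide_session session closes would fail.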

@pierrejeambrun pierrejeambrun force-pushed the 29317-migrate-check-and-change-state-before-execution-to-rpc branch from d992d94 to 298b5a0 Compare February 24, 2023 16:24
@ephraimbuddy ephraimbuddy added the changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..) label Apr 11, 2023
@github-actions

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label May 27, 2023
@github-actions github-actions bot closed this Jun 1, 2023

Labels

AIP-44 Airflow Internal API area:Scheduler including HA (high availability) scheduler changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..) stale Stale PRs per the .github/workflows/stale.yml policy file


Development

Successfully merging this pull request may close these issues.

AIP-44 Migrate TaskInstance.check_and_change_state_before_execution to Internal API

6 participants