Conversation

@mhenc (Contributor) commented Feb 3, 2023

In AIP-44 we need to send the TaskInstance object to the worker for execution. Currently this object type doesn't support serialization (only SimpleTaskInstance does).

This change mainly introduces a new way to construct a TaskInstance from a dict (from_dict), which requires turning the constructor into a from_task factory and refactoring all usages.

Note that a deserialized TaskInstance will not have ORM state (so it will not be able to refresh its state automatically from the DB, etc.), but this should be fine as it will be used on the client side of the Internal API.

related: #29320
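
[Illustration: a minimal sketch of the construction pattern described above. The from_task/from_dict/to_dict names follow the description; the fields shown are placeholders, not the real ORM model.]

from __future__ import annotations

from typing import Any


class TaskInstance:
    def __init__(self, task_id: str, dag_id: str, run_id: str | None = None, map_index: int = -1):
        self.task_id = task_id
        self.dag_id = dag_id
        self.run_id = run_id
        self.map_index = map_index

    @classmethod
    def from_task(cls, task, run_id: str | None = None, map_index: int = -1) -> TaskInstance:
        # Factory replacing the old "pass an Operator to the constructor" path.
        return cls(task_id=task.task_id, dag_id=task.dag_id, run_id=run_id, map_index=map_index)

    def to_dict(self) -> dict[str, Any]:
        # Plain-dict payload that can be sent over the Internal API.
        return {
            "task_id": self.task_id,
            "dag_id": self.dag_id,
            "run_id": self.run_id,
            "map_index": self.map_index,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> TaskInstance:
        # Rebuilds a detached (non-ORM) instance on the receiving side.
        return cls(**data)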

@boring-cyborg boring-cyborg bot added area:API Airflow's REST/HTTP API area:CLI area:core-operators Operators, Sensors and hooks within Core Airflow provider:cncf-kubernetes Kubernetes (k8s) provider related issues area:Scheduler including HA (high availability) scheduler area:serialization area:webserver Webserver related Issues labels Feb 3, 2023
@mhenc mhenc force-pushed the ti_serialize branch 2 times, most recently from 04d67fa to f279acf on February 3, 2023 13:13
@mhenc mhenc marked this pull request as ready for review February 3, 2023 14:16
@mhenc (Contributor, Author) commented Feb 3, 2023

cc: @potiuk @vincbeck

@mhenc mhenc changed the title from "AIP-44 Support TaskInstance serialization." to "AIP-44 Support TaskInstance serialization/deserialization." on Feb 3, 2023
@vincbeck (Contributor) left a comment

Nice!

@kosteev (Contributor) left a comment

In order to achieve the goal of constructing TaskInstance objects from "task" and "dict", could we extend the TaskInstance constructor to support a new parameter, e.g. task_dict, like this:

def __init__(
    self,
    task: Operator,
    execution_date: datetime | None = None,
    run_id: str | None = None,
    state: str | None = None,
    map_index: int = -1,
    task_dict: dict | None = None,
):
    if task_dict is not None:
        # init from task_dict
        return

    # old implementation

and then TaskInstance(task=t), TaskInstance(task_dict={...})

What do you think about this approach?

@ashb (Member) commented Feb 8, 2023

@bolkedebruin could you take a look? I know you've recently overhauled all of the serialisation code


@bolkedebruin (Contributor) left a comment

Overall, great that this is happening! I would like to bring it more in line with the new serialization/deserialization code, as I have been working on making it into 'one' (merging XCom serialization and DAG serialization).

To seduce you: the new serialization code is an order of magnitude faster than the old one (improvements over 50%) and can deal with versioning, while the old one cannot. The migration isn't fully completed yet (I am working on DAGs, but for obvious reasons that will take a while); however, that should not stand in the way of making use of the new code (serde.py).

For your code this means moving away from to_dict/from_dict to the API of serialize() / deserialize(value, version) and setting __version__: ClassVar[int] = X in the TaskInstance class. This is pretty close to what you have now. Serialization/deserialization should be called through serde and not directly. Alternatively, you can add a serializer/deserializer to airflow.serialization.serializers.

I can help if needed.
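
[Illustration: a rough sketch of the serde-style hooks described above, following the serialize()/deserialize(value, version)/__version__ convention from the comment; the fields are placeholders and this is not the actual TaskInstance code.]

from typing import Any, ClassVar


class TaskInstance:
    __version__: ClassVar[int] = 1

    def serialize(self) -> dict[str, Any]:
        # serde calls this hook and wraps the result with class name and version.
        # (Assumes the attributes exist on the instance, as they do on the ORM model.)
        return {"task_id": self.task_id, "dag_id": self.dag_id, "run_id": self.run_id}

    @staticmethod
    def deserialize(data: dict[str, Any], version: int) -> "TaskInstance":
        # The version argument allows handling payloads written by older schemas.
        if version > TaskInstance.__version__:
            raise TypeError("serialized version is newer than this class supports")
        ti = TaskInstance.__new__(TaskInstance)
        ti.task_id = data["task_id"]
        ti.dag_id = data["dag_id"]
        ti.run_id = data["run_id"]
        return ti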

Contributor

Here you are extending legacy code; I suggest using the more generic serialization code from serde.

Contributor Author

We rely on serialized_objects in the Internal API in general:
https://github.com/apache/airflow/blob/main/airflow/api_internal/internal_api_call.py#L107
Do you think we could switch to serde now? Is it compatible with what serialized_objects offers?

Contributor

Given that you are going to rely on a lot of serialization, and the new serializer/deserializer is significantly faster and more future-proof (versioning), I think everything that does not currently have a schema (that's everything except a DAG*) should switch.

I am willing to help out to ease the migration if required.

* I am working on DAG serialization/deserialization, but untangling how it is done now and improving the structure is taking time, especially with all the edge cases.

Contributor Author

Great. I think we will migrate to the new serializer/deserializer in this case, but probably outside of this PR.
If you believe it would be better to migrate first, then I can revert this change and come back to it once the new way is in place.
WDYT?

@bolkedebruin (Contributor) commented Feb 9, 2023

Am I incorrect in thinking that the migration is a 2-line change in internal_api_call and you are not relying on any of the other serialized_objects (basically DAG)? If so, then I would say do not add technical debt and migrate now. This would allow us to mark serialized_objects as stale and soon to be deprecated.

Otherwise, keep it and add it to the AIP-44 to-do list?
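
[Illustration: assuming the client side of internal_api_call currently serializes its RPC arguments with the legacy BaseSerialization from serialized_objects, the kind of two-line swap discussed here might look roughly like this; the actual code in internal_api_call.py may differ.]

from airflow.serialization.serde import deserialize, serialize

# Illustrative RPC arguments; in internal_api_call these would be the bound
# arguments of the decorated method.
arguments = {"dag_id": "example_dag", "run_id": "manual__2023-02-09", "map_index": -1}

# Legacy path (serialized_objects):
#   from airflow.serialization.serialized_objects import BaseSerialization
#   payload = BaseSerialization.serialize(arguments)

# New path (serde), which also records class and version info for registered types:
payload = serialize(arguments)
restored = deserialize(payload)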

Contributor Author

Yes, we need to make the change there and on the server side:
https://github.com/apache/airflow/blob/main/airflow/api_internal/endpoints/rpc_api_endpoint.py#L76

There are more methods with the internal_api_call decorator. I did a quick check and I see that (besides primitives) we already need serialization for Dag, DagRun, BaseXCom, CallbackRequest (and probably more soon).

Member

Question (and really sorry I have not looked at it earlier), and also a follow-up on #29513.

Are we REALLY sure we need to serialize the whole TaskInstance (and other) objects to be passed via internal_api_call?

For me this is an indication that we have too narrow a scope for an @internal_api call: generally speaking, the whole internal_api_call should span the whole DB transaction. Since we are trying to pass an ORM object (TaskInstance, DagRun, etc.), that object must have been retrieved earlier within a transaction, so our internal_api_call should wrap the retrieval as well. That might simply mean we need to do some refactoring and extract new methods (and then decorate them).

That's of course a general statement and approach, and there might be cases where this requires a bit deeper refactoring.

Which methods are affected, @mhenc (besides the #29513 one)? Maybe we can look together and figure out an approach for all of them?
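
[Illustration: a hypothetical example of the refactoring pattern described above, not code from this PR. The decorated method takes plain identifiers and performs the ORM retrieval inside the same call, so no ORM object has to cross the RPC boundary.]

from airflow.api_internal.internal_api_call import internal_api_call
from airflow.utils.session import NEW_SESSION, provide_session


@internal_api_call
@provide_session
def set_ti_state(dag_id: str, task_id: str, run_id: str, state: str, session=NEW_SESSION) -> None:
    # Hypothetical helper: the TaskInstance lookup happens inside the decorated
    # function, so only primitive identifiers are passed over the Internal API.
    from airflow.models.taskinstance import TaskInstance

    ti = (
        session.query(TaskInstance)
        .filter_by(dag_id=dag_id, task_id=task_id, run_id=run_id)
        .one()
    )
    ti.state = state
    session.merge(ti)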

Member

For example, in #29513 (comment) I proposed a solution that would avoid TaskInstance serialization altogether. I am reasonably convinced that a similar approach can be taken for all of our ORM objects and that we do not need to serialize any of them (in which case the whole PR might not be needed).

@mhenc (Contributor, Author) commented Feb 21, 2023

But this works only for the client -> Internal API direction.

What about Internal API -> client, e.g. the worker? We need to have a TaskInstance object in the worker to run the task, e.g.
https://github.com/apache/airflow/blob/main/airflow/cli/commands/task_command.py#L187

Unless of course we are able to refactor it completely.

@potiuk (Member) commented Feb 21, 2023

That's precisely what I am refactoring now (with my POC)

@bolkedebruin (Contributor) commented Feb 8, 2023

In order to achieve the goal of constructing TaskInstance objects from "task" and "dict", could we extend the TaskInstance constructor to support a new parameter, e.g. task_dict, like this:

def __init__(
    self,
    task: Operator,
    execution_date: datetime | None = None,
    run_id: str | None = None,
    state: str | None = None,
    map_index: int = -1,
    task_dict: dict | None = None,
):
    if task_dict is not None:
        # init from task_dict
        return

    # old implementation

and then TaskInstance(task=t), TaskInstance(task_dict={...})

What do you think about this approach?

I don't think that makes sense; a staticmethod or classmethod deserialize should cover this, and it is used this way in other locations.

It could be as simple as this:

from airflow.serialization.serde import deserialize

ti = deserialize(serialized_ti)

@mhenc mhenc force-pushed the ti_serialize branch 2 times, most recently from 6583bc4 to ca38db6 on February 8, 2023 15:22
@bolkedebruin (Contributor) left a comment

Added some additional comments

@mhenc mhenc requested review from bolkedebruin, kosteev and uranusjr and removed request for kosteev February 9, 2023 13:38
@bolkedebruin (Contributor) left a comment

Almost there :-)

@mhenc (Contributor, Author) commented Mar 24, 2023

Closing in favor of #30282

@mhenc mhenc closed this Mar 24, 2023
@mhenc mhenc deleted the ti_serialize branch August 30, 2023 18:48