
Issue with Pod Log Retrieval in KubernetesJobOperator When Using Deferred Mode  #42144


Description

@jordi-crespo

Apache Airflow Provider(s)

cncf-kubernetes

Versions of Apache Airflow Providers

apache-airflow-providers-cncf-kubernetes==8.3.4

Apache Airflow version

2.9.2

Operating System

Docker, creating the image from python:3.10-slim-buster

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

What happened

When using KubernetesJobOperator in deferred mode, the job executes correctly, but logs from the associated pod are not retrieved.

The issue arises in the execute_complete method, where the pod_name and pod_namespace passed from the trigger are None, which causes the following code to fail:

if self.get_logs:
    pod_name = event["pod_name"]
    pod_namespace = event["pod_namespace"]
    self.pod = self.hook.get_pod(pod_name, pod_namespace)
    if not self.pod:
        raise PodNotFoundException("Could not find pod after resuming from deferral")
    self._write_logs(self.pod)

First, I create a V1Pod object that I pass to the KubernetesJobOperator via the full_pod_spec parameter; the same spec works fine with the KubernetesPodOperator. The KubernetesJobOperator creates the job from a V1Pod built out of my full_pod_spec, and later on it assigns that pod to the self.pod attribute.

Because I am running the operator in deferred mode, self.execute_deferrable() is called. This method uses the KubernetesJobTrigger class and passes it the pod_name and pod_namespace. Since I don't get an error there, I assume the self.pod object exists, because the code can access the following pod attributes:

pod_name=self.pod.metadata.name,  # type: ignore[union-attr]
pod_namespace=self.pod.metadata.namespace,  # type: ignore[union-attr]

KubernetesJobTrigger then yields a TriggerEvent object, passing the pod_name and pod_namespace like this:

yield TriggerEvent(
    {
        "pod_name": pod.metadata.name if self.get_logs else None,
        "pod_namespace": pod.metadata.namespace if self.get_logs else None,
    }
)

This pod object comes from this piece of code:

if self.get_logs or self.do_xcom_push:
    pod = await self.hook.get_pod(name=self.pod_name, namespace=self.pod_namespace)

Next, the execute_complete method runs and receives the pod_name and pod_namespace from the event.

But I found that these values are None, so I believe the TriggerEvent is yielding None values.

I suspect the None values for the pod name and namespace originate when the trigger runs this code:

if self.get_logs or self.do_xcom_push:
    pod = await self.hook.get_pod(name=self.pod_name, namespace=self.pod_namespace)

I think that the pod object may not have the name and namespace because the pod is still in the Pending state, meaning Kubernetes has accepted the pod but hasn't fully scheduled or initialized it yet, so certain metadata fields like name and namespace might not be populated.
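
To sanity-check that hypothesis, something along these lines can be dropped into a subclass of the operator right before it defers (a rough debugging sketch of my own, not provider code; it only reuses self.hook.get_pod and self.pod, which are shown above):

# Debugging sketch (my own code, not part of the provider): fetch the pod again
# and log whether its metadata is already populated while it may still be Pending.
pod = self.hook.get_pod(self.pod.metadata.name, self.pod.metadata.namespace)
self.log.info(
    "phase=%s name=%s namespace=%s",
    pod.status.phase if pod.status else None,
    pod.metadata.name,
    pod.metadata.namespace,
)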

As a workaround, I had to manually extract the pod name from the job itself, by subclassing the KubernetesJobOperator and overriding its execute_complete method:

pod_list_response = self.hook.get_namespaced_pod_list(
    label_selector=f"job-name={job_name}", namespace=job_namespace
)

pod_list = json.loads(pod_list_response.read())
pods = pod_list.get("items", [])
if not pods:
    raise PodNotFoundException(f"No pods found for job: {job_name} in namespace: {job_namespace}")

pod = pods[0]
pod_name = pod["metadata"]["name"]
pod_namespace = pod["metadata"]["namespace"]

After retrieving the pod details, I encountered another issue with the client used to fetch logs. By default, self.client in KubernetesJobOperator points to the BatchV1Api:

@cached_property
def client(self) -> BatchV1Api:
    return self.hook.batch_v1_client

The log retrieval that needs a different client happens in the following piece of code:

if self.get_logs:
    pod_name = event["pod_name"]
    pod_namespace = event["pod_namespace"]
    self.pod = self.hook.get_pod(pod_name, pod_namespace)
    if not self.pod:
        raise PodNotFoundException("Could not find pod after resuming from deferral")
    self._write_logs(self.pod)

_write_logs is defined in the KubernetesPodOperator, which expects self.client to be a CoreV1Api. I resolved this by switching the client before calling the _write_logs function:

self.client = self.hook.core_v1_client
self._write_logs(self.pod)
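
Putting the two pieces together, the whole workaround looks roughly like this (a simplified sketch of my subclass: the class name is mine, error handling is trimmed, and I assume the job name/namespace are available under the "name"/"namespace" keys of the trigger event):

import json

from airflow.exceptions import AirflowException
from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator
from airflow.providers.cncf.kubernetes.operators.pod import PodNotFoundException
from airflow.utils.context import Context


class PatchedKubernetesJobOperator(KubernetesJobOperator):
    def execute_complete(self, context: Context, event: dict, **kwargs):
        ti = context["ti"]
        ti.xcom_push(key="job", value=event["job"])

        if self.get_logs:
            # Assumption: the job name/namespace are present in the event.
            job_name = event["name"]
            job_namespace = event["namespace"]

            # Look the pod up through the job's label selector instead of using
            # the (None) pod_name/pod_namespace coming from the trigger event.
            pod_list_response = self.hook.get_namespaced_pod_list(
                label_selector=f"job-name={job_name}", namespace=job_namespace
            )
            pod_list = json.loads(pod_list_response.read())
            pods = pod_list.get("items", [])
            if not pods:
                raise PodNotFoundException(
                    f"No pods found for job: {job_name} in namespace: {job_namespace}"
                )

            pod_name = pods[0]["metadata"]["name"]
            pod_namespace = pods[0]["metadata"]["namespace"]
            self.pod = self.hook.get_pod(pod_name, pod_namespace)

            # _write_logs is inherited from KubernetesPodOperator and reads logs
            # through the CoreV1Api, so swap the client before calling it.
            self.client = self.hook.core_v1_client
            self._write_logs(self.pod)

        if event["status"] == "error":
            raise AirflowException(event["message"])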

What you think should happen instead

The KubernetesJobTrigger should send pod_name and pod_namespace values that are not None. This can be achieved by taking them directly from the trigger's own self.pod_name and self.pod_namespace parameters:

TriggerEvent(
    {
        "name": job.metadata.name,
        "namespace": job.metadata.namespace,
        "pod_name": self.pod_name if self.get_logs else None,
        "pod_namespace": self.pod_namespace if self.get_logs else None,
        "status": "error" if error_message else "success",
        "message": f"Job failed with error: {error_message}"
        if error_message
        else "Job completed successfully",
        "job": job_dict,
        "xcom_result": xcom_result if self.do_xcom_push else None,
    }
)

Also, with the current implementation an exception is raised when the job is in a failed state before the logs can ever be printed:


    def execute_complete(self, context: Context, event: dict, **kwargs):
        ti = context["ti"]
        ti.xcom_push(key="job", value=event["job"])
        if event["status"] == "error":
            raise AirflowException(event["message"])

        if self.get_logs:
            pod_name = event["pod_name"]
            pod_namespace = event["pod_namespace"]
            self.pod = self.hook.get_pod(pod_name, pod_namespace)
            if not self.pod:
                raise PodNotFoundException("Could not find pod after resuming from deferral")
            self._write_logs(self.pod)

But even if the job fails, we would still want to see the logs:


    def execute_complete(self, context: Context, event: dict, **kwargs):
        ti = context["ti"]
        ti.xcom_push(key="job", value=event["job"])

        if self.get_logs:
            pod_name = event["pod_name"]
            pod_namespace = event["pod_namespace"]
            self.pod = self.hook.get_pod(pod_name, pod_namespace)
            if not self.pod:
                raise PodNotFoundException("Could not find pod after resuming from deferral")
            self._write_logs(self.pod)

        if event["status"] == "error":
            raise AirflowException(event["message"])

How to reproduce

Write a DAG with the following operator:

from kubernetes.client import V1Pod, V1ObjectMeta, V1PodSpec, V1Container
from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator

# Define the pod
pod = V1Pod(
    metadata=V1ObjectMeta(name="example-pod"),
    spec=V1PodSpec(
        containers=[V1Container(
            name="example-container",
            image="busybox",
            command=["/bin/sh", "-c", "echo Hello, Kubernetes! && sleep 60"]
        )],
        restart_policy="Never"
    )
)

# Define the KubernetesJobOperator
job = KubernetesJobOperator(
    task_id="example_k8s_job",
    full_pod_spec=pod,
    get_logs=True,
    in_cluster=True,
    wait_until_job_complete=True,
    deferrable=True,  # run the operator in deferred mode to reproduce the issue
)

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct
