Description
After discussing with @flaviuvadan and @samj1912, we hit on a very intuitive, Pythonic way of declaring Argo concepts[1]: if we use context managers, we can mirror Argo's YAML spec and sprinkle on syntactic sugar where appropriate. This gives us a 1-to-1 mapping between Hera and Argo, while still allowing us to specialise certain use cases, such as a "simple workflow" use case. That use case is the current standard Hera assumption: a DAG made up of Tasks, which ties Tasks and Templates together in a 1-to-1 mapping.
This issue is to collect ideal usage examples and discuss Hera V5 features. V5 currently lives in the `hera/v5` working branch. The main PR is #437, which further PRs will be merged into, and a V5 alpha will be released from this branch.
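To make the context-manager idea concrete, here is a minimal sketch of how objects created inside a `with` block could register themselves with the enclosing workflow. `SketchWorkflow` and `SketchTemplate` are hypothetical stand-ins invented for illustration, not Hera classes, and the actual V5 implementation may work differently.

```python
from typing import ClassVar, List, Optional


class SketchWorkflow:
    """Hypothetical stand-in for a workflow that collects templates."""

    # Stack of workflows whose `with` block is currently open.
    _active: ClassVar[List["SketchWorkflow"]] = []

    def __init__(self, name: str) -> None:
        self.name = name
        self.templates: List["SketchTemplate"] = []

    def __enter__(self) -> "SketchWorkflow":
        SketchWorkflow._active.append(self)
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        SketchWorkflow._active.pop()

    @classmethod
    def current(cls) -> Optional["SketchWorkflow"]:
        return cls._active[-1] if cls._active else None


class SketchTemplate:
    """Hypothetical template that adds itself to the active workflow."""

    def __init__(self, name: str) -> None:
        self.name = name
        workflow = SketchWorkflow.current()
        if workflow is not None:
            workflow.templates.append(self)


with SketchWorkflow("coinflip") as w:
    SketchTemplate("flip-coin")
    SketchTemplate("heads")

# Created outside any workflow context, so it is not registered anywhere.
orphan = SketchTemplate("orphan")

registered = [t.name for t in w.templates]
print(registered)
```

Keeping the active workflows on a stack is one possible design choice; it would also allow nested contexts, which a single module-level variable would not.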
Pythonic Steps Workflow Declaration vs YAML
coinflip.py

def _flip_coin():
    import random

    result = "heads" if random.randint(0, 1) == 0 else "tails"
    print(result)


with Workflow(name="coinflip", generate_name=True) as w:
    # Implicitly added in __init__ when in a Workflow context
    flip_coin = ScriptTemplate(
        name="flip-coin",
        source=_flip_coin,
    )
    heads = ContainerTemplate(
        name="heads",
        image="alpine:3.6",
        command=["sh", "-c"],
        args=['echo "it was heads"'],
    )
    tails = ContainerTemplate(
        name="tails",
        image="alpine:3.6",
        command=["sh", "-c"],
        args=['echo "it was tails"'],
    )

    with Steps(name="coinflip") as coinflip:
        Step(name="flip-coin", template=flip_coin)
        Step(
            name="heads", template=heads, when=f"{flip_coin.get_result()} == heads"
        )
        Step(
            name="tails", template=tails, when=f"{flip_coin.get_result()} == tails"
        )

    w.set_entrypoint(coinflip)
    w.annotations = {
        "workflows.argoproj.io/description": "This is an example of coin flip defined as a sequence of conditional steps."
    }

coinflip.yaml

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: coinflip-
  annotations:
    workflows.argoproj.io/description: |
      This is an example of coin flip defined as a sequence of conditional steps.
      You can also run it in Python: https://couler-proj.github.io/couler/examples/#coin-flip
spec:
  entrypoint: coinflip
  templates:
  - name: coinflip
    steps:
    - - name: flip-coin
        template: flip-coin
    - - name: heads
        template: heads
        when: "{{steps.flip-coin.outputs.result}} == heads"
      - name: tails
        template: tails
        when: "{{steps.flip-coin.outputs.result}} == tails"
  - name: flip-coin
    script:
      image: python:alpine3.6
      command: [python]
      source: |
        import random
        result = "heads" if random.randint(0,1) == 0 else "tails"
        print(result)
  - name: heads
    container:
      image: alpine:3.6
      command: [sh, -c]
      args: ["echo \"it was heads\""]
  - name: tails
    container:
      image: alpine:3.6
      command: [sh, -c]
      args: ["echo \"it was tails\""]
Here we can see the direct 1-to-1 mapping: the YAML from the Argo example should be exactly what we get from Hera when calling `w.to_yaml()`. It also opens the door to the coinflip-recursive.yaml example, as we are no longer forced to use a DAG, which prevents cyclic relationships.
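As an illustration of how the `when` strings in the Python example could line up with the YAML, the sketch below renders a step into the dictionary shape Argo expects. `SketchTemplate`, `SketchStep`, and the behaviour of `get_result()` are assumptions made for this example; the proposal does not specify how `get_result()` is implemented.

```python
from typing import Dict, Optional


class SketchTemplate:
    """Hypothetical template; only the name matters for this sketch."""

    def __init__(self, name: str) -> None:
        self.name = name

    def get_result(self) -> str:
        # Assumes the step that runs this template shares the template's
        # name, as it does in the coinflip example.
        return "{{steps." + self.name + ".outputs.result}}"


class SketchStep:
    """Hypothetical step that references a template by name."""

    def __init__(
        self, name: str, template: SketchTemplate, when: Optional[str] = None
    ) -> None:
        self.name = name
        self.template = template
        self.when = when

    def to_dict(self) -> Dict[str, str]:
        out = {"name": self.name, "template": self.template.name}
        if self.when is not None:
            out["when"] = self.when
        return out


flip_coin = SketchTemplate("flip-coin")
heads = SketchTemplate("heads")

step = SketchStep("heads", heads, when=f"{flip_coin.get_result()} == heads")
rendered = step.to_dict()
print(rendered)
```

Because the f-string inserts the returned text verbatim, the double braces that Argo needs survive unchanged in the `when` expression.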
Pythonic DAG Workflow Declaration vs YAML
dag_coinflip.py

def _flip_coin():
    import random

    result = "heads" if random.randint(0, 1) == 0 else "tails"
    print(result)


with Workflow(name="coinflip", generate_name=True) as w:
    # Bound to names so that the steps below can reference them
    flip_coin = ScriptTemplate(
        name="flip-coin",
        source=_flip_coin,
    )
    heads = ContainerTemplate(
        name="heads",
        image="alpine:3.6",
        command=["sh", "-c"],
        args=['echo "it was heads"'],
    )
    tails = ContainerTemplate(
        name="tails",
        image="alpine:3.6",
        command=["sh", "-c"],
        args=['echo "it was tails"'],
    )

    with Steps(name="coinflip") as coinflip:
        Step(name="flip-coin", template=flip_coin)
        Step(
            name="heads", template=heads, when=f"{flip_coin.get_result()} == heads"
        )
        Step(
            name="tails", template=tails, when=f"{flip_coin.get_result()} == tails"
        )

    with DAG(name="diamond") as diamond:
        a = Task(name="A", template=coinflip)
        b = Task(name="B", template=coinflip)
        c = Task(name="C", template=coinflip)
        d = Task(name="D", template=coinflip)
        a >> [b, c] >> d  # rshift/rrshift usage must be under a DAG context manager

    w.set_entrypoint(diamond)
    w.annotations = {
        "workflows.argoproj.io/description": "This is an example of coin flip defined as a DAG."
    }

dag-coinflip.yaml

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: dag-diamond-coinflip-
  annotations:
    workflows.argoproj.io/description: |
      This is an example of coin flip defined as a DAG.
      You can also run it in Python: https://couler-proj.github.io/couler/examples/#dag
spec:
  entrypoint: diamond
  templates:
  - name: diamond
    dag:
      tasks:
      - name: A
        template: coinflip
      - name: B
        depends: "A"
        template: coinflip
      - name: C
        depends: "A"
        template: coinflip
      - name: D
        depends: "B && C"
        template: coinflip
  - name: coinflip
    steps:
    - - name: flip-coin
        template: flip-coin
    - - name: heads
        template: heads
        when: "{{steps.flip-coin.outputs.result}} == heads"
      - name: tails
        template: coinflip
        when: "{{steps.flip-coin.outputs.result}} == tails"
  - name: flip-coin
    script:
      image: python:alpine3.6
      command: [python]
      source: |
        import random
        result = "heads" if random.randint(0,1) == 0 else "tails"
        print(result)
  - name: heads
    container:
      image: alpine:3.6
      command: [sh, -c]
      args: ["echo \"it was heads\""]
Note that the `Task`s defined under the `DAG` context would differ from the current Hera `Task`: they would expect a `template` parameter to be passed in, and would no longer be used to generate the top-level templates, which are now stored in `w.templates`. We may even want to add the tasks to the DAG explicitly and only then define the relationships between them; otherwise it is unclear what declaring a `Task` actually means. Like so:
with DAG(name="diamond") as diamond:
    a = Task(name="A", template=coinflip)
    b = Task(name="B", template=coinflip)
    c = Task(name="C", template=coinflip)
    d = Task(name="D", template=coinflip)
    diamond.add_tasks([a, b, c, d])
    a >> [b, c] >> d  # we can then validate that you are referencing existing tasks in the dag
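One way the validation mentioned in that comment could work is sketched below: `>>` is implemented with `__rshift__` and `__rrshift__`, and a dependency is only accepted when both tasks were added to the same DAG. `SketchDAG` and `SketchTask` are hypothetical names for this example, and taking the template as a plain string is a simplification.

```python
from typing import Dict, List, Optional


class SketchDAG:
    """Hypothetical DAG that owns an explicit list of tasks."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.tasks: List["SketchTask"] = []

    def add_tasks(self, tasks: List["SketchTask"]) -> None:
        for task in tasks:
            task.dag = self
            self.tasks.append(task)

    def to_dict(self) -> Dict[str, object]:
        return {"name": self.name, "dag": {"tasks": [t.to_dict() for t in self.tasks]}}


class SketchTask:
    """Hypothetical lightweight task that references a template by name."""

    def __init__(self, name: str, template: str) -> None:
        self.name = name
        self.template = template
        self.dag: Optional[SketchDAG] = None
        self.upstream: List[str] = []

    def _depend_on(self, other: "SketchTask") -> None:
        # Reject dependencies between tasks that are not in the same DAG.
        if self.dag is None or self.dag is not other.dag:
            raise ValueError(
                f"{other.name} and {self.name} must be added to the same DAG first"
            )
        self.upstream.append(other.name)

    def __rshift__(self, other):
        # Handles `a >> b` and `a >> [b, c]`.
        targets = other if isinstance(other, list) else [other]
        for target in targets:
            target._depend_on(self)
        return other

    def __rrshift__(self, other):
        # Handles `[b, c] >> d`: list defines no `>>`, so Python falls
        # back to the right-hand operand's __rrshift__.
        for source in other:
            self._depend_on(source)
        return self

    def to_dict(self) -> Dict[str, str]:
        out = {"name": self.name, "template": self.template}
        if self.upstream:
            out["depends"] = " && ".join(self.upstream)
        return out


diamond = SketchDAG("diamond")
a, b, c, d = (SketchTask(n, "coinflip") for n in "ABCD")
diamond.add_tasks([a, b, c, d])
a >> [b, c] >> d

stray = SketchTask("E", "coinflip")  # never added to the DAG
try:
    d >> stray
except ValueError as err:
    error_message = str(err)

tasks = diamond.to_dict()["dag"]["tasks"]
print(tasks)
```

With this approach the `depends` strings fall out of the recorded upstream names, which matches the `"B && C"` form used in the YAML above.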
Pythonic "Simple Workflow" Declaration
If we still want to support the canonical "simple workflow" shown in the README, where developers don't care so much about the Argo details and just want to orchestrate some tasks, we can include a `SimpleWorkflow` class, which assumes a DAG is used and lets Tasks be chained into a DAG with the `>>` (rshift/rrshift) syntax:
hello_hera.py
from hera import Task, SimpleWorkflow


def say(message: str):
    print(message)


with SimpleWorkflow("diamond") as w:
    a = Task('a', say, ['This is task A!'])
    b = Task('b', say, ['This is task B!'])
    c = Task('c', say, ['This is task C!'])
    d = Task('d', say, ['This is task D!'])
    a >> [b, c] >> d  # Implicit DAG declaration only allowed in a SimpleWorkflow context manager
Note that here we don't pass a `template` to the `Task` (this might require a separate class from the proposed lightweight `Task` used in the examples above).
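A rough sketch of what such a separate class might look like follows. It assumes each function-based task joins the active workflow on creation, produces exactly one template of its own, and may only be chained with `>>` while the context is open. `SketchSimpleWorkflow` and `SketchFuncTask` are hypothetical names, not part of Hera.

```python
from typing import Callable, List, Optional


class SketchSimpleWorkflow:
    """Hypothetical workflow that always wraps its tasks in a single DAG."""

    _active: Optional["SketchSimpleWorkflow"] = None

    def __init__(self, name: str) -> None:
        self.name = name
        self.tasks: List["SketchFuncTask"] = []

    def __enter__(self) -> "SketchSimpleWorkflow":
        SketchSimpleWorkflow._active = self
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        SketchSimpleWorkflow._active = None

    def template_names(self) -> List[str]:
        # The implicit DAG template, then one generated template per task.
        return [self.name] + [task.name for task in self.tasks]


class SketchFuncTask:
    """Hypothetical task built from a function instead of a template."""

    def __init__(self, name: str, func: Callable, args: List[str]) -> None:
        workflow = SketchSimpleWorkflow._active
        if workflow is None:
            raise RuntimeError("tasks must be created inside a SimpleWorkflow context")
        self.name = name
        self.func = func
        self.args = args
        self.upstream: List[str] = []
        workflow.tasks.append(self)

    def _link(self, upstream_task: "SketchFuncTask") -> None:
        if SketchSimpleWorkflow._active is None:
            raise RuntimeError(">> is only allowed inside a SimpleWorkflow context")
        self.upstream.append(upstream_task.name)

    def __rshift__(self, other):
        targets = other if isinstance(other, list) else [other]
        for target in targets:
            target._link(self)
        return other

    def __rrshift__(self, other):
        for source in other:
            self._link(source)
        return self


def say(message: str) -> None:
    print(message)


with SketchSimpleWorkflow("diamond") as w:
    a = SketchFuncTask("a", say, ["This is task A!"])
    b = SketchFuncTask("b", say, ["This is task B!"])
    c = SketchFuncTask("c", say, ["This is task C!"])
    d = SketchFuncTask("d", say, ["This is task D!"])
    a >> [b, c] >> d

depends = {task.name: task.upstream for task in w.tasks}

try:
    a >> b  # the context is closed, so this is rejected
except RuntimeError as err:
    outside_error = str(err)
```

Keeping this class apart from the lightweight template-referencing task would let the simple case keep its one-template-per-task behaviour without constraining the lower-level API.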
[1] We did this by trying to convert the coinflip Argo example, which uses `steps` (a feature currently missing from Hera), using whatever syntax we thought made sense, without worrying about implementation details.
Edits
- Incorporate feedback from @samj1912 to remove templates dict