Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ dependencies = [
"redo>=2.0",
"requests>=2.25",
"slugid>=2.0",
"taskcluster>=55.0",
"taskcluster-urls>=11.0",
"voluptuous>=0.12.1",
]
Expand Down
12 changes: 7 additions & 5 deletions src/taskgraph/actions/retrigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def retrigger_action(parameters, graph_config, input, task_group_id, task_id):
)

task = taskcluster.get_task_definition(task_id)
label = task["metadata"]["name"]
label = task["metadata"]["name"] # type: ignore

with_downstream = " "
to_run = [label]
Expand All @@ -163,7 +163,8 @@ def retrigger_action(parameters, graph_config, input, task_group_id, task_id):
to_run = full_task_graph.graph.transitive_closure(
set(to_run), reverse=True
).nodes
to_run = to_run & set(label_to_taskid.keys())
if label_to_taskid:
to_run = to_run & set(label_to_taskid.keys())
with_downstream = " (with downstream) "

times = input.get("times", 1)
Expand Down Expand Up @@ -201,8 +202,8 @@ def rerun_action(parameters, graph_config, input, task_group_id, task_id):
decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels(
parameters, graph_config, task_group_id=task_group_id
)
label = task["metadata"]["name"]
if task_id not in label_to_taskid.values():
label = task["metadata"]["name"] # type: ignore
if label_to_taskid and task_id not in label_to_taskid.values():
logger.error(
f"Refusing to rerun {label}: taskId {task_id} not in decision task {decision_task_id} label_to_taskid!"
)
Expand Down Expand Up @@ -276,7 +277,8 @@ def retrigger_multiple(parameters, graph_config, input, task_group_id, task_id):
# In practice, this shouldn't matter, as only completed tasks
# are pulled in from other pushes and treeherder won't pass
# those labels.
_rerun_task(label_to_taskid[label], label)
if label_to_taskid and label in label_to_taskid:
_rerun_task(label_to_taskid[label], label)

for j in range(times):
suffix = f"{i}-{j}"
Expand Down
10 changes: 4 additions & 6 deletions src/taskgraph/actions/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,10 @@ def fetch_graph_and_labels(parameters, graph_config, task_group_id=None):
# for old ones
def fetch_action(task_id):
logger.info(f"fetching label-to-taskid.json for action task {task_id}")
try:
run_label_to_id = get_artifact(task_id, "public/label-to-taskid.json")
run_label_to_id = get_artifact(task_id, "public/label-to-taskid.json")
if label_to_taskid and run_label_to_id:
label_to_taskid.update(run_label_to_id)
except HTTPError as e:
if e.response.status_code != 404:
raise
else:
logger.debug(f"No label-to-taskid.json found for {task_id}: {e}")

# for backwards compatibility, look up actions via pushlog-id
Expand All @@ -84,7 +82,7 @@ def fetch_cron(task_id):
logger.info(f"fetching label-to-taskid.json for cron task {task_id}")
try:
run_label_to_id = get_artifact(task_id, "public/label-to-taskid.json")
label_to_taskid.update(run_label_to_id)
label_to_taskid.update(run_label_to_id) # type: ignore
except HTTPError as e:
if e.response.status_code != 404:
raise
Expand Down
21 changes: 3 additions & 18 deletions src/taskgraph/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@


import logging
import os
import sys
from concurrent import futures

from slugid import nice as slugid

from taskgraph.util import json
from taskgraph.util.parameterization import resolve_timestamps
from taskgraph.util.taskcluster import CONCURRENCY, get_session
from taskgraph.util.taskcluster import CONCURRENCY, get_session, get_taskcluster_client
from taskgraph.util.time import current_json_time

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -104,9 +103,6 @@ def submit(task_id, label, task_def):


def create_task(session, task_id, label, task_def):
# create the task using 'http://taskcluster/queue', which is proxied to the queue service
# with credentials appropriate to this task.

# Resolve timestamps
now = current_json_time(datetime_format=True)
task_def = resolve_timestamps(now, task_def)
Expand All @@ -123,16 +119,5 @@ def create_task(session, task_id, label, task_def):
return

logger.info(f"Creating task with taskId {task_id} for {label}")
proxy_url = os.environ.get("TASKCLUSTER_PROXY_URL", "http://taskcluster").rstrip(
"/"
)
res = session.put(
f"{proxy_url}/queue/v1/task/{task_id}",
json=task_def,
)
if res.status_code != 200:
try:
logger.error(res.json()["message"])
except Exception:
logger.error(res.text)
res.raise_for_status()
queue = get_taskcluster_client("queue")
queue.createTask(task_id, task_def)
10 changes: 5 additions & 5 deletions src/taskgraph/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,12 +270,12 @@ def load_task(task_id, remove=True, user=None):
task_def = get_task_definition(task_id)

if (
impl := task_def.get("tags", {}).get("worker-implementation")
impl := task_def.get("tags", {}).get("worker-implementation") # type: ignore
) != "docker-worker":
print(f"Tasks with worker-implementation '{impl}' are not supported!")
return 1

command = task_def["payload"].get("command")
command = task_def["payload"].get("command") # type: ignore
if not command or not command[0].endswith("run-task"):
print("Only tasks using `run-task` are supported!")
return 1
Expand Down Expand Up @@ -308,18 +308,18 @@ def load_task(task_id, remove=True, user=None):
else:
task_cwd = "$TASK_WORKDIR"

image_task_id = task_def["payload"]["image"]["taskId"]
image_task_id = task_def["payload"]["image"]["taskId"] # type: ignore
image_tag = load_image_by_task_id(image_task_id)

# Set some env vars the worker would normally set.
env = {
"RUN_ID": "0",
"TASK_GROUP_ID": task_def.get("taskGroupId", ""),
"TASK_GROUP_ID": task_def.get("taskGroupId", ""), # type: ignore
"TASK_ID": task_id,
"TASKCLUSTER_ROOT_URL": get_root_url(False),
}
# Add the task's environment variables.
env.update(task_def["payload"].get("env", {}))
env.update(task_def["payload"].get("env", {})) # type: ignore

envfile = None
initfile = None
Expand Down
5 changes: 3 additions & 2 deletions src/taskgraph/optimize/strategies.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,15 @@ def should_replace_task(self, task, params, deadline, arg):
status = status_task(task_id)
# status can be `None` if we're in `testing` mode
# (e.g. test-action-callback)
if not status or status.get("state") in ("exception", "failed"):
if not status or status.get("state") in ("exception", "failed"): # type: ignore
logger.debug(
f"not replacing {task.label} with {task_id} because it is in failed or exception state"
)
continue

if deadline and datetime.strptime(
status["expires"], self.fmt
status["expires"], # type: ignore
self.fmt,
) < datetime.strptime(deadline, self.fmt):
logger.debug(
f"not replacing {task.label} with {task_id} because it expires before {deadline}"
Expand Down
Loading
Loading