Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions src/prefect/client/schemas/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,22 @@ class FlowRunFilterStartTime(PrefectBaseModel):
)


class FlowRunFilterEndTime(PrefectBaseModel):
"""Filter by `FlowRun.end_time`."""

before_: Optional[DateTime] = Field(
default=None,
description="Only include flow runs ending at or before this time",
)
after_: Optional[DateTime] = Field(
default=None,
description="Only include flow runs ending at or after this time",
)
is_null_: Optional[bool] = Field(
default=None, description="If true, only return flow runs without an end time"
)


class FlowRunFilterExpectedStartTime(PrefectBaseModel):
"""Filter by `FlowRun.expected_start_time`."""

Expand Down Expand Up @@ -308,6 +324,9 @@ class FlowRunFilter(PrefectBaseModel, OperatorMixin):
start_time: Optional[FlowRunFilterStartTime] = Field(
default=None, description="Filter criteria for `FlowRun.start_time`"
)
end_time: Optional[FlowRunFilterEndTime] = Field(
default=None, description="Filter criteria for `FlowRun.end_time`"
)
expected_start_time: Optional[FlowRunFilterExpectedStartTime] = Field(
default=None, description="Filter criteria for `FlowRun.expected_start_time`"
)
Expand Down Expand Up @@ -430,6 +449,22 @@ class TaskRunFilterStartTime(PrefectBaseModel):
)


class TaskRunFilterEndTime(PrefectBaseModel):
"""Filter by `TaskRun.end_time`."""

before_: Optional[DateTime] = Field(
default=None,
description="Only include task runs ending at or before this time",
)
after_: Optional[DateTime] = Field(
default=None,
description="Only include task runs ending at or after this time",
)
is_null_: Optional[bool] = Field(
default=None, description="If true, only return task runs without an end time"
)


class TaskRunFilter(PrefectBaseModel, OperatorMixin):
"""Filter task runs. Only task runs matching all criteria will be returned"""

Expand All @@ -448,6 +483,9 @@ class TaskRunFilter(PrefectBaseModel, OperatorMixin):
start_time: Optional[TaskRunFilterStartTime] = Field(
default=None, description="Filter criteria for `TaskRun.start_time`"
)
end_time: Optional[TaskRunFilterEndTime] = Field(
default=None, description="Filter criteria for `TaskRun.end_time`"
)
subflow_runs: Optional[TaskRunFilterSubFlowRuns] = Field(
default=None, description="Filter criteria for `TaskRun.subflow_run`"
)
Expand Down
37 changes: 37 additions & 0 deletions src/prefect/server/schemas/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,38 @@ def _get_filter_list(
return filters


class TaskRunFilterEndTime(PrefectFilterBaseModel):
"""Filter by `TaskRun.end_time`."""

before_: Optional[DateTime] = Field(
default=None,
description="Only include task runs ending at or before this time",
)
after_: Optional[DateTime] = Field(
default=None,
description="Only include task runs ending at or after this time",
)
is_null_: Optional[bool] = Field(
default=None, description="If true, only return task runs without an end time"
)

def _get_filter_list(
self, db: "PrefectDBInterface"
) -> Iterable[sa.ColumnExpressionArgument[bool]]:
filters: list[sa.ColumnExpressionArgument[bool]] = []
if self.before_ is not None:
filters.append(db.TaskRun.end_time <= self.before_)
if self.after_ is not None:
filters.append(db.TaskRun.end_time >= self.after_)
if self.is_null_ is not None:
filters.append(
db.TaskRun.end_time.is_(None)
if self.is_null_
else db.TaskRun.end_time.is_not(None)
)
return filters


class TaskRunFilterExpectedStartTime(PrefectFilterBaseModel):
"""Filter by `TaskRun.expected_start_time`."""

Expand Down Expand Up @@ -996,6 +1028,9 @@ class TaskRunFilter(PrefectOperatorFilterBaseModel):
start_time: Optional[TaskRunFilterStartTime] = Field(
default=None, description="Filter criteria for `TaskRun.start_time`"
)
end_time: Optional[TaskRunFilterEndTime] = Field(
default=None, description="Filter criteria for `TaskRun.end_time`"
)
expected_start_time: Optional[TaskRunFilterExpectedStartTime] = Field(
default=None, description="Filter criteria for `TaskRun.expected_start_time`"
)
Expand All @@ -1021,6 +1056,8 @@ def _get_filter_list(
filters.append(self.state.as_sql_filter())
if self.start_time is not None:
filters.append(self.start_time.as_sql_filter())
if self.end_time is not None:
filters.append(self.end_time.as_sql_filter())
if self.expected_start_time is not None:
filters.append(self.expected_start_time.as_sql_filter())
if self.subflow_runs is not None:
Expand Down
67 changes: 67 additions & 0 deletions tests/server/models/test_filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ def create_task_run(task_run):
tags=["db", "blue"],
state=prefect.server.schemas.states.Completed(),
deployment_id=d_1_1.id,
start_time=now("UTC") - timedelta(hours=1),
)
)

Expand All @@ -153,6 +154,7 @@ def create_task_run(task_run):
name="sad-duck",
tags=["db", "blue"],
state=prefect.server.schemas.states.Completed(),
start_time=now("UTC") - timedelta(hours=1),
)
)
await create_flow_run(
Expand All @@ -162,6 +164,7 @@ def create_task_run(task_run):
tags=["db", "red"],
state=prefect.server.schemas.states.Failed(),
deployment_id=d_1_1.id,
start_time=now("UTC") - timedelta(hours=1),
)
)
await create_flow_run(
Expand All @@ -187,6 +190,7 @@ def create_task_run(task_run):
name="another-test-happy-duck",
tags=["db", "blue"],
state=prefect.server.schemas.states.Completed(),
start_time=now("UTC") - timedelta(hours=1),
)
)

Expand All @@ -203,6 +207,7 @@ def create_task_run(task_run):
flow_id=f_2.id,
tags=["db", "red"],
state=prefect.server.schemas.states.Failed(),
start_time=now("UTC") - timedelta(hours=1),
)
)

Expand All @@ -215,6 +220,7 @@ def create_task_run(task_run):
state=prefect.server.schemas.states.Completed(),
deployment_id=d_3_1.id,
work_queue_id=wp.default_queue_id,
start_time=now("UTC") - timedelta(hours=1),
)
)

Expand Down Expand Up @@ -246,6 +252,7 @@ def create_task_run(task_run):
task_key="b",
state=prefect.server.schemas.states.Completed(),
dynamic_key="0",
start_time=now("UTC") - timedelta(hours=1),
)
)
await create_task_run(
Expand All @@ -255,6 +262,7 @@ def create_task_run(task_run):
task_key="c",
state=prefect.server.schemas.states.Completed(),
dynamic_key="0",
start_time=now("UTC") - timedelta(hours=1),
)
)

Expand All @@ -272,6 +280,7 @@ def create_task_run(task_run):
task_key="b",
state=prefect.server.schemas.states.Completed(),
dynamic_key="0",
start_time=now("UTC") - timedelta(hours=1),
)
)
await create_task_run(
Expand All @@ -280,6 +289,7 @@ def create_task_run(task_run):
task_key="c",
state=prefect.server.schemas.states.Completed(),
dynamic_key="0",
start_time=now("UTC") - timedelta(hours=1),
)
)

Expand All @@ -289,6 +299,7 @@ def create_task_run(task_run):
task_key="a",
state=prefect.server.schemas.states.Failed(),
dynamic_key="0",
start_time=now("UTC") - timedelta(hours=1),
)
)

Expand Down Expand Up @@ -584,6 +595,34 @@ class TestCountFlowRunModels:
),
1,
],
# flow runs with end_time set (completed or failed)
[
dict(flow_run_filter=filters.FlowRunFilter(end_time=dict(is_null_=False))),
6,
],
# flow runs with null end_time (running or scheduled)
[
dict(flow_run_filter=filters.FlowRunFilter(end_time=dict(is_null_=True))),
6,
],
# flow runs with end_time before now + 1 day (all completed/failed runs)
[
dict(
flow_run_filter=filters.FlowRunFilter(
end_time=dict(before_=now("UTC") + timedelta(days=1))
)
),
6,
],
# flow runs with end_time after now + 1 day (none)
[
dict(
flow_run_filter=filters.FlowRunFilter(
end_time=dict(after_=now("UTC") + timedelta(days=1))
)
),
0,
],
# empty filter
[dict(flow_filter=filters.FlowFilter()), 12],
# multiple empty filters
Expand Down Expand Up @@ -775,6 +814,34 @@ class TestCountTaskRunsModels:
),
0,
],
# task runs with end_time set (completed or failed)
[
dict(task_run_filter=filters.TaskRunFilter(end_time=dict(is_null_=False))),
5,
],
# task runs with null end_time (running or no state)
[
dict(task_run_filter=filters.TaskRunFilter(end_time=dict(is_null_=True))),
5,
],
# task runs with end_time before now + 1 day (all completed/failed runs)
[
dict(
task_run_filter=filters.TaskRunFilter(
end_time=dict(before_=now("UTC") + timedelta(days=1))
)
),
5,
],
# task runs with end_time after now + 1 day (none)
[
dict(
task_run_filter=filters.TaskRunFilter(
end_time=dict(after_=now("UTC") + timedelta(days=1))
)
),
0,
],
# empty filter
[dict(flow_filter=filters.FlowFilter()), 10],
# multiple empty filters
Expand Down
15 changes: 0 additions & 15 deletions tests/server/schemas/test_filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,21 +51,6 @@ def test_applies_multiple_conditions(self, db):


class TestFlowRunFilters:
def test_applies_flow_run_end_time_filter_before(self, db):
flow_run_filter = FlowRunFilter(end_time={"before_": NOW})
sql_filter = flow_run_filter.as_sql_filter()
assert sql_filter.compare(sa.and_(db.FlowRun.end_time <= NOW))

def test_applies_flow_run_end_time_filter_after(self, db):
flow_run_filter = FlowRunFilter(end_time={"after_": NOW})
sql_filter = flow_run_filter.as_sql_filter()
assert sql_filter.compare(sa.and_(db.FlowRun.end_time >= NOW))

def test_applies_flow_run_end_time_filter_null(self, db):
flow_run_filter = FlowRunFilter(end_time={"is_null_": True})
sql_filter = flow_run_filter.as_sql_filter()
assert sql_filter.compare(sa.and_(db.FlowRun.end_time.is_(None)))

def test_coalesces_start_time_and_expected_start_time_after_(self, db):
flow_run_filter = FlowRunFilter(start_time={"after_": NOW})
sql_filter = flow_run_filter.as_sql_filter()
Expand Down
23 changes: 23 additions & 0 deletions ui-v2/src/api/prefect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8932,13 +8932,36 @@ export interface components {
state?: components["schemas"]["TaskRunFilterState"] | null;
/** @description Filter criteria for `TaskRun.start_time` */
start_time?: components["schemas"]["TaskRunFilterStartTime"] | null;
/** @description Filter criteria for `TaskRun.end_time` */
end_time?: components["schemas"]["TaskRunFilterEndTime"] | null;
/** @description Filter criteria for `TaskRun.expected_start_time` */
expected_start_time?: components["schemas"]["TaskRunFilterExpectedStartTime"] | null;
/** @description Filter criteria for `TaskRun.subflow_run` */
subflow_runs?: components["schemas"]["TaskRunFilterSubFlowRuns"] | null;
/** @description Filter criteria for `TaskRun.flow_run_id` */
flow_run_id?: components["schemas"]["TaskRunFilterFlowRunId"] | null;
};
/**
* TaskRunFilterEndTime
* @description Filter by `TaskRun.end_time`.
*/
TaskRunFilterEndTime: {
/**
* Before
* @description Only include task runs ending at or before this time
*/
before_?: string | null;
/**
* After
* @description Only include task runs ending at or after this time
*/
after_?: string | null;
/**
* Is Null
* @description If true, only return task runs without an end time
*/
is_null_?: boolean | null;
};
/**
* TaskRunFilterExpectedStartTime
* @description Filter by `TaskRun.expected_start_time`.
Expand Down
5 changes: 2 additions & 3 deletions ui-v2/src/api/zod/events/events.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
/**
* Generated by orval v7.13.0 🍺
* Generated by orval v7.13.1 🍺
* Do not edit manually.
* Prefect Prefect REST API
* OpenAPI spec version: 0.1.0
*/
import { z as zod } from "zod";
import * as zod from "zod";

/**
* Record a batch of Events.
Expand Down Expand Up @@ -68,7 +68,6 @@ export const readEventsEventsFilterPostBodyFilterResourceDistinctDefault = false
export const readEventsEventsFilterPostBodyFilterTextQueryMax = 200;
export const readEventsEventsFilterPostBodyLimitDefault = 50;
export const readEventsEventsFilterPostBodyLimitMin = 0;

export const readEventsEventsFilterPostBodyLimitMax = 50;

export const readEventsEventsFilterPostBody = zod.object({
Expand Down