Skip to content

Conversation

desertaxle
Copy link
Member

@desertaxle desertaxle commented Aug 28, 2025

This PR adds a dedicated @retry decorator providing fine-grained control over retry behavior for both sync and async functions outside of the existing @flow and @task decorators.

At a high level, this includes:

  • New prefect.retries module with @retry decorator
  • Stop conditions system with composable boolean logic (&, |, ~ operators)
  • Flexible wait strategies supporting fixed delays, callable wait providers, and custom timing logic
  • Hook system for observing retry attempts with before_attempt, on_success, on_failure, before_wait, and after_wait hooks
  • Full async support with separate async attempt generators and context managers

I'm planning on incorporating these changes with the flow and task engines, so I'll leave this in draft until I've made those changes in a stacked PR.

Examples

Basic retry with an attempt limit

from prefect.retries import retry
from prefect.retries.stop_conditions import AttemptsExhausted

@retry(until=AttemptsExhausted(3))
def fetch_data():
    # Will retry up to 3 times on any exception
    response = requests.get("https://api.example.com/data")
    response.raise_for_status()
    return response.json()

Retry with fixed wait time

from prefect.retries import retry
from prefect.retries.stop_conditions import AttemptsExhausted

@retry(until=AttemptsExhausted(5), wait=2.5)
def process_file(filename):
    # Wait 2.5 seconds between retry attempts
    with open(filename) as f:
        return expensive_processing(f.read())

Exponential backoff strategy

from prefect.retries import retry
from prefect.retries.stop_conditions import AttemptsExhausted

def exponential_backoff(last_attempt, current_attempt):
    return min(2 ** (current_attempt.attempt - 1), 60)  # Cap at 60 seconds

@retry(until=AttemptsExhausted(6), wait=exponential_backoff)
def api_call_with_backoff():
    # 1s, 2s, 4s, 8s, 16s, 32s, 60s wait times
    return make_api_request()

Stop on specific exception types

from prefect.retries import retry
from prefect.retries.stop_conditions import ExceptionMatches

@retry(until=~ExceptionMatches(ValueError))  # Stop if ValueError is raised
def parse_data(raw_data):
    # Will retry on any exception EXCEPT ValueError
    return json.loads(raw_data)

Composable stop conditions

from prefect.retries import retry
from prefect.retries.stop_conditions import AttemptsExhausted, ExceptionMatches

# Retry up to 5 times OR until we get a ValueError
@retry(until=AttemptsExhausted(5) | ExceptionMatches(ValueError))
def complex_operation():
    return perform_complex_task()

Using hooks for observability

from prefect.retries import retry
from prefect.retries.stop_conditions import AttemptsExhausted

@retry(until=AttemptsExhausted(3), wait=1)
def monitored_function():
    return unreliable_operation()

@monitored_function.before_attempt
def log_attempt(state):
    print(f"Starting attempt {state.attempt}")

@monitored_function.on_failure
def log_failure(state):
    print(f"Attempt {state.attempt} failed: {state.exception}")

@monitored_function.on_success
def log_success(state):
    print(f"Succeeded on attempt {state.attempt}")

Async function support

@retry(until=AttemptsExhausted(3), wait=1.0)
async def fetch_async_data():
    async with httpx.AsyncClient() as client:
        response = await client.get("https://api.example.com/data")
        response.raise_for_status()
        return response.json()

Dynamic wait times

from prefect.retries import retry
from prefect.retries.stop_conditions import AttemptsExhausted

def adaptive_wait(last_attempt, current_attempt):
    if current_attempt.attempt <= 2:
        return 1  # Short wait for first retries
    else:
        return 10  # Longer wait for later retries

@retry(until=AttemptsExhausted(5), wait=adaptive_wait)
def adaptive_retry_function():
    return flaky_operation()

Copy link

codspeed-hq bot commented Aug 28, 2025

CodSpeed Performance Report

Merging #18814 will not alter performance

Comparing add-retry-decorator (a70ff4d) with main (9a19b45)

Summary

✅ 2 untouched benchmarks

@desertaxle desertaxle added feature A new feature and removed docs labels Aug 28, 2025
@zzstoatzz
Copy link
Collaborator

can we consolidate / replace prefect._internal.retries with this?

@desertaxle
Copy link
Member Author

desertaxle commented Sep 5, 2025

can we consolidate / replace prefect._internal.retries with this?

Yeah, I'd like to replace all other retry implementations with this one once this is merged, which will include removing tenacity from some of the integration libraries too.

Copy link
Contributor

This pull request is stale because it has been open 14 days with no activity. To keep this pull request open remove stale label or comment.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feature A new feature
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants