Retries and timeouts

Flyte provides robust error handling through configurable retry strategies and timeout controls. These parameters help ensure task reliability and prevent resource waste from runaway processes.

Retries

The retries parameter controls how many times a failed task should be retried before giving up. A “retry” is any attempt after the initial attempt. In other words, retries=3 means the task may be attempted up to 4 times in total (1 initial + 3 retries).
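To make the counting concrete, here is a minimal plain-Python sketch of the same semantics. It is not Flyte's implementation. call_with_retries is a hypothetical helper that makes 1 initial attempt plus up to retries further attempts, assuming any exception counts as a failure that should be retried.

```python
from typing import Callable, Optional, Tuple, TypeVar

T = TypeVar("T")


def call_with_retries(fn: Callable[[], T], retries: int) -> Tuple[T, int]:
    """Hypothetical helper: call fn, retrying on any exception.

    Makes 1 initial attempt plus up to `retries` retries, so at most
    retries + 1 attempts. Returns (result, attempt_number) on success and
    re-raises the last exception if every attempt fails.
    """
    last_error: Optional[Exception] = None
    for attempt in range(1, retries + 2):  # attempts 1 .. retries + 1
        try:
            return fn(), attempt
        except Exception as e:  # any failure consumes an attempt
            last_error = e
    assert last_error is not None  # loop runs at least once when retries >= 0
    raise last_error


calls = []


def always_fails() -> str:
    calls.append(1)  # record each attempt
    raise RuntimeError("Task failed!")


try:
    call_with_retries(always_fails, retries=3)
except RuntimeError:
    pass
print(len(calls))  # 4: 1 initial attempt + 3 retries
```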

The retries parameter can be set either in the @env.task decorator or by calling override on the task when invoking it. It cannot be set in the TaskEnvironment definition.

The code for the examples below can be found on GitHub.

Retry example

First we import the required modules and set up a task environment:

import random
from datetime import timedelta

import flyte


env = flyte.TaskEnvironment(name="my-env")

Then we configure our task to retry up to 3 times if it fails (for a total of 4 attempts). We also define the driver task, main, which calls the retry task:

@env.task(retries=3)
async def retry() -> str:
    if random.random() < 0.7:  # 70% failure rate
        raise Exception("Task failed!")
    return "Success!"


@env.task
async def main() -> list[str]:
    results = []
    try:
        results.append(await retry())
    except Exception as e:
        results.append(f"Failed: {e}")
    try:
        results.append(await retry.override(retries=5)())
    except Exception as e:
        results.append(f"Failed: {e}")
    return results

Note that we call retry twice: first without any override, and then with an override to increase the retries to 5 (for a total of 6 attempts).
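Raising the retry count helps because, assuming each attempt fails independently (each attempt draws a fresh random.random() value), the chance that every attempt fails shrinks geometrically. The short calculation below uses the example's 70% failure rate; failure_probability is a hypothetical helper for illustration only.

```python
def failure_probability(p_fail: float, retries: int) -> float:
    """Probability that every attempt fails, assuming independent attempts."""
    attempts = 1 + retries  # 1 initial attempt + retries
    return p_fail ** attempts


for retries in (3, 5):
    p = failure_probability(0.7, retries)
    print(f"retries={retries}: attempts={1 + retries}, "
          f"P(all fail)={p:.4f}, P(success)={1 - p:.4f}")
# retries=3: attempts=4, P(all fail)=0.2401, P(success)=0.7599
# retries=5: attempts=6, P(all fail)=0.1176, P(success)=0.8824
```

So the overridden call, with 6 attempts, fails outright roughly half as often as the default call with 4 attempts.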

Finally, we configure Flyte and invoke the main task:

if __name__ == "__main__":
    flyte.init_from_config()
    run = flyte.run(main)
    print(run.name)
    print(run.url)
    run.wait()  # block until the run finishes

Timeouts

The timeout parameter sets limits on how long a task can run, preventing resource waste from stuck processes. It accepts an integer number of seconds, a datetime.timedelta, or a Timeout object that sets separate limits on execution time and queue time.

The timeout parameter can be set either in the @env.task decorator or by calling override on the task when invoking it. It cannot be set in the TaskEnvironment definition.

The code for the example below can be found on GitHub.

Timeout example

First, we import the required modules and set up a task environment:

import random
from datetime import timedelta
import asyncio

import flyte
from flyte import Timeout

env = flyte.TaskEnvironment(name="my-env")

Our first task sets a timeout using seconds as an integer:

@env.task(timeout=60)  # 60 seconds
async def timeout_seconds() -> str:
    await asyncio.sleep(random.randint(0, 120))  # Random wait between 0 and 120 seconds
    return "timeout_seconds completed"

We can also set a timeout using a timedelta object for more readable durations:

@env.task(timeout=timedelta(minutes=1))
async def timeout_timedelta() -> str:
    await asyncio.sleep(random.randint(0, 120))  # Random wait between 0 and 120 seconds
    return "timeout_timedelta completed"
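The integer and timedelta forms above express the same 60-second limit. The sketch below shows that equivalence by normalizing both forms to a timedelta; to_timedelta is a hypothetical helper, not part of the Flyte API.

```python
from datetime import timedelta
from typing import Union


def to_timedelta(timeout: Union[int, timedelta]) -> timedelta:
    """Hypothetical helper: normalize whole seconds or a timedelta to a timedelta."""
    if isinstance(timeout, timedelta):
        return timeout
    if isinstance(timeout, int) and not isinstance(timeout, bool):
        return timedelta(seconds=timeout)
    raise TypeError(f"unsupported timeout type: {type(timeout).__name__}")


# timeout=60 and timeout=timedelta(minutes=1) describe the same limit.
assert to_timedelta(60) == to_timedelta(timedelta(minutes=1)) == timedelta(seconds=60)
```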

You can also set separate timeouts for maximum execution time and maximum queue time using the Timeout class:

@env.task(timeout=Timeout(
    max_runtime=timedelta(minutes=1),      # Max execution time per attempt
    max_queued_time=timedelta(minutes=1)   # Max time in queue before starting
))
async def timeout_advanced() -> str:
    await asyncio.sleep(random.randint(0, 120))  # Random wait between 0 and 120 seconds
    return "timeout_advanced completed"
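The two limits in Timeout are separate budgets: one for time spent waiting to start, one for time spent executing. The plain-Python model below illustrates that split. TimeoutSpec and exceeded_limit are hypothetical stand-ins for illustration, not Flyte's own classes or logic.

```python
from dataclasses import dataclass
from datetime import timedelta
from typing import Optional


@dataclass(frozen=True)
class TimeoutSpec:
    """Hypothetical stand-in for flyte.Timeout, holding the two limits."""
    max_runtime: timedelta      # budget for execution time
    max_queued_time: timedelta  # budget for waiting before execution starts


def exceeded_limit(spec: TimeoutSpec, queued: timedelta, running: timedelta) -> Optional[str]:
    """Return the name of the limit an attempt has exceeded, or None."""
    if queued > spec.max_queued_time:
        return "max_queued_time"
    if running > spec.max_runtime:
        return "max_runtime"
    return None


spec = TimeoutSpec(max_runtime=timedelta(minutes=1), max_queued_time=timedelta(minutes=1))
print(exceeded_limit(spec, queued=timedelta(seconds=10), running=timedelta(seconds=90)))  # max_runtime
```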

You can also combine retries and timeouts for resilience and resource control:

@env.task(
    retries=3,
    timeout=Timeout(
        max_runtime=timedelta(minutes=1),
        max_queued_time=timedelta(minutes=1)
    )
)
async def timeout_with_retry() -> str:
    await asyncio.sleep(random.randint(0, 120))  # Random wait between 0 and 120 seconds
    return "timeout_with_retry completed"

Here we specify:

  • Up to 3 retries (4 attempts in total: 1 initial + 3 retries).
  • Each attempt times out after 1 minute of execution.
  • The task fails if it is queued for more than 1 minute.
  • Total possible runtime: 1 minute in the queue + (1 minute × 4 attempts).
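The interaction between retries and a per-attempt runtime limit can be imitated locally with asyncio.wait_for. This is a sketch under stated assumptions, not how Flyte enforces timeouts: run_with_retries_and_timeout and worst_case_runtime_s are hypothetical helpers, and the sketch assumes each attempt gets a fresh runtime budget and that a timed-out attempt consumes a retry like any other failure.

```python
import asyncio
from typing import Awaitable, Callable


async def run_with_retries_and_timeout(
    make_attempt: Callable[[], Awaitable[str]],
    retries: int,
    max_runtime_s: float,
) -> str:
    """Hypothetical helper: up to 1 + retries attempts, each capped at max_runtime_s."""
    last_error: BaseException = RuntimeError("no attempts were made")
    for _ in range(retries + 1):
        try:
            # Each attempt is a fresh coroutine with its own timeout budget.
            return await asyncio.wait_for(make_attempt(), timeout=max_runtime_s)
        except Exception as e:  # a timeout or an error both consume an attempt
            last_error = e
    raise last_error


def worst_case_runtime_s(retries: int, max_runtime_s: float) -> float:
    """Upper bound on execution time across all attempts (queue time excluded)."""
    return (retries + 1) * max_runtime_s


print(worst_case_runtime_s(retries=3, max_runtime_s=60.0))  # 240.0
```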

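We define the main driver task, which calls all the timeout tasks concurrently and returns their outputs as a list. For any task that fails (for example, by exceeding its timeout), the corresponding entry is a string starting with “Failed:” instead of the task's return value: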

@env.task
async def main() -> list[str]:
    tasks = [
        timeout_seconds(),
        timeout_seconds.override(timeout=120)(),  # Override to 120 seconds
        timeout_timedelta(),
        timeout_advanced(),
        timeout_with_retry(),
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    output = []
    for r in results:
        if isinstance(r, Exception):
            output.append(f"Failed: {r}")
        else:
            output.append(r)
    return output

Note that we also demonstrate overriding the timeout for timeout_seconds to 120 seconds when calling it.
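The return_exceptions=True argument to asyncio.gather is what lets main report every outcome. With it, exceptions are returned in the results list in place of values instead of the first one being raised to the caller. The standalone asyncio sketch below shows the same pattern without Flyte; succeeds, fails, and demo are illustrative names.

```python
import asyncio


async def succeeds() -> str:
    return "ok"


async def fails() -> str:
    raise RuntimeError("boom")


async def demo() -> list:
    # Exceptions come back as list entries, in the same order as the inputs.
    results = await asyncio.gather(succeeds(), fails(), return_exceptions=True)
    return [f"Failed: {r}" if isinstance(r, Exception) else r for r in results]


print(asyncio.run(demo()))  # ['ok', 'Failed: boom']
```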

Finally, we configure Flyte and invoke the main task:

if __name__ == "__main__":
    flyte.init_from_config()
    run = flyte.run(main)
    print(run.name)
    print(run.url)
    run.wait()  # block until the run finishes

Proper retry and timeout configuration ensures your Flyte workflows are both reliable and efficient, handling transient failures gracefully while preventing resource waste.