Retries and timeouts
Flyte provides robust error handling through configurable retry strategies and timeout controls. These parameters help ensure task reliability and prevent resource waste from runaway processes.
Retries
The `retries` parameter controls how many times a failed task should be retried before giving up.
A “retry” is any attempt after the initial attempt.
In other words, `retries=3` means the task may be attempted up to 4 times in total (1 initial + 3 retries).
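To make the counting concrete, here is a minimal local sketch of the same semantics in plain Python. It is not how Flyte implements retries. `run_with_retries` and `make_flaky` are hypothetical helpers used only to show that `retries=3` allows at most four calls.

```python
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(fn: Callable[[], T], retries: int) -> tuple[T, int]:
    """Call fn until it succeeds, allowing `retries` extra attempts.

    Hypothetical helper (local illustration, not Flyte's implementation).
    Returns the result and the number of attempts used.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            return fn(), attempts
        except Exception:
            # Give up once the initial attempt plus `retries` retries have all failed.
            if attempts > retries:
                raise


def make_flaky(failures: int) -> Callable[[], str]:
    """Return a function that fails `failures` times, then succeeds."""
    state = {"calls": 0}

    def flaky() -> str:
        state["calls"] += 1
        if state["calls"] <= failures:
            raise RuntimeError("Task failed!")
        return "Success!"

    return flaky


# Fails 3 times, succeeds on the 4th call: fits within retries=3.
print(run_with_retries(make_flaky(3), retries=3))  # ('Success!', 4)

# Fails 4 times: all 4 allowed attempts are used up and the error propagates.
try:
    run_with_retries(make_flaky(4), retries=3)
except RuntimeError as e:
    print(f"Failed: {e}")  # Failed: Task failed!
```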
The `retries` parameter can be configured either in the `@env.task` decorator or with `override` when invoking the task.
It cannot be configured in the `TaskEnvironment` definition.
The code for the examples below can be found on GitHub.
Retry example
First we import the required modules and set up a task environment:
```python
import random
from datetime import timedelta

import flyte

env = flyte.TaskEnvironment(name="my-env")
```
Then we configure our task to retry up to 3 times if it fails (for a total of 4 attempts). We also define the driver task `main` that calls the `retry` task:
```python
@env.task(retries=3)
async def retry() -> str:
    if random.random() < 0.7:  # 70% failure rate
        raise Exception("Task failed!")
    return "Success!"


@env.task
async def main() -> list[str]:
    results = []
    # First call: uses the decorator's retries=3 (up to 4 attempts).
    try:
        results.append(await retry())
    except Exception as e:
        results.append(f"Failed: {e}")
    # Second call: the override raises the limit to retries=5 (up to 6 attempts).
    try:
        results.append(await retry.override(retries=5)())
    except Exception as e:
        results.append(f"Failed: {e}")
    return results
```
Note that we call `retry` twice: first without any override, and then with an override that increases the retries to 5 (for a total of 6 attempts).
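The `retry` task fails on each attempt with probability 0.7 (`random.random() < 0.7`). If the attempts are independent, the chance that at least one of n attempts succeeds is 1 - 0.7^n. A quick calculation with a hypothetical `success_probability` helper shows how much the override improves the odds:

```python
def success_probability(failure_rate: float, retries: int) -> float:
    """Probability that at least one of (retries + 1) independent attempts succeeds."""
    attempts = retries + 1  # initial attempt plus retries
    return 1 - failure_rate ** attempts


for retries in (3, 5):
    p = success_probability(0.7, retries)
    print(f"retries={retries}: {retries + 1} attempts, P(success) = {p:.4f}")
# retries=3: 4 attempts, P(success) = 0.7599
# retries=5: 6 attempts, P(success) = 0.8824
```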
Finally, we configure Flyte and invoke the `main` task:
```python
if __name__ == "__main__":
    flyte.init_from_config()
    run = flyte.run(main)
    print(run.name)
    print(run.url)
    run.wait()  # wait for the run to complete
```
Timeouts
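The `timeout` parameter sets limits on how long a task can run, preventing resource waste from stuck processes.
It accepts several formats: an integer number of seconds, a `timedelta`, or a `Timeout` object that sets separate limits for execution time and queue time.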
The `timeout` parameter can be configured either in the `@env.task` decorator or with `override` when invoking the task.
It cannot be configured in the `TaskEnvironment` definition.
The code for the example below can be found on GitHub.
Timeout example
First, we import the required modules and set up a task environment:
```python
import asyncio
import random
from datetime import timedelta

import flyte
from flyte import Timeout

env = flyte.TaskEnvironment(name="my-env")
```
Our first task sets a timeout using seconds as an integer:
```python
@env.task(timeout=60)  # 60 seconds
async def timeout_seconds() -> str:
    await asyncio.sleep(random.randint(0, 120))  # Random wait between 0 and 120 seconds
    return "timeout_seconds completed"
```
We can also set a timeout using a `timedelta` object for more readable durations:
```python
@env.task(timeout=timedelta(minutes=1))
async def timeout_timedelta() -> str:
    await asyncio.sleep(random.randint(0, 120))  # Random wait between 0 and 120 seconds
    return "timeout_timedelta completed"
```
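The integer and `timedelta` forms express the same 60-second limit in these two tasks. Here is a quick plain-Python sanity check. `to_seconds` is a hypothetical helper, not part of Flyte.

```python
from datetime import timedelta
from typing import Union


def to_seconds(timeout: Union[int, timedelta]) -> float:
    """Normalize an int (seconds) or a timedelta to a number of seconds."""
    if isinstance(timeout, timedelta):
        return timeout.total_seconds()
    return float(timeout)


# timeout=60 and timeout=timedelta(minutes=1) describe the same limit.
print(to_seconds(60) == to_seconds(timedelta(minutes=1)))  # True
# timedelta is handy for longer limits that would be awkward as raw seconds.
print(to_seconds(timedelta(hours=1, minutes=30)))  # 5400.0
```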
You can also set separate timeouts for maximum execution time and maximum queue time using the `Timeout` class:
```python
@env.task(timeout=Timeout(
    max_runtime=timedelta(minutes=1),      # Max execution time per attempt
    max_queued_time=timedelta(minutes=1),  # Max time in queue before starting
))
async def timeout_advanced() -> str:
    await asyncio.sleep(random.randint(0, 120))  # Random wait between 0 and 120 seconds
    return "timeout_advanced completed"
```
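`max_runtime` bounds how long a single attempt may execute once it has started. The sketch below does not reproduce how Flyte enforces this limit. It only mimics the runtime half locally with `asyncio.wait_for`, using scaled-down durations so it runs quickly. Queue time is not simulated. `attempt` and `run_attempt` are illustrative names, not Flyte APIs.

```python
import asyncio


async def attempt(duration: float) -> str:
    """Stand-in for a task body that works for `duration` seconds."""
    await asyncio.sleep(duration)
    return "completed"


async def run_attempt(duration: float, max_runtime: float) -> str:
    """Run one attempt, cancelling it if it exceeds max_runtime seconds."""
    try:
        return await asyncio.wait_for(attempt(duration), timeout=max_runtime)
    except asyncio.TimeoutError:
        return f"Failed: exceeded max_runtime of {max_runtime}s"


async def demo() -> list[str]:
    # Scaled down: 0.2 s stands in for the 1-minute max_runtime.
    return [
        await run_attempt(duration=0.01, max_runtime=0.2),  # finishes in time
        await run_attempt(duration=0.5, max_runtime=0.2),   # cut off
    ]


print(asyncio.run(demo()))
# ['completed', 'Failed: exceeded max_runtime of 0.2s']
```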
You can also combine retries and timeouts for resilience and resource control:
```python
@env.task(
    retries=3,
    timeout=Timeout(
        max_runtime=timedelta(minutes=1),
        max_queued_time=timedelta(minutes=1),
    ),
)
async def timeout_with_retry() -> str:
    await asyncio.sleep(random.randint(0, 120))  # Random wait between 0 and 120 seconds
    return "timeout_with_retry completed"
```
Here we specify:
- Up to 3 retries (4 attempts in total).
- Each attempt times out after 1 minute of execution.
- The task fails if it waits in the queue for more than 1 minute before starting.
- Worst-case total duration: 1 minute in the queue + (1 minute × 4 attempts) = 5 minutes.
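Because `retries=3` allows four attempts, the worst case is one queue wait plus four attempts that each run until the runtime limit. Here is the arithmetic as a small helper. The helper is hypothetical, and it assumes the queue limit is counted only once, before the first attempt.

```python
from datetime import timedelta


def worst_case_duration(retries: int, max_runtime: timedelta,
                        max_queued_time: timedelta) -> timedelta:
    """Upper bound: one full queue wait plus every attempt hitting max_runtime."""
    attempts = retries + 1  # the initial attempt plus `retries` retries
    return max_queued_time + max_runtime * attempts


budget = worst_case_duration(
    retries=3,
    max_runtime=timedelta(minutes=1),
    max_queued_time=timedelta(minutes=1),
)
print(budget)  # 0:05:00
```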
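We define the `main` driver task, which calls all the timeout tasks concurrently and returns their outputs as a list. Because `asyncio.gather` is called with `return_exceptions=True`, a task that fails or times out does not abort the others. Its exception is returned in place of a result and recorded as a `Failed: ...` string: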
```python
@env.task
async def main() -> list[str]:
    tasks = [
        timeout_seconds(),
        timeout_seconds.override(timeout=120)(),  # Override to 120 seconds
        timeout_timedelta(),
        timeout_advanced(),
        timeout_with_retry(),
    ]
    # return_exceptions=True: failures come back as exception objects instead of being raised
    results = await asyncio.gather(*tasks, return_exceptions=True)
    output = []
    for r in results:
        if isinstance(r, Exception):
            output.append(f"Failed: {r}")
        else:
            output.append(r)
    return output
```
Note that we also demonstrate overriding the timeout for `timeout_seconds` to 120 seconds when calling it.
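The same `asyncio.gather` behavior can be seen outside Flyte with plain coroutines. In this stdlib-only sketch, `ok`, `boom`, and `collect` are hypothetical. The exception from `boom` is returned in its slot, in input order, rather than raised.

```python
import asyncio


async def ok(name: str) -> str:
    await asyncio.sleep(0)
    return f"{name} completed"


async def boom() -> str:
    await asyncio.sleep(0)
    raise TimeoutError("took too long")


async def collect() -> list[str]:
    results = await asyncio.gather(ok("a"), boom(), ok("b"), return_exceptions=True)
    # Exceptions arrive as values in input order; turn them into "Failed: ..." strings.
    return [f"Failed: {r}" if isinstance(r, Exception) else r for r in results]


print(asyncio.run(collect()))
# ['a completed', 'Failed: took too long', 'b completed']
```

Without `return_exceptions=True`, `gather` would propagate the first exception to the caller, and the driver would never get to build its list of results.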
Finally, we configure Flyte and invoke the `main` task:
```python
if __name__ == "__main__":
    flyte.init_from_config()
    run = flyte.run(main)
    print(run.name)
    print(run.url)
    run.wait()  # wait for the run to complete
```
Proper retry and timeout configuration ensures your Flyte workflows are both reliable and efficient, handling transient failures gracefully while preventing resource waste.