baldur.interfaces — Task Queue
The task-queue contract (sync and async) with its status/priority enums, result and option DTOs, and the queue exception hierarchy. Adapter authors implement these to back Baldur on Celery, RQ, arq, and others.
Enums
TaskStatus
Bases: str, Enum
Task execution status
TaskPriority
Bases: IntEnum
Task priority levels (higher = processed sooner)
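The base classes have practical consequences: a `str`-based enum compares equal to its raw string value and serializes as a plain string, while an `IntEnum` orders numerically. The sketch below uses stand-in enums to show both. `NORMAL` and `HIGH` appear elsewhere on this page; every other member name and all values are assumptions made for illustration.

```python
import json
from enum import Enum, IntEnum


class TaskStatus(str, Enum):
    # Hypothetical members; the real enum's names and values may differ.
    PENDING = "pending"
    SUCCESS = "success"


class TaskPriority(IntEnum):
    # NORMAL and HIGH are referenced on this page; LOW and the numbers are assumed.
    LOW = 1
    NORMAL = 5
    HIGH = 9


# A str-based member compares equal to its raw value and serializes as a string.
status_matches = TaskStatus.SUCCESS == "success"
payload = json.dumps({"status": TaskStatus.SUCCESS})

# IntEnum members order numerically, so "higher = processed sooner" is a plain sort.
by_urgency = sorted(TaskPriority, reverse=True)
```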
DTOs
TaskResult
dataclass
TaskResult(
task_id: str,
status: TaskStatus,
result: Any | None = None,
error: str | None = None,
traceback: str | None = None,
retries: int = 0,
started_at: datetime | None = None,
completed_at: datetime | None = None,
)
Result of task execution or status check.
Immutable dataclass representing the outcome or current state of a queued task.
Attributes:
| Name | Type | Description |
|---|---|---|
| `task_id` | `str` | Unique task identifier |
| `status` | `TaskStatus` | Current task status |
| `result` | `Any \| None` | Return value if task succeeded |
| `error` | `str \| None` | Error message if task failed |
| `traceback` | `str \| None` | Full traceback string if task failed |
| `retries` | `int` | Number of retry attempts made |
| `started_at` | `datetime \| None` | When task execution began |
| `completed_at` | `datetime \| None` | When task execution completed |
is_finished
property
is_finished: bool
Check if task has completed (success or failure).
is_successful
property
is_successful: bool
Check if task completed successfully.
duration
property
duration: timedelta | None
Calculate task execution duration.
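One plausible way the three properties could be derived from the stored fields is sketched below. This is a stand-in dataclass, not the library's implementation: the `TaskStatus` member names are assumed, only some fields are reproduced, and the rule that `duration` is `None` when either timestamp is missing is a guess consistent with the `timedelta | None` return type.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Any


class TaskStatus(str, Enum):
    # Hypothetical members standing in for the real enum.
    PENDING = "pending"
    SUCCESS = "success"
    FAILURE = "failure"


@dataclass(frozen=True)
class TaskResult:
    task_id: str
    status: TaskStatus
    result: Any | None = None
    started_at: datetime | None = None
    completed_at: datetime | None = None

    @property
    def is_finished(self) -> bool:
        # Finished means a terminal state, whether success or failure.
        return self.status in (TaskStatus.SUCCESS, TaskStatus.FAILURE)

    @property
    def is_successful(self) -> bool:
        return self.status is TaskStatus.SUCCESS

    @property
    def duration(self) -> timedelta | None:
        # Without both timestamps there is nothing to measure.
        if self.started_at is None or self.completed_at is None:
            return None
        return self.completed_at - self.started_at


start = datetime(2024, 1, 1, 12, 0, 0)
done = TaskResult("t-1", TaskStatus.SUCCESS, result=42,
                  started_at=start, completed_at=start + timedelta(seconds=3))
pending = TaskResult("t-2", TaskStatus.PENDING)
```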
TaskOptions
dataclass
TaskOptions(
countdown: int | None = None,
eta: datetime | None = None,
expires: datetime | None = None,
retry: bool = True,
max_retries: int = 3,
retry_backoff: bool = True,
retry_backoff_max: int = 600,
retry_jitter: bool = True,
queue: str | None = None,
priority: TaskPriority = TaskPriority.NORMAL,
timeout: int | None = None,
soft_timeout: int | None = None,
)
Options for task enqueueing.
Configures how a task should be executed, including scheduling, retries, and queue selection.
Attributes:
| Name | Type | Description |
|---|---|---|
| `countdown` | `int \| None` | Delay in seconds before execution |
| `eta` | `datetime \| None` | Exact time to execute task |
| `expires` | `datetime \| None` | Task expiration time (won't run after this) |
| `retry` | `bool` | Enable automatic retries on failure |
| `max_retries` | `int` | Maximum retry attempts |
| `retry_backoff` | `bool` | Use exponential backoff between retries |
| `retry_backoff_max` | `int` | Maximum backoff delay in seconds |
| `retry_jitter` | `bool` | Add randomness to backoff delays |
| `queue` | `str \| None` | Target queue name (default uses 'default') |
| `priority` | `TaskPriority` | Task priority (higher = processed sooner) |
| `timeout` | `int \| None` | Task execution timeout in seconds |
| `soft_timeout` | `int \| None` | Soft timeout (raises SoftTimeLimitExceeded) |
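The page does not specify how `retry_backoff`, `retry_backoff_max`, and `retry_jitter` combine, and the exact formula is up to each adapter. A common scheme, shown here purely as an assumption, doubles the delay on each attempt, caps it at the maximum, and then draws a random delay from zero up to that value. `backoff_delay` is a hypothetical helper, not part of the interface.

```python
from __future__ import annotations

import random


def backoff_delay(attempt: int, backoff_max: int = 600, jitter: bool = True,
                  rng: random.Random | None = None) -> float:
    """Delay in seconds before retry number `attempt` (0-based)."""
    # Double the delay on every attempt, capped at backoff_max.
    delay = min(2 ** attempt, backoff_max)
    if jitter:
        # "Full jitter": pick a random point in [0, delay] so that many
        # failing tasks do not all retry at the same instant.
        rng = rng or random.Random()
        return rng.uniform(0, delay)
    return float(delay)


fixed = [backoff_delay(n, jitter=False) for n in range(12)]
jittered = backoff_delay(5, rng=random.Random(0))
```

With jitter disabled the delays grow 1, 2, 4, 8 seconds and so on until they reach the 600-second cap.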
with_countdown
with_countdown(seconds: int) -> TaskOptions
Create new options with countdown.
with_priority
with_priority(priority: TaskPriority) -> TaskOptions
Create new options with priority.
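Both helpers are described as creating new options, which suggests they return a modified copy and leave the original untouched. A minimal sketch of that pattern with `dataclasses.replace` follows. The stand-in class is frozen and carries only a subset of the documented fields; both choices, and the numeric priority values, are assumptions.

```python
from __future__ import annotations

from dataclasses import dataclass, replace
from enum import IntEnum


class TaskPriority(IntEnum):
    # Numeric values are assumed for illustration.
    NORMAL = 5
    HIGH = 9


@dataclass(frozen=True)
class TaskOptions:
    # Only a subset of the documented fields, to keep the sketch short.
    countdown: int | None = None
    max_retries: int = 3
    priority: TaskPriority = TaskPriority.NORMAL

    def with_countdown(self, seconds: int) -> TaskOptions:
        # replace() copies every field and overrides only the named one.
        return replace(self, countdown=seconds)

    def with_priority(self, priority: TaskPriority) -> TaskOptions:
        return replace(self, priority=priority)


base = TaskOptions(max_retries=5)
urgent = base.with_priority(TaskPriority.HIGH).with_countdown(30)
```

Because each call returns a fresh object, a shared `base` configuration can be specialized per call site without side effects.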
ScheduleInfo
dataclass
ScheduleInfo(
schedule_id: str,
task_name: str,
interval: timedelta,
args: tuple = tuple(),
kwargs: dict = dict(),
last_run: datetime | None = None,
next_run: datetime | None = None,
enabled: bool = True,
)
Information about a periodic schedule.
Attributes:
| Name | Type | Description |
|---|---|---|
| `schedule_id` | `str` | Unique schedule identifier |
| `task_name` | `str` | Name of the scheduled task |
| `interval` | `timedelta` | Execution interval |
| `args` | `tuple` | Positional arguments for task |
| `kwargs` | `dict` | Keyword arguments for task |
| `last_run` | `datetime \| None` | When task last executed |
| `next_run` | `datetime \| None` | When task will next execute |
| `enabled` | `bool` | Whether schedule is active |
Exceptions
TaskQueueError
TaskQueueError(message: str = '', *, code: str = '')
TaskNotFoundError
TaskNotFoundError(message: str = '', *, code: str = '')
TaskTimeoutError
TaskTimeoutError(message: str = '', *, code: str = '')
TaskRevokedError
TaskRevokedError(message: str = '', *, code: str = '')
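All four exceptions share one constructor signature. The sketch below assumes the three specific errors derive from `TaskQueueError` (the page calls them a hierarchy but does not list bases) and that `message` and `code` are stored as attributes. Under those assumptions, callers can catch a specific error first and fall back to the base class.

```python
class TaskQueueError(Exception):
    """Stand-in base class; storing message/code as attributes is assumed."""

    def __init__(self, message: str = "", *, code: str = "") -> None:
        super().__init__(message)
        self.message = message
        self.code = code


class TaskNotFoundError(TaskQueueError):
    """Assumed to derive from TaskQueueError."""


class TaskTimeoutError(TaskQueueError):
    """Assumed to derive from TaskQueueError."""


def classify(exc: Exception) -> str:
    # Most specific handler first, base class as the catch-all.
    try:
        raise exc
    except TaskNotFoundError:
        return "not-found"
    except TaskQueueError as err:
        return f"queue-error:{err.code}"


missing = classify(TaskNotFoundError("no such task"))
slow = classify(TaskTimeoutError("too slow", code="E_TIMEOUT"))
```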
Interfaces
TaskQueueInterface
Bases: ABC
Abstract interface for background task queues, exposed through a synchronous (blocking) API. `AsyncTaskQueueInterface` is the coroutine-based counterpart.
This interface defines the contract for background task execution systems. It lets Baldur work with different task queue backends interchangeably.
Implementations
- CeleryTaskAdapter (current - Celery)
- RQTaskAdapter (planned - Redis Queue)
- DramatiqTaskAdapter (planned)
- SyncTaskAdapter (for testing - synchronous execution)
Example
    queue = ProviderRegistry.get_queue()

    # Enqueue a task
    task_id = queue.enqueue(
        "process_payment",
        args=(order_id,),
        options=TaskOptions(priority=TaskPriority.HIGH),
    )

    # Check result later
    result = queue.get_result(task_id)
    if result.is_successful:
        print(f"Payment processed: {result.result}")
provider_name
abstractmethod
property
provider_name: str
Return the provider name.
Returns:
| Type | Description |
|---|---|
| `str` | Provider identifier (e.g., 'celery', 'rq', 'dramatiq') |
task
abstractmethod
task(
name: str | None = None,
bind: bool = False,
max_retries: int = 3,
autoretry_for: tuple[type[Exception], ...] = (),
retry_backoff: bool = True,
retry_backoff_max: int = 600,
retry_jitter: bool = True,
rate_limit: str | None = None,
time_limit: int | None = None,
soft_time_limit: int | None = None,
) -> Callable[[F], F]
Decorator to register a function as a task.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str \| None` | Task name (default: function qualified name) | `None` |
| `bind` | `bool` | If True, pass task instance as first argument | `False` |
| `max_retries` | `int` | Maximum retry attempts on failure | `3` |
| `autoretry_for` | `tuple[type[Exception], ...]` | Exception types to automatically retry | `()` |
| `retry_backoff` | `bool` | Use exponential backoff between retries | `True` |
| `retry_backoff_max` | `int` | Maximum backoff delay in seconds | `600` |
| `retry_jitter` | `bool` | Add randomness to prevent thundering herd | `True` |
| `rate_limit` | `str \| None` | Rate limit (e.g., "10/m" for 10 per minute) | `None` |
| `time_limit` | `int \| None` | Hard time limit in seconds | `None` |
| `soft_time_limit` | `int \| None` | Soft time limit (raises exception) | `None` |

Returns:
| Type | Description |
|---|---|
| `Callable[[F], F]` | Decorator function |
Example
    @queue.task(max_retries=5, autoretry_for=(ConnectionError,))
    def process_payment(payment_id: int):
        # Process the payment
        pass
register_task
register_task(
func: Callable, name: str | None = None, **options: Any
) -> str
Register a function as a task programmatically.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `func` | `Callable` | Function to register | required |
| `name` | `str \| None` | Task name (default: function qualified name) | `None` |
| `**options` | `Any` | Additional task options | `{}` |

Returns:
| Type | Description |
|---|---|
| `str` | Registered task name |
enqueue
abstractmethod
enqueue(
task_name: str,
args: tuple = (),
kwargs: dict | None = None,
options: TaskOptions | None = None,
) -> str
Enqueue a task for async execution.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `task_name` | `str` | Registered task name | required |
| `args` | `tuple` | Positional arguments for task | `()` |
| `kwargs` | `dict \| None` | Keyword arguments for task | `None` |
| `options` | `TaskOptions \| None` | Execution options | `None` |

Returns:
| Type | Description |
|---|---|
| `str` | Task ID for tracking |

Raises:
| Type | Description |
|---|---|
| `TaskNotFoundError` | If task_name is not registered |
Example
    task_id = queue.enqueue(
        "send_notification",
        args=(user_id, "Welcome!"),
        options=TaskOptions(countdown=60),
    )
enqueue_many
abstractmethod
enqueue_many(
tasks: list[tuple[str, tuple, dict]],
options: TaskOptions | None = None,
) -> list[str]
Enqueue multiple tasks atomically.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `tasks` | `list[tuple[str, tuple, dict]]` | List of (task_name, args, kwargs) tuples | required |
| `options` | `TaskOptions \| None` | Shared execution options for all tasks | `None` |

Returns:
| Type | Description |
|---|---|
| `list[str]` | List of task IDs in same order as input |
Note
Implementations should ensure either all tasks are enqueued or none are (atomic operation).
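The all-or-nothing requirement can be approximated by validating the whole batch before publishing any of it. The sketch below does this against a hypothetical in-memory queue; `InMemoryQueue` and its attributes are invented for illustration, and the `options` parameter is omitted for brevity. A real broker would also need a transaction or pipeline so that a failure partway through publishing cannot leave a partial batch.

```python
from __future__ import annotations

import uuid


class TaskNotFoundError(Exception):
    """Stand-in for the documented exception."""


class InMemoryQueue:
    # Hypothetical adapter used only to illustrate the contract.
    def __init__(self, registered: set[str]) -> None:
        self.registered = registered
        self.pending: list[tuple[str, str, tuple, dict]] = []

    def enqueue_many(self, tasks: list[tuple[str, tuple, dict]]) -> list[str]:
        # Phase 1: validate everything before touching the queue.
        for name, _args, _kwargs in tasks:
            if name not in self.registered:
                raise TaskNotFoundError(name)
        # Phase 2: build the full batch, then publish it in one step.
        batch = [(uuid.uuid4().hex, name, args, kwargs)
                 for name, args, kwargs in tasks]
        self.pending.extend(batch)
        return [task_id for task_id, *_ in batch]


queue = InMemoryQueue({"send_email", "resize_image"})
ids = queue.enqueue_many([
    ("send_email", ("a@example.com",), {}),
    ("resize_image", (7,), {}),
])

try:
    queue.enqueue_many([("send_email", (), {}), ("unknown_task", (), {})])
    rejected = False
except TaskNotFoundError:
    rejected = True
```

The second call fails on its unknown task name, and the valid task that preceded it in the same batch is not enqueued either.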
delay
delay(task_name: str, *args: Any, **kwargs: Any) -> str
Convenience method to enqueue a task immediately.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `task_name` | `str` | Registered task name | required |
| `*args` | `Any` | Positional arguments for task | `()` |
| `**kwargs` | `Any` | Keyword arguments for task | `{}` |

Returns:
| Type | Description |
|---|---|
| `str` | Task ID for tracking |
apply_async
apply_async(
task_name: str,
args: tuple = (),
kwargs: dict | None = None,
countdown: int | None = None,
eta: datetime | None = None,
**extra_options: Any
) -> str
Enqueue a task with common options as keyword arguments.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `task_name` | `str` | Registered task name | required |
| `args` | `tuple` | Positional arguments | `()` |
| `kwargs` | `dict \| None` | Keyword arguments | `None` |
| `countdown` | `int \| None` | Delay in seconds | `None` |
| `eta` | `datetime \| None` | Exact execution time | `None` |
| `**extra_options` | `Any` | Additional TaskOptions fields | `{}` |

Returns:
| Type | Description |
|---|---|
| `str` | Task ID for tracking |
get_result
abstractmethod
get_result(
task_id: str, timeout: float | None = None
) -> TaskResult
Get task result (may block if timeout provided).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `task_id` | `str` | Task ID from enqueue | required |
| `timeout` | `float \| None` | Max seconds to wait for completion | `None` |

Returns:
| Type | Description |
|---|---|
| `TaskResult` | TaskResult with status and result/error |
Note
If timeout is None, returns immediately with current status. If timeout is provided, blocks until task completes or timeout expires.
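An adapter whose backend has no native blocking wait could implement these semantics with a poll loop. The helper below is a hypothetical sketch: `wait_for`, its `poll` callback, and the polling interval are not part of the interface. It returns at once when `timeout` is `None` and otherwise polls until completion or the deadline.

```python
from __future__ import annotations

import time
from typing import Callable


def wait_for(poll: Callable[[], bool], timeout: float | None,
             interval: float = 0.01) -> bool:
    """Return True once poll() reports completion, False if the deadline passes."""
    if timeout is None:
        # Non-blocking: report the current state and return immediately.
        return poll()
    deadline = time.monotonic() + timeout
    while True:
        if poll():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


calls = {"n": 0}


def finishes_on_third_poll() -> bool:
    # Simulates a task that completes after being checked three times.
    calls["n"] += 1
    return calls["n"] >= 3


immediate = wait_for(lambda: False, timeout=None)
eventually = wait_for(finishes_on_third_poll, timeout=1.0)
timed_out = wait_for(lambda: False, timeout=0.05)
```

`time.monotonic()` is used for the deadline so that wall-clock adjustments cannot shorten or extend the wait.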
revoke
abstractmethod
revoke(
task_id: str,
terminate: bool = False,
signal: str = "SIGTERM",
) -> bool
Cancel a pending or running task.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `task_id` | `str` | Task ID to cancel | required |
| `terminate` | `bool` | If True, terminate running task | `False` |
| `signal` | `str` | Signal to send if terminating | `'SIGTERM'` |

Returns:
| Type | Description |
|---|---|
| `bool` | True if task was revoked |
Note
Revoking a pending task prevents execution. Terminating a running task sends the specified signal.
retry
abstractmethod
retry(
task_id: str,
countdown: int | None = None,
max_retries: int | None = None,
) -> str
Retry a failed task.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `task_id` | `str` | Original task ID | required |
| `countdown` | `int \| None` | Delay before retry | `None` |
| `max_retries` | `int \| None` | Override maximum retries | `None` |

Returns:
| Type | Description |
|---|---|
| `str` | New task ID for the retry |
Note
This creates a new task based on the original. The original task's state remains unchanged.
forget
forget(task_id: str) -> bool
Forget a task result (cleanup).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `task_id` | `str` | Task ID to forget | required |

Returns:
| Type | Description |
|---|---|
| `bool` | True if result was forgotten |
schedule_periodic
abstractmethod
schedule_periodic(
task_name: str,
schedule: timedelta,
args: tuple = (),
kwargs: dict | None = None,
name: str | None = None,
) -> str
Schedule a periodic task.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `task_name` | `str` | Registered task name | required |
| `schedule` | `timedelta` | Execution interval | required |
| `args` | `tuple` | Positional arguments for task | `()` |
| `kwargs` | `dict \| None` | Keyword arguments for task | `None` |
| `name` | `str \| None` | Unique schedule name (auto-generated if not provided) | `None` |

Returns:
| Type | Description |
|---|---|
| `str` | Schedule ID |
Example
    schedule_id = queue.schedule_periodic(
        "cleanup_expired_tokens",
        schedule=timedelta(hours=1),
    )
unschedule
abstractmethod
unschedule(schedule_id: str) -> bool
Remove a periodic schedule.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `schedule_id` | `str` | Schedule ID to remove | required |

Returns:
| Type | Description |
|---|---|
| `bool` | True if schedule was removed |
get_schedule
get_schedule(schedule_id: str) -> ScheduleInfo | None
Get information about a periodic schedule.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `schedule_id` | `str` | Schedule ID to query | required |

Returns:
| Type | Description |
|---|---|
| `ScheduleInfo \| None` | ScheduleInfo or None if not found |
list_schedules
list_schedules() -> list[ScheduleInfo]
List all periodic schedules.
Returns:
| Type | Description |
|---|---|
| `list[ScheduleInfo]` | List of ScheduleInfo objects |
purge_queue
abstractmethod
purge_queue(queue_name: str = 'default') -> int
Remove all pending tasks from a queue.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `queue_name` | `str` | Queue to purge | `'default'` |

Returns:
| Type | Description |
|---|---|
| `int` | Number of tasks purged |
Warning
This permanently removes all pending tasks. Use with caution in production.
queue_length
abstractmethod
queue_length(queue_name: str = 'default') -> int
Get number of pending tasks in queue.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `queue_name` | `str` | Queue to check | `'default'` |

Returns:
| Type | Description |
|---|---|
| `int` | Number of pending tasks |
list_queues
list_queues() -> list[str]
List all known queue names.
Returns:
| Type | Description |
|---|---|
| `list[str]` | List of queue names |
active_count
active_count() -> int
Get number of currently executing tasks.
Returns:
| Type | Description |
|---|---|
| `int` | Number of active tasks across all workers |
health_check
abstractmethod
health_check() -> bool
Check if task queue backend is reachable.
Returns:
| Type | Description |
|---|---|
| `bool` | True if broker and backend are healthy |
worker_count
worker_count() -> int
Get number of active workers.
Returns:
| Type | Description |
|---|---|
| `int` | Number of workers processing tasks |
ping
ping() -> bool
Simple connectivity check.
Returns:
| Type | Description |
|---|---|
| `bool` | True if connection is alive |
AsyncTaskQueueInterface
Bases: ABC
Async interface for task queues.
For async-native backends (arq, Taskiq, SAQ). Sync backends (Celery, RQ) use TaskQueueInterface instead.
Implementations
- ArqTaskAdapter (arq - Redis-based async)
- AsyncSyncTaskAdapter (for testing - async wrapper over sync)
provider_name
abstractmethod
property
provider_name: str
Return the provider name (e.g., 'arq', 'taskiq').
task
abstractmethod
task(
name: str | None = None,
*,
max_retries: int = 3,
timeout: int | None = None,
queue: str | None = None
) -> Callable[
[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]
]
Decorator to register an async function as a task.
Registered functions must be coroutine functions (defined with `async def`). The type signature enforces this at type-check time: a checker such as mypy rejects registration of a sync function.
enqueue
abstractmethod
async
enqueue(
task_name: str,
args: tuple = (),
kwargs: dict | None = None,
options: TaskOptions | None = None,
) -> str
Enqueue a task for async execution. Returns task ID.
enqueue_many
abstractmethod
async
enqueue_many(
tasks: list[tuple[str, tuple, dict]],
options: TaskOptions | None = None,
) -> list[str]
Enqueue multiple tasks. Returns list of task IDs.
Raises:
| Type | Description |
|---|---|
| `PartialEnqueueError` | Implementations may raise this when some tasks fail. Contains succeeded (index, ID) pairs and failed (index, exception) pairs for caller-side recovery. |
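Caller-side recovery from a partial failure might look like the sketch below. The stand-in `PartialEnqueueError` exposes `succeeded` and `failed` attributes, named after the description above, but the real attribute names and constructor are not documented here and are assumed. `flaky_enqueue_many` is a hypothetical backend that rejects any task named "broken".

```python
from __future__ import annotations

import asyncio


class PartialEnqueueError(Exception):
    # Stand-in: attribute names follow the description but are assumed.
    def __init__(self, succeeded: list[tuple[int, str]],
                 failed: list[tuple[int, Exception]]) -> None:
        super().__init__(f"{len(failed)} task(s) failed to enqueue")
        self.succeeded = succeeded
        self.failed = failed


async def flaky_enqueue_many(tasks: list[tuple[str, tuple, dict]]) -> list[str]:
    # Hypothetical backend that rejects any task named "broken".
    succeeded: list[tuple[int, str]] = []
    failed: list[tuple[int, Exception]] = []
    for index, (name, _args, _kwargs) in enumerate(tasks):
        if name == "broken":
            failed.append((index, ValueError(name)))
        else:
            succeeded.append((index, f"id-{index}"))
    if failed:
        raise PartialEnqueueError(succeeded, failed)
    return [task_id for _, task_id in succeeded]


async def main() -> tuple[dict[int, str], list[int]]:
    tasks = [("ok", (), {}), ("broken", (), {}), ("ok", (), {})]
    try:
        ids = await flaky_enqueue_many(tasks)
        return dict(enumerate(ids)), []
    except PartialEnqueueError as err:
        # Keep the IDs that went through; collect indexes to retry or report.
        return dict(err.succeeded), [index for index, _ in err.failed]


enqueued, to_retry = asyncio.run(main())
```

Keying the results by input index lets the caller match each task ID or failure back to the original request.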
delay
async
delay(task_name: str, *args: Any, **kwargs: Any) -> str
Convenience: enqueue immediately with positional/keyword args.
get_result
abstractmethod
async
get_result(
task_id: str, timeout: float | None = None
) -> TaskResult
Get task result. Non-blocking if timeout is None.
revoke
abstractmethod
async
revoke(task_id: str) -> bool
Cancel a pending task. Returns True if revoked.
queue_length
abstractmethod
async
queue_length(queue_name: str = 'default') -> int
Get number of pending tasks in queue.
health_check
abstractmethod
async
health_check() -> bool
Check if task queue backend is reachable.
schedule_periodic
async
schedule_periodic(
task_name: str,
cron: str | None = None,
interval: timedelta | None = None,
args: tuple = (),
kwargs: dict | None = None,
) -> str
Schedule a periodic task.
Not all async backends support this natively. arq supports cron-based scheduling; others may require external schedulers (APScheduler, K8s CronJob).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `cron` | `str \| None` | Cron expression (e.g., `"*/5 * * * *"`) | `None` |
| `interval` | `timedelta \| None` | Alternative to cron: a fixed interval | `None` |
unschedule
async
unschedule(schedule_id: str) -> bool
Remove a periodic schedule.
startup
async
startup() -> None
Initialize connections (called once at app startup).
shutdown
async
shutdown() -> None
Close connections (called once at app shutdown).
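Since `startup` and `shutdown` are each meant to run once, wrapping them in an async context manager keeps the pair together and ensures `shutdown` runs even if the application body raises. The sketch below uses a hypothetical `FakeQueue` that only records calls. `queue_lifespan` is likewise an illustrative helper, not part of the interface.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeQueue:
    # Hypothetical adapter that only records lifecycle calls.
    def __init__(self) -> None:
        self.events: list[str] = []

    async def startup(self) -> None:
        self.events.append("startup")

    async def shutdown(self) -> None:
        self.events.append("shutdown")


@asynccontextmanager
async def queue_lifespan(queue: FakeQueue):
    await queue.startup()
    try:
        yield queue
    finally:
        # Runs even if the application body raises.
        await queue.shutdown()


async def main() -> list[str]:
    queue = FakeQueue()
    async with queue_lifespan(queue):
        queue.events.append("serving")
    return queue.events


events = asyncio.run(main())
```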