baldur.decorators — Resilience Decorators
Opinionated @protected presets and orthogonal gates. The primitive
@protected / @aprotected lives at the top level; this module hosts
preset compositions (@dlq_protect) and orthogonal call-site gates
(@idempotent, @rate_limit, @domain_tag).
See also
Django quickstart — end-to-end decorator wiring on a real example app.
Decorators
dlq_protect
dlq_protect(
    name: str,
    *,
    fallback: (
        Callable[[], Any]
        | Callable[[], Awaitable[Any]]
        | None
    ) = None,
    timeout: float | None = _TIMEOUT_UNSET,
    context_from: (
        Callable[..., PolicyContext] | None | Literal[False]
    ) = _CONTEXT_FROM_UNSET
) -> Callable[[Callable[..., T]], Callable[..., T]]
PRO-aliased @protected with zero-message-loss defaults pinned.
Equivalent to @protected(name, dlq=True, retry=True, circuit_breaker=True)
but communicates the PRO value proposition at the decoration site.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Identifier used for metrics/logging (passed to …) | required |
| fallback | Callable[[], Any] \| Callable[[], Awaitable[Any]] \| None | Optional fallback callable invoked when all branches fail. | None |
| timeout | float \| None | Per-call timeout in seconds. Omit to use … | _TIMEOUT_UNSET |
| context_from | Callable[..., PolicyContext] \| None \| Literal[False] | Forwarded to … | _CONTEXT_FROM_UNSET |
Returns:
| Type | Description |
|---|---|
| Callable[[Callable[..., T]], Callable[..., T]] | Decorator that auto-detects sync vs async and dispatches accordingly. |
Usage::

    @dlq_protect("orders.charge")
    def charge(order_id: int) -> None:
        ...

    @dlq_protect("orders.charge_async")
    async def charge_async(order_id: int) -> None:
        ...
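
The sync/async auto-detection mentioned in the Returns table can be pictured with a small standalone sketch. It does not use baldur at all: `dispatching_decorator` and its `calls` log are hypothetical names invented for illustration, and the real decorator may detect and dispatch differently. The sketch only shows the general technique of checking `inspect.iscoroutinefunction` once at decoration time and returning a wrapper of the matching kind.

```python
import asyncio
import functools
import inspect
from typing import Any, Callable


def dispatching_decorator(name: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Hypothetical stand-in: logs each call, then runs the wrapped callable."""
    calls: list = []

    def decorate(fn: Callable[..., Any]) -> Callable[..., Any]:
        # The sync/async decision is made once, when the decorator is applied.
        if inspect.iscoroutinefunction(fn):
            @functools.wraps(fn)
            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                calls.append(f"{name}:async")
                return await fn(*args, **kwargs)

            async_wrapper.calls = calls  # exposed only so the sketch can be inspected
            return async_wrapper

        @functools.wraps(fn)
        def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
            calls.append(f"{name}:sync")
            return fn(*args, **kwargs)

        sync_wrapper.calls = calls
        return sync_wrapper

    return decorate


@dispatching_decorator("orders.charge")
def charge(order_id: int) -> int:
    return order_id * 2


@dispatching_decorator("orders.charge_async")
async def charge_async(order_id: int) -> int:
    await asyncio.sleep(0)
    return order_id * 3
```

Because the async wrapper is itself a coroutine function, callers still `await` the decorated function exactly as they would the original.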
idempotent
idempotent(
    *,
    domain: (
        IdempotencyDomain | str
    ) = IdempotencyDomain.CUSTOM,
    key_args: list[str] | None = None,
    key_fn: (
        Callable[..., str | IdempotencyKey] | None
    ) = None,
    operation: str | None = None,
    ttl: timedelta | None = None,
    execution_ttl: timedelta | None = None
) -> Callable[[Callable[..., T]], Callable[..., T]]
Atomic do-not-run-twice decorator.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| domain | IdempotencyDomain \| str | | CUSTOM |
| key_args | list[str] \| None | List of parameter names to extract from the wrapped function's signature. The cache key combines the domain, the per-operation component (see … | None |
| key_fn | Callable[..., str \| IdempotencyKey] \| None | | None |
| operation | str \| None | Per-operation key component for the … | None |
| ttl | timedelta \| None | Dedup memory window: how long a completed (or failed) operation is remembered, i.e. how long duplicates stay blocked after success. | None |
| execution_ttl | timedelta \| None | In-flight execution window: how long a running claim is honored before a crashed attempt becomes retryable (and before a competing process may take the key over). | None |
Returns:
| Type | Description |
|---|---|
| Callable[[Callable[..., T]], Callable[..., T]] | Decorator that auto-detects sync vs async. |
Raises (at decoration time):
    TypeError: If both key_args and key_fn are supplied, or if neither
        is supplied; if an annotated key_args parameter is non-primitive;
        if operation is empty, contains a separator, or is combined with
        key_fn; or if ttl / execution_ttl is not a positive timedelta.
Raises (at call time):
    IdempotencyDuplicateError: On SKIP (already completed) or ABORT
        (in-flight collision).
    IdempotencyUnavailableError: When the cache is unavailable (e.g. Redis
        down) during the check and fail_open_on_cache_error is False
        (the default fail-closed posture). Opting in to fail-open treats
        the unverifiable check as CONTINUE instead.
    TypeError: If an unannotated key_args value resolves to a
        non-primitive at runtime, or if key_fn returns an unsupported type.
Usage::

    from datetime import timedelta

    @idempotent(domain=IdempotencyDomain.EXTERNAL_SERVICE, key_args=["order_id"])
    def charge(order_id: int) -> None:
        ...

    @idempotent(key_fn=lambda payload: payload["request_id"])
    async def handle(payload: dict) -> None:
        ...

    @idempotent(
        domain=IdempotencyDomain.EXTERNAL_SERVICE,
        key_args=["order_id"],
        operation="billing.charge",          # shared label across entry points
        ttl=timedelta(hours=2),              # remember completions for 2 h
        execution_ttl=timedelta(minutes=5),  # worst-case runtime bound
    )
    def charge_from_worker(order_id: int) -> None:
        ...
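
To make the difference between ttl and execution_ttl concrete, here is a toy in-memory model of the two windows. It is a sketch under stated assumptions, not baldur's storage backend: `ToyIdempotencyStore`, `Decision`, and the `claim` / `complete` methods are hypothetical names, and the clock is passed in explicitly so the timeline is deterministic. Only the SKIP / ABORT / CONTINUE outcomes and the meaning of the two windows are taken from the text above.

```python
from datetime import datetime, timedelta
from enum import Enum


class Decision(Enum):
    CONTINUE = "continue"  # no live record: the caller may run the operation
    SKIP = "skip"          # a completed run is still remembered
    ABORT = "abort"        # another attempt holds a live in-flight claim


class ToyIdempotencyStore:
    """Hypothetical in-memory model of the two windows (illustration only)."""

    def __init__(self, ttl: timedelta, execution_ttl: timedelta) -> None:
        self.ttl = ttl
        self.execution_ttl = execution_ttl
        # key -> (state, expires_at), where state is "running" or "done"
        self._records: dict = {}

    def claim(self, key: str, now: datetime) -> Decision:
        record = self._records.get(key)
        if record is not None and record[1] > now:
            return Decision.SKIP if record[0] == "done" else Decision.ABORT
        # Missing or expired record: take the key over as a fresh claim.
        self._records[key] = ("running", now + self.execution_ttl)
        return Decision.CONTINUE

    def complete(self, key: str, now: datetime) -> None:
        # Completion swaps the short in-flight window for the dedup window.
        self._records[key] = ("done", now + self.ttl)


store = ToyIdempotencyStore(ttl=timedelta(hours=2), execution_ttl=timedelta(minutes=5))
t0 = datetime(2024, 1, 1, 12, 0, 0)

first = store.claim("order:42", t0)                               # fresh key
concurrent = store.claim("order:42", t0 + timedelta(minutes=1))   # claim still live
after_crash = store.claim("order:42", t0 + timedelta(minutes=6))  # claim expired
store.complete("order:42", t0 + timedelta(minutes=7))
duplicate = store.claim("order:42", t0 + timedelta(hours=1))      # still remembered
forgotten = store.claim("order:42", t0 + timedelta(hours=3))      # dedup window over
```

In this model a short execution_ttl lets a crashed attempt be retried sooner, at the cost of a slow but healthy run losing its claim; a long ttl blocks duplicates for longer after completion.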
rate_limit
rate_limit(
    *,
    max_requests: int,
    window_seconds: int = 60,
    key_fn: Callable[..., str] | None = None,
    raise_on_limit: bool = True
) -> Callable[[Callable[..., T]], Callable[..., T]]
Function-level sliding-window rate limiter.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| max_requests | int | Maximum requests allowed within … | required |
| window_seconds | int | Sliding-window size in seconds. Defaults to 60. | 60 |
| key_fn | Callable[..., str] \| None | Optional … | None |
| raise_on_limit | bool | When … | True |
Returns:
| Type | Description |
|---|---|
| Callable[[Callable[..., T]], Callable[..., T]] | Decorator that auto-detects sync vs async. |
Raises (at call time):
    RateLimitExceeded: When raise_on_limit=True and the limiter
        rejects the call.
Usage::

    @rate_limit(max_requests=10, window_seconds=60)
    def search(q: str) -> list[str]:
        ...

    @rate_limit(max_requests=5, window_seconds=1, key_fn=lambda user_id: f"u:{user_id}")
    async def fetch(user_id: int) -> dict:
        ...
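
For readers unfamiliar with sliding-window limiting, the idea behind max_requests and window_seconds can be sketched with a per-key log of timestamps. This is a generic illustration, not baldur's limiter: `SlidingWindowCounter` and `allow` are hypothetical names, the state lives in process memory, and the caller supplies the clock. How the real decorator stores its counters is not described on this page.

```python
from collections import deque


class SlidingWindowCounter:
    """Hypothetical in-memory sliding-window log, one deque of timestamps per key."""

    def __init__(self, max_requests: int, window_seconds: int) -> None:
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._hits: dict = {}

    def allow(self, key: str, now: float) -> bool:
        hits = self._hits.setdefault(key, deque())
        # Drop timestamps that have slid out of the window ending at `now`.
        while hits and hits[0] <= now - self.window_seconds:
            hits.popleft()
        if len(hits) >= self.max_requests:
            return False  # a real limiter would raise or signal rejection here
        hits.append(now)
        return True


limiter = SlidingWindowCounter(max_requests=2, window_seconds=60)
results = [limiter.allow("u:1", t) for t in (0.0, 10.0, 20.0, 61.0)]
other_key = limiter.allow("u:2", 20.0)  # keys are limited independently
```

The third call is rejected because two calls already sit inside the 60-second window; by t=61 the first call has slid out, so the fourth is accepted.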
domain_tag
domain_tag(domain: str) -> Callable[[F], F]
Domain-tagging decorator.
Sets the domain context while the wrapped function executes and restores the previous context on exit (both sync and async callables are supported).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| domain | str | Domain name to apply for the duration of the call. | required |
Returns:
| Type | Description |
|---|---|
| Callable[[F], F] | Decorator that wraps the target callable. |
Usage

    @domain_tag("payment")
    def process_payment():
        # Errors raised here are tagged with the "payment" domain.
        ...

    @domain_tag("order")
    async def create_order():
        # Async functions are supported.
        ...
Reference
docs/baldur/middleware_system/75_CRISIS_BUDGET_MULTIPLIER.md §0.1 (item 6)
Context manager
DomainContext
DomainContext(domain: str)
Domain context manager.
Sets the domain for the enclosed code block via with. On block exit,
the previous domain context is restored automatically.
Usage

    with DomainContext("payment"):
        # The domain is "payment" inside this block.
        process_payment()

The previous domain is restored on exit.
Attributes:
| Name | Type | Description |
|---|---|---|
| domain | | Domain name to apply for the block. |
Initialize a DomainContext.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| domain | str | Domain name to apply. | required |
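
The set-then-restore behavior described for DomainContext can be illustrated with Python's standard `contextvars` module. The sketch below is an assumption about one plausible mechanism, not a description of baldur's internals: `ToyDomainContext`, `current_domain`, and `_domain_var` are hypothetical names. It shows why nested blocks unwind correctly, including when the block raises.

```python
import contextvars

# Hypothetical module-level variable holding the active domain (None when unset).
_domain_var = contextvars.ContextVar("domain", default=None)


class ToyDomainContext:
    """Illustrative stand-in: sets a domain on enter, restores the previous one on exit."""

    def __init__(self, domain: str) -> None:
        self.domain = domain
        self._token = None

    def __enter__(self) -> "ToyDomainContext":
        # set() returns a token that remembers the value being replaced.
        self._token = _domain_var.set(self.domain)
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        # reset() restores the previous value, even if the block raised.
        _domain_var.reset(self._token)


def current_domain():
    return _domain_var.get()


seen = []
with ToyDomainContext("payment"):
    seen.append(current_domain())
    with ToyDomainContext("order"):
        seen.append(current_domain())
    seen.append(current_domain())
seen.append(current_domain())

try:
    with ToyDomainContext("refund"):
        raise RuntimeError("boom")
except RuntimeError:
    after_error = current_domain()
```

A context variable is also isolated per asyncio task, which is one reason this mechanism suits decorators that must support both sync and async callables.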
Helpers
get_current_domain
get_current_domain() -> str | None
Return the current domain from the active context.
Returns:
| Type | Description |
|---|---|
| str \| None | Current domain name, or None if no context is set. |
Usage

    @domain_tag("payment")
    def process_payment():
        domain = get_current_domain()
        print(f"Current domain: {domain}")  # "payment"
clear_domain_context
clear_domain_context() -> None
Clear the current domain context.
Primarily for test cleanup. In application code, DomainContext and
@domain_tag clean up automatically on scope exit.
Exceptions raised by the decorators above
IdempotencyDuplicateError
IdempotencyDuplicateError(
    message: str = "",
    *,
    key: str = "",
    domain: str = "",
    decision: str = ""
)
Bases: BaldurError
Raised by @idempotent on a detected duplicate or in-flight collision.
Inherits BaldurError directly (correctness contract, not a
resilience stage). Non-retryable by default — outer @dlq_protect
layers should treat this as a terminal signal.
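
What "terminal signal" means for an outer retry layer can be sketched with a plain retry loop. Everything here is a stand-in: `DuplicateSignal` plays the role of a non-retryable duplicate error and `call_with_retry` is a hypothetical helper, neither of which is part of baldur. The sketch only shows the general pattern of re-raising terminal errors immediately while retrying everything else.

```python
class DuplicateSignal(Exception):
    """Hypothetical stand-in for a terminal, non-retryable duplicate error."""


def call_with_retry(fn, *, attempts: int = 3, terminal: tuple = (DuplicateSignal,)):
    """Call fn up to `attempts` times, but never retry a terminal error."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error = None
    for _ in range(attempts):
        try:
            return fn()
        except terminal:
            raise  # retrying cannot change the outcome of a duplicate
        except Exception as exc:
            last_error = exc  # transient failure: try again
    raise last_error


calls = {"flaky": 0, "duplicate": 0}


def flaky() -> str:
    calls["flaky"] += 1
    if calls["flaky"] < 3:
        raise ConnectionError("transient")
    return "ok"


def duplicate() -> None:
    calls["duplicate"] += 1
    raise DuplicateSignal("already completed")


flaky_result = call_with_retry(flaky)
try:
    call_with_retry(duplicate)
except DuplicateSignal:
    duplicate_propagated = True
else:
    duplicate_propagated = False
```

The flaky call succeeds on its third attempt, while the duplicate is attempted exactly once and surfaces to the caller unchanged.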
RateLimitExceeded
RateLimitExceeded(
    message: str = "",
    *,
    key: str = "",
    limit: int = 0,
    window_seconds: int = 0,
    reset_at: int = 0
)
Bases: ResilienceError
Raised by @rate_limit when a call is rejected by the limiter.
Function-level rejection signal — distinct from
RateLimitStorageError (storage-backend failure).
DomainValidationError
DomainValidationError(
    message: str = "",
    *,
    original_domain: str = "",
    reason: Any = None
)
Bases: BaldurError
Raised when a domain input string fails validation.
Carries the original (pre-normalization) input and a typed reject reason for downstream logging / metric labelling.
Modeled on RecoveryAdapterError: raised at validation sites that have
a loud failure mode (decoration-time, where a CI/dev surface can recover
via test or rename). Runtime APIs catch this and fall back to
FALLBACK_DOMAIN instead of propagating.