baldur.interfaces — Core State Repositories
The two highest-traffic repository interfaces: failed-operation persistence and circuit-breaker state. Adapter authors implement these to back Baldur on a new storage layer. The shared enums and DTOs live on the data-model page.
FailedOperationRepository
Bases: ABC
Abstract repository for FailedOperation (DLQ) data access.
Concrete implementations:
- InMemoryFailedOperationRepository: in-process dict storage
- SQLFailedOperationRepository: any DB-API 2.0 database
- RedisDLQRepository: Redis via ResilientStorageBackend
create
abstractmethod
create(
domain: str,
failure_type: str,
error_message: str = "",
error_code: str = "",
entity_type: str | None = None,
entity_id: str | None = None,
entity_refs: dict[str, Any] | None = None,
user_id: int | None = None,
snapshot_data: dict[str, Any] | None = None,
request_data: dict[str, Any] | None = None,
response_data: dict[str, Any] | None = None,
metadata: dict[str, Any] | None = None,
retry_count: int = 0,
max_retries: int = 2,
next_action_hint: str = "",
recommended_action: str = "",
expires_at: datetime | None = None,
) -> FailedOperationData
Create a new failed operation record
get_by_id
abstractmethod
get_by_id(id: str) -> FailedOperationData | None
Get a failed operation by ID
get_pending_by_domain
abstractmethod
get_pending_by_domain(
domain: str, limit: int = 100
) -> list[FailedOperationData]
Get pending operations for a specific domain
get_pending_count_by_domain
abstractmethod
get_pending_count_by_domain(domain: str) -> int
Get count of pending operations for a domain
update_status
abstractmethod
update_status(
id: str,
status: str,
resolution_type: str = "",
resolution_note: str = "",
resolved_by_id: int | None = None,
recommended_action: str = "",
) -> bool
Update the status of a failed operation.
recommended_action: Suggested operator action (e.g., "escalate", "manual_check", "replay"). Empty string preserves the existing value.
increment_retry_count
abstractmethod
increment_retry_count(id: str) -> bool
Increment retry count and update last_retry_at
mark_as_resolved
abstractmethod
mark_as_resolved(
id: str,
resolution_type: str,
resolution_note: str = "",
resolved_by_id: int | None = None,
) -> bool
Mark a failed operation as resolved
get_expired_operations
abstractmethod
get_expired_operations(
before_date: datetime, limit: int = 100
) -> list[FailedOperationData]
Get operations that have expired
bulk_update_status
abstractmethod
bulk_update_status(ids: list[str], status: str) -> int
Bulk update status for multiple operations
find_by_status
abstractmethod
find_by_status(
status: str,
domain: str | None = None,
failure_type: str | None = None,
limit: int = 100,
) -> list[FailedOperationData]
Find operations by status with optional filters.
Distinct contract from find: positional status is required,
results are ordered created_at ASC, and there is no offset. Used
by replay/SLA paths that consume a single status oldest-first.
find
abstractmethod
find(
*,
status: str | None = None,
domain: str | None = None,
failure_type: str | None = None,
offset: int = 0,
limit: int = 100
) -> list[FailedOperationData]
Paginated cross-status query with optional filters.
Results ordered by created_at DESC (newest-first). No filter
returns all statuses — the default scope is "no filter = all", so
escalated/terminal statuses (permanently_failed,
requires_review) are visible by default. Mirrors the sibling
archive repositories' find(*, ..., offset, limit) contract.
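As a minimal sketch of the ordering and pagination contract described above, the following in-memory version filters, sorts newest-first, and slices by offset and limit. The `Op` record and the free function `find` are hypothetical stand-ins for illustration, not the Baldur implementation.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class Op:
    """Hypothetical stand-in for FailedOperationData (only the fields used here)."""
    id: str
    status: str
    domain: str
    failure_type: str
    created_at: datetime


def find(ops, *, status=None, domain=None, failure_type=None, offset=0, limit=100):
    """Keyword-only filters; None means 'do not filter on this field'."""
    matched = [
        op for op in ops
        if (status is None or op.status == status)
        and (domain is None or op.domain == domain)
        and (failure_type is None or op.failure_type == failure_type)
    ]
    matched.sort(key=lambda op: op.created_at, reverse=True)  # newest first
    return matched[offset:offset + limit]


t0 = datetime(2024, 1, 1)
ops = [
    Op("a", "pending", "payment", "timeout", t0),
    Op("b", "requires_review", "payment", "timeout", t0 + timedelta(minutes=1)),
    Op("c", "permanently_failed", "order", "validation", t0 + timedelta(minutes=2)),
]
page = find(ops, offset=0, limit=2)  # no filter: every status is visible
```

An unfiltered call includes the escalated and terminal rows, which is the "no filter = all" scope the contract asks for.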
count
abstractmethod
count(
*,
status: str | None = None,
domain: str | None = None,
failure_type: str | None = None
) -> int
Count operations matching filters (no filter = all statuses).
count_created_in_window
abstractmethod
count_created_in_window(
start: datetime, end: datetime
) -> int
Count operations whose created_at is in the inclusive [start, end].
Counts every status (no status scope): an entry created in the window that was later resolved/archived still consumed budget when it failed, so the windowed inflow count must not be status-scoped. Powers the Error Budget windowed inflow source.
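The inclusive window and the deliberate absence of a status filter can be sketched as follows. The `(status, created_at)` tuple shape is an assumption made for this illustration only.

```python
from datetime import datetime


def count_created_in_window(rows, start, end):
    """rows: iterable of (status, created_at) tuples; status is deliberately ignored."""
    return sum(1 for _status, created_at in rows if start <= created_at <= end)


rows = [
    ("pending", datetime(2024, 1, 1, 0, 0)),    # on the start bound: counted
    ("resolved", datetime(2024, 1, 1, 12, 0)),  # resolved later, still counted
    ("archived", datetime(2024, 1, 2, 0, 0)),   # on the end bound: counted
    ("pending", datetime(2024, 1, 2, 0, 1)),    # past the end bound: not counted
]
inflow = count_created_in_window(rows, datetime(2024, 1, 1), datetime(2024, 1, 2))
```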
find_replayable
abstractmethod
find_replayable(
max_retries: int,
domain: str | None = None,
failure_type: str | None = None,
limit: int = 100,
) -> list[FailedOperationData]
Find operations that can be replayed (pending and retry_count < max_retries)
find_sla_breached
abstractmethod
find_sla_breached(
current_time: datetime,
sla_thresholds: dict[str, timedelta],
) -> list[FailedOperationData]
Find operations that have breached their SLA
find_expired
abstractmethod
find_expired(
current_time: datetime,
) -> list[FailedOperationData]
Find operations past their retention period
get_statistics
abstractmethod
get_statistics() -> dict[str, Any]
Get statistics about failed operations
get_facet_counts
abstractmethod
get_facet_counts(
*, status: str | None = None, domain: str | None = None
) -> dict[str, dict[str, int]]
Faceted status×domain counts for the admin-console DLQ filter.
Returns {"by_status": {status: n, ...}, "by_domain": {domain: n, ...}}
with zero-count buckets dropped.
Standard faceted-search semantics: each facet excludes its own
selection — by_status is scoped by the domain filter and
by_domain is scoped by the status filter, so the dimension
being chosen keeps all of its options. An unfiltered call returns
the complete by_status + by_domain. Matching on both
dimensions is exact (no substring/fuzzy).
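The "each facet excludes its own selection" rule is easy to get backwards, so here is a small in-memory sketch. It assumes rows are plain `(status, domain)` pairs, which is a simplification for illustration.

```python
from collections import Counter


def get_facet_counts(rows, *, status=None, domain=None):
    """rows: iterable of (status, domain) pairs (hypothetical shape)."""
    # by_status ignores the status filter and is scoped only by domain.
    by_status = Counter(s for s, d in rows if domain is None or d == domain)
    # by_domain ignores the domain filter and is scoped only by status.
    by_domain = Counter(d for s, d in rows if status is None or s == status)
    # Counter only holds keys it has seen, so zero-count buckets never appear.
    return {"by_status": dict(by_status), "by_domain": dict(by_domain)}


rows = [("pending", "payment"), ("pending", "order"), ("resolved", "payment")]
facets = get_facet_counts(rows, status="pending")
```

With `status="pending"` selected, `by_status` still lists every status, so the filter UI can offer the other options, while `by_domain` is narrowed to pending rows.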
try_acquire_for_replay
abstractmethod
try_acquire_for_replay(
id: str, max_retries: int, force: bool = False
) -> FailedOperationData | None
Atomically acquire a DLQ entry for replay.
Normal mode (force=False) MUST:
1. Check if status is PENDING and retry_count < max_retries
2. If eligible, atomically set status to REPLAYING and increment retry_count
3. Return the FailedOperationData if acquired, None if not eligible
Force mode (force=True) is the operator-driven cap-override escape
hatch for re-driving an at-cap entry after a root-cause fix. It MUST:
1. Accept status in {PENDING, REQUIRES_REVIEW} and reject every other
status (RESOLVED / ARCHIVED / REPLAYING / REVIEWING / ... -> None)
2. Skip the retry_count < max_retries check entirely
3. Reset retry_count to a fresh budget (the redrive attempt is attempt 1
-> retry_count == 1 after acquire), so the entry becomes an ordinary
under-cap entry for all downstream lifecycle logic
4. Before resetting, stamp the persisted metadata with
previous_total_retries (the pre-reset count accumulated across
prior force-redrives) and force_redrive_count (incremented), so
the budget reset preserves the forensic scar
5. Atomically set status to REPLAYING
The poison-pill convergence guarantee is preserved: a still-broken
force-redriven entry re-converges to REQUIRES_REVIEW within
max_retries further automatic attempts via complete_replay.
Implementation should use row-level locking (SELECT FOR UPDATE) or optimistic locking (version/updated_at check) to prevent race conditions.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| id | str | The DLQ entry ID to acquire | required |
| max_retries | int | Maximum allowed retry attempts | required |
| force | bool | Operator cap-override: bypass the cap gate and accept an at-cap REQUIRES_REVIEW entry (see Force mode above) | False |
Returns:
| Type | Description |
|---|---|
| FailedOperationData \| None | FailedOperationData if successfully acquired, None otherwise |
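Example Django implementation (normal mode):

    from django.db import transaction
    from django.utils.timezone import now

    with transaction.atomic():
        # Lock the row so concurrent workers cannot acquire the same entry.
        entry = FailedOperation.objects.select_for_update().filter(id=id).first()
        # A missing row is "not eligible", so return None instead of raising.
        if entry is None or entry.status != 'pending' or entry.retry_count >= max_retries:
            return None
        entry.status = 'replaying'
        entry.retry_count += 1
        entry.last_retry_at = now()
        entry.save()
        return FailedOperationData.from_model(entry)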
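For adapters without row-level locking, the normal-mode and force-mode rules can be sketched in memory under a single lock. The `Entry` record, the `InMemoryAcquire` class, and the lowercase status strings are assumptions for illustration. Accumulating `previous_total_retries` across redrives is one reading of the force-mode contract, not a confirmed implementation detail.

```python
import threading
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class Entry:
    """Hypothetical stand-in for a stored DLQ row."""
    id: str
    status: str = "pending"
    retry_count: int = 0
    metadata: Dict[str, Any] = field(default_factory=dict)


class InMemoryAcquire:
    def __init__(self):
        self._lock = threading.Lock()
        self._rows: Dict[str, Entry] = {}

    def add(self, entry):
        self._rows[entry.id] = entry

    def try_acquire_for_replay(self, id, max_retries, force=False):
        with self._lock:  # eligibility check and write share one lock acquire
            entry = self._rows.get(id)
            if entry is None:
                return None
            if force:
                if entry.status not in ("pending", "requires_review"):
                    return None
                meta = entry.metadata
                # Stamp the forensic trail before resetting the budget.
                meta["previous_total_retries"] = (
                    meta.get("previous_total_retries", 0) + entry.retry_count
                )
                meta["force_redrive_count"] = meta.get("force_redrive_count", 0) + 1
                entry.retry_count = 1  # the redrive itself is attempt 1
            else:
                if entry.status != "pending" or entry.retry_count >= max_retries:
                    return None
                entry.retry_count += 1
            entry.status = "replaying"
            return entry
```

After a force acquire the entry looks like an ordinary under-cap entry, so downstream logic needs no special case for it.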
complete_replay
abstractmethod
complete_replay(
id: str,
success: bool,
resolution_type: str = "",
note: str = "",
resolved_by_id: int | None = None,
error_details: dict[str, Any] | None = None,
) -> bool
Complete a replay operation by updating the final status.
Should be called after replay execution to set the final state:
- success=True: mark as RESOLVED with resolution details
- success=False: revert to PENDING (for retry) or REQUIRES_REVIEW (if escalated)
This method is safe to call without a transaction wrapper because it only updates an already-acquired entry.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| id | str | The DLQ entry ID | required |
| success | bool | Whether the replay succeeded | required |
| resolution_type | str | Type of resolution (for successful replays) | '' |
| note | str | Resolution note or error message | '' |
| resolved_by_id | int \| None | User ID who resolved (None for system) | None |
| error_details | dict[str, Any] \| None | Additional error context (for failed replays) | None |
Returns:
| Type | Description |
|---|---|
| bool | True if update succeeded, False otherwise |
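A worker typically pairs try_acquire_for_replay with complete_replay. The sketch below shows that pairing against a stub repository. `FakeRepo`, `replay_one`, and the `"replayed"` resolution type are hypothetical names used only for this illustration.

```python
class FakeRepo:
    """Hypothetical stub exposing only the two methods the loop needs."""

    def __init__(self):
        self.completed = []

    def try_acquire_for_replay(self, id, max_retries, force=False):
        return {"id": id}  # pretend every entry is eligible

    def complete_replay(self, id, success, resolution_type="", note="",
                        resolved_by_id=None, error_details=None):
        self.completed.append((id, success, resolution_type, note))
        return True


def replay_one(repo, entry_id, handler, max_retries):
    """Acquire, run the handler, then record the outcome exactly once."""
    entry = repo.try_acquire_for_replay(entry_id, max_retries)
    if entry is None:
        return False  # not eligible, or another worker already holds it
    try:
        handler(entry)
    except Exception as exc:
        repo.complete_replay(
            entry_id,
            success=False,
            note=str(exc),
            error_details={"exception": type(exc).__name__},
        )
        return False
    repo.complete_replay(entry_id, success=True, resolution_type="replayed")
    return True
```

If the worker dies between the two calls, the entry stays in REPLAYING until release_stale_replaying reverts it.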
release_stale_replaying
abstractmethod
release_stale_replaying(
older_than_minutes: int = 30,
) -> int
Release DLQ entries stuck in REPLAYING state.
Entries can get stuck if the replay process crashes after acquiring but before completing. This method reverts them to PENDING for retry.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| older_than_minutes | int | Consider entries older than this as stale | 30 |
Returns:
| Type | Description |
|---|---|
| int | Number of entries released |
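A minimal sketch of the stale-release sweep follows. It assumes staleness is measured from `last_retry_at` (the timestamp set at acquire time) and that rows are plain dicts. Both are assumptions for illustration, since the interface does not fix which timestamp an adapter compares.

```python
from datetime import datetime, timedelta, timezone


def release_stale_replaying(rows, older_than_minutes=30, now=None):
    """rows: list of dicts with 'status' and 'last_retry_at' keys (hypothetical shape)."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(minutes=older_than_minutes)
    released = 0
    for row in rows:
        # Only entries stuck in REPLAYING past the cutoff are reverted.
        if row["status"] == "replaying" and row["last_retry_at"] < cutoff:
            row["status"] = "pending"
            released += 1
    return released
```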
archive_old_resolved
abstractmethod
archive_old_resolved(older_than_days: int = 30) -> int
Archive resolved entries older than N days.
Changes status from RESOLVED to ARCHIVED. This is a soft-delete operation - data is preserved.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| older_than_days | int | Archive entries resolved more than this many days ago | 30 |
Returns:
| Type | Description |
|---|---|
| int | Number of entries archived |
purge_archived
abstractmethod
purge_archived(
ids: list[str] | None = None,
older_than_days: int | None = None,
) -> int
Permanently delete archived entries.
IMPORTANT: This is a destructive operation. Only archived entries can be purged. Either specify IDs or older_than_days, not both.
With neither argument the call is a no-op (returns 0): a destructive
purge with no selection criteria deletes nothing (fail-safe default).
To purge every archived entry, pass older_than_days=0 ("older than
0 days" matches all archived entries).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| ids | list[str] \| None | Specific entry IDs to purge (must be ARCHIVED status) | None |
| older_than_days | int \| None | Purge archived entries older than N days; | None |
Returns:
| Type | Description |
|---|---|
| int | Number of entries permanently deleted |
Raises:
| Type | Description |
|---|---|
| ValueError | If trying to purge non-archived entries |
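The selection rules for a purge can be sketched as below. Several details are assumptions made for illustration: rows are a dict keyed by ID, age is measured from a hypothetical `archived_at` field, and passing both arguments raises ValueError (the interface only says not to pass both).

```python
from datetime import datetime, timedelta, timezone


def purge_archived(rows, ids=None, older_than_days=None, now=None):
    """rows: dict of id -> {'status': str, 'archived_at': datetime} (hypothetical)."""
    if ids is not None and older_than_days is not None:
        raise ValueError("pass ids or older_than_days, not both")
    if ids is None and older_than_days is None:
        return 0  # fail-safe: no selection criteria, nothing deleted
    if ids is not None:
        targets = [i for i in ids if i in rows]
        if any(rows[i]["status"] != "archived" for i in targets):
            raise ValueError("only archived entries can be purged")
    else:
        cutoff = (now or datetime.now(timezone.utc)) - timedelta(days=older_than_days)
        # older_than_days=0 puts the cutoff at 'now', matching every archived row.
        targets = [
            i for i, r in rows.items()
            if r["status"] == "archived" and r["archived_at"] <= cutoff
        ]
    for i in targets:
        del rows[i]
    return len(targets)
```

Validating every requested ID before deleting anything keeps a rejected call from leaving a partial purge behind.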
get_cleanup_stats
abstractmethod
get_cleanup_stats() -> dict[str, Any]
Get statistics for cleanup operations.
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Dict with counts by status, age distributions, etc. |
Example:

    {
        "total": 1500,
        "by_status": {
            "pending": 50,
            "resolved": 1200,
            "archived": 250,
        },
        "resolved_older_than_30_days": 800,
        "archived_older_than_90_days": 100,
    }
count_archived_older_than
abstractmethod
count_archived_older_than(older_than_days: int) -> int
Count archived entries older than N days.
Pushes counting to the repository layer where SQL adapters can use SELECT COUNT(*) instead of loading objects into memory.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| older_than_days | int | Count archived entries resolved more than this many days ago | required |
Returns:
| Type | Description |
|---|---|
| int | Number of matching archived entries |
count_all
abstractmethod
count_all() -> int
Return active DLQ item count (excludes resolved/rejected/archived).
count_by_domain
abstractmethod
count_by_domain(domain: str) -> int
Return DLQ item count for a specific domain.
get_oldest_ids
abstractmethod
get_oldest_ids(
count: int, domain: str | None = None
) -> list[str]
Return IDs of the oldest items (by score/timestamp).
delete
abstractmethod
delete(entry_id: str) -> bool
Delete a single DLQ entry by ID. Returns True if deleted.
evict_oldest
abstractmethod
evict_oldest(count: int, domain: str | None = None) -> int
Delete the oldest items. Returns number of items actually deleted.
compress_and_evict_oldest
compress_and_evict_oldest(
count: int, domain: str | None = None
) -> int
Summarize then evict oldest items. Default delegates to evict_oldest.
store_compressed_entry
store_compressed_entry(entry: DLQCompressedEntry) -> bool
Store a compressed summary entry. Returns True if stored.
get_compressed_entries
get_compressed_entries(
domain: str | None = None,
status: str | None = None,
limit: int = 100,
) -> list[DLQCompressedEntry]
Return compressed DLQ entries, optionally filtered.
get_compressed_summary
get_compressed_summary() -> dict[str, Any]
Return aggregate statistics of compressed entries.
update_compressed_status
update_compressed_status(
entry_id: str, new_status: str
) -> bool
Transition compressed entry status. Returns True if updated.
CircuitBreakerStateRepository
Bases: ABC
Abstract repository for CircuitBreakerState data access.
Manages circuit breaker state persistence and retrieval.
get_or_create
abstractmethod
get_or_create(service_name: str) -> CircuitBreakerStateData
Get existing state or create new one for a service
get_by_service_name
abstractmethod
get_by_service_name(
service_name: str,
) -> CircuitBreakerStateData | None
Get circuit breaker state by service name
update_state
abstractmethod
update_state(
service_name: str,
state: str,
failure_count: int | None = None,
success_count: int | None = None,
opened_at: datetime | None = None,
last_failure_at: datetime | None = None,
half_open_request_count: int | None = None,
reset_half_open_count: bool = False,
) -> bool
Update circuit breaker state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service_name | str | Service identifier | required |
| state | str | New state (closed, open, half_open) | required |
| failure_count | int \| None | Optional failure count | None |
| success_count | int \| None | Optional success count | None |
| opened_at | datetime \| None | Optional time when circuit was opened | None |
| half_open_request_count | int \| None | Optional half-open request counter value | None |
| reset_half_open_count | bool | If True, atomically clear the half-open counter and watermark in the same write. Used by cold-path transitions out of HALF_OPEN (record_failure, record_success, force_open, force_close) so the state change and counter reset commit in a single round-trip. | False |
Returns:
| Type | Description |
|---|---|
| bool | True on success |
record_failure
abstractmethod
record_failure(
service_name: str,
) -> CircuitBreakerStateData
Record a failure and return updated state
record_success
abstractmethod
record_success(
service_name: str,
) -> CircuitBreakerStateData
Record a success and return updated state
record_success_with_close_check
record_success_with_close_check(
service_name: str, success_threshold: int
) -> CircuitBreakerCloseAttempt
Record a success and atomically check whether to close the circuit.
Race-unsafe default implementation: invokes record_success followed
by a separate update_state call when the threshold is met. Adapters
that can perform the read-decide-write atomically (e.g., InMemory
under a single lock acquire, Redis via Lua, SQL under a transaction)
MUST override this to close the TOCTOU window that allows multiple
callers to each observe a passing threshold and emit duplicate
CIRCUIT_BREAKER_CLOSED events for the same logical transition.
The race-unsafe default exists so non-InMemory adapters compile and function without change — at the cost of retaining the duplicate-emit race they previously had. Their distributed-safe override is tracked as the "Distributed Redis-backed CB race" out-of-scope item.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service_name | str | Circuit breaker identifier. | required |
| success_threshold | int | Number of HALF_OPEN successes required to transition to CLOSED. Comes from | required |
Returns:
| Type | Description |
|---|---|
| CircuitBreakerCloseAttempt | True only for the single caller that crossed the threshold under the adapter's atomicity guarantee; concurrent stale-view callers see |
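The atomic override described above can be sketched for an in-memory adapter by doing the read, the decision, and the write under one lock acquire. `CloseAttempt` and its field names are hypothetical stand-ins, since the fields of CircuitBreakerCloseAttempt are not shown on this page. The single-service `InMemoryBreaker` class is likewise illustrative.

```python
import threading
from dataclasses import dataclass


@dataclass
class CloseAttempt:
    """Hypothetical stand-in for CircuitBreakerCloseAttempt."""
    state: str
    closed_now: bool


class InMemoryBreaker:
    def __init__(self):
        self._lock = threading.Lock()
        self.state = "half_open"
        self.success_count = 0

    def record_success_with_close_check(self, success_threshold):
        with self._lock:  # read, decide, and write under one lock acquire
            if self.state != "half_open":
                return CloseAttempt(self.state, False)
            self.success_count += 1
            if self.success_count >= success_threshold:
                self.state = "closed"
                self.success_count = 0
                return CloseAttempt("closed", True)
            return CloseAttempt(self.state, False)


breaker = InMemoryBreaker()
results = []
threads = [
    threading.Thread(
        target=lambda: results.append(breaker.record_success_with_close_check(3))
    )
    for _ in range(8)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
closers = sum(1 for r in results if r.closed_now)
```

Because only one caller can observe the count crossing the threshold while the state is still half_open, at most one close event is emitted per transition.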
set_manual_control
abstractmethod
set_manual_control(
service_name: str,
state: str,
controlled_by_id: int | None = None,
reason: str = "",
expires_at: datetime | None = None,
) -> bool
Set manual control on a circuit breaker
clear_manual_control
abstractmethod
clear_manual_control(
service_name: str, preserve_reason: bool = False
) -> bool
Clear manual control from a circuit breaker
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service_name | str | Name of the service | required |
| preserve_reason | bool | If True, keep the existing control_reason value | False |
get_all_states
abstractmethod
get_all_states() -> list[CircuitBreakerStateData]
Get all circuit breaker states.
Scale bound: intended for OSS / PRO deployments where the total
circuit-breaker count is well under ~1K (typically <= a few hundred).
Used by admin dashboards and IPC snapshots that genuinely want a
full picture. Callers needing larger result sets, or needing only
a subset, should use get_open_states(limit=...) or filter via
a future paginated API instead of growing this method.
get_open_states
get_open_states(
limit: int | None = None,
) -> list[CircuitBreakerStateData]
Get circuit breaker states in OPEN state.
More efficient than get_all_states() + filter for large keyspaces. Default implementation filters get_all_states(); adapters may override with optimized queries (e.g., SCAN instead of KEYS in Redis).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| limit | int \| None | Maximum number of results. None means no limit. | None |
Returns:
| Type | Description |
|---|---|
| list[CircuitBreakerStateData] | List of CircuitBreakerStateData with state == OPEN, ordered by opened_at ascending (oldest first). |
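The filtering default can be sketched as a plain function over the full state list. `State` is a hypothetical stand-in for CircuitBreakerStateData, and sorting rows with a missing `opened_at` last is an assumption of this sketch.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class State:
    """Hypothetical stand-in for CircuitBreakerStateData."""
    service_name: str
    state: str
    opened_at: Optional[datetime] = None


def get_open_states(all_states, limit=None):
    """Filter to OPEN, order oldest-first, then apply the optional limit."""
    open_states = [s for s in all_states if s.state == "open"]
    open_states.sort(key=lambda s: s.opened_at or datetime.max)
    return open_states if limit is None else open_states[:limit]
```

This still loads every state first, which is why adapters with large keyspaces are encouraged to override it with a targeted query.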
reset
abstractmethod
reset(service_name: str) -> bool
Reset circuit breaker to initial closed state
delete_state
abstractmethod
delete_state(service_name: str) -> bool
Delete circuit breaker state entirely.
Used by reconciliation jobs to remove orphaned CB entries.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service_name | str | Service identifier (may be Composite Key) | required |
Returns:
| Type | Description |
|---|---|
| bool | True if deleted, False if not found |
atomic_force_open
abstractmethod
atomic_force_open(
service_name: str,
reason: str = "",
controlled_by_id: int | None = None,
ttl_minutes: int = 90,
) -> tuple[bool, str, str]
Atomically force open a circuit breaker.
This method MUST use row-level locking to prevent concurrent modifications. Creates the circuit breaker if it doesn't exist.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service_name | str | Name of the service | required |
| reason | str | Reason for opening | '' |
| controlled_by_id | int \| None | User ID who initiated the change | None |
| ttl_minutes | int | TTL for manual override | 90 |
Returns:
| Type | Description |
|---|---|
| tuple[bool, str, str] | Tuple of (success, previous_state, new_state) |
Example Django implementation:

    from django.db import transaction

    with transaction.atomic():
        # Lock (or create) the row so concurrent force-open calls serialize.
        state, created = CircuitBreakerState.objects.select_for_update().get_or_create(
            service_name=service_name
        )
        previous = state.state
        state.state = 'open'
        state.manually_controlled = True
        state.save()
        return (True, previous, 'open')
atomic_force_close
abstractmethod
atomic_force_close(
service_name: str,
reason: str = "",
controlled_by_id: int | None = None,
) -> tuple[bool, str, str]
Atomically force close a circuit breaker.
This method MUST use row-level locking to prevent concurrent modifications.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service_name | str | Name of the service | required |
| reason | str | Reason for closing | '' |
| controlled_by_id | int \| None | User ID who initiated the change | None |
Returns:
| Type | Description |
|---|---|
| tuple[bool, str, str] | Tuple of (success, previous_state, new_state) |
atomic_reset
abstractmethod
atomic_reset(
service_name: str,
reason: str = "",
controlled_by_id: int | None = None,
) -> tuple[bool, str, str]
Atomically reset a circuit breaker to initial state.
This method MUST use row-level locking to prevent concurrent modifications. Resets all counters and clears manual control.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service_name | str | Name of the service | required |
| reason | str | Reason for reset | '' |
| controlled_by_id | int \| None | User ID who initiated the change | None |
Returns:
| Type | Description |
|---|---|
| tuple[bool, str, str] | Tuple of (success, previous_state, new_state) |
try_acquire_half_open_slot
abstractmethod
try_acquire_half_open_slot(
service_name: str,
limit: int,
stuck_timeout_seconds: int,
) -> tuple[bool, str, str]
Atomically acquire a HALF_OPEN trial slot.
This is the single race-free entry point for the should_allow HALF_OPEN branch. Implementations MUST perform the state-machine evaluation and the counter increment as one atomic operation (Redis Lua / SQL SELECT FOR UPDATE / in-memory under RLock).
State-machine branches (in evaluation order):
1. state == "half_open" AND count >= limit AND now - half_open_window_started_at > stuck_timeout_seconds: Stuck-window auto-reset. Treat as a fresh OPEN→HALF_OPEN combo: reset half_open_request_count = 1, success_count = 0, refresh the window watermark. Return (True, "half_open", "half_open").
2. state == "open" (recovery_timeout already verified by the caller): Atomic OPEN→HALF_OPEN transition: write state = "half_open", success_count = 0, half_open_request_count = 1, set the window watermark. Return (True, "open", "half_open").
3. state == "half_open" AND count < limit: increment the counter. Return (True, "half_open", "half_open").
4. state == "half_open" AND count >= limit (within window): Reject. Return (False, "half_open", "half_open").
5. Otherwise (CLOSED, manual override, etc.): no-op. Return (False, current_state, current_state).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service_name | str | Service identifier | required |
| limit | int | Maximum HALF_OPEN trial slots (cluster-wide) | required |
| stuck_timeout_seconds | int | Window age (seconds) past which a maxed-out HALF_OPEN window is considered stalled (worker died mid-trial) and auto-reset on the next acquire | required |
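Returns:
| Type | Description |
|---|---|
| tuple[bool, str, str] | Tuple of three values, matching the return values listed in the state-machine branches: whether a slot was acquired, the state before the call, and the state after the call |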
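The state-machine branches listed for this method can be sketched for a single service held in memory under an RLock. The `InMemoryHalfOpen` class, the injectable `now` argument, and the use of a monotonic clock are assumptions for illustration. Manual-override handling is omitted, so it falls through to the final no-op branch.

```python
import threading
import time


class InMemoryHalfOpen:
    """Hypothetical single-service store; field names mirror the branch list."""

    def __init__(self, state="closed"):
        self._lock = threading.RLock()
        self.state = state
        self.success_count = 0
        self.half_open_request_count = 0
        self.half_open_window_started_at = 0.0

    def try_acquire_half_open_slot(self, limit, stuck_timeout_seconds, now=None):
        now = time.monotonic() if now is None else now
        with self._lock:  # evaluation and counter update are one atomic step
            if self.state == "half_open":
                full = self.half_open_request_count >= limit
                age = now - self.half_open_window_started_at
                if full and age > stuck_timeout_seconds:
                    self._open_window(now)  # stuck-window auto-reset
                    return (True, "half_open", "half_open")
                if not full:
                    self.half_open_request_count += 1
                    return (True, "half_open", "half_open")
                return (False, "half_open", "half_open")  # full, window still live
            if self.state == "open":
                self.state = "half_open"  # OPEN -> HALF_OPEN transition
                self._open_window(now)
                return (True, "open", "half_open")
            return (False, self.state, self.state)  # CLOSED and anything else

    def _open_window(self, now):
        self.success_count = 0
        self.half_open_request_count = 1
        self.half_open_window_started_at = now
```

The branch conditions are mutually exclusive, so grouping the three half_open cases together gives the same results as the documented evaluation order.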
reset_half_open_count
abstractmethod
reset_half_open_count(service_name: str) -> None
Reset the HALF_OPEN counter and clear the window watermark.
Used by manual_control flows where a counter reset is needed without
a state change. Hot-path transitions out of HALF_OPEN should prefer
update_state(..., reset_half_open_count=True) (single
round-trip).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service_name | str | Service identifier | required |