baldur.adapters.memory — Core State Repositories
In-memory implementations of the highest-traffic repository interfaces: failed-operation persistence and circuit-breaker state. Intended for testing, standalone (non-framework) usage, and prototyping.
InMemoryFailedOperationRepository
InMemoryFailedOperationRepository()
Bases: FailedOperationRepository
In-memory implementation of FailedOperationRepository.
Thread-safe in-memory storage for dead-letter queue (DLQ) entries. Data is lost when the process exits.
Maintains status/domain indexes for O(1) lookup instead of O(n) scan.
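The index idea can be sketched as follows. This is a minimal illustration rather than the adapter's actual code: the class TinyIndexedStore, its attributes, and the status strings are hypothetical, and locking is omitted for brevity.

```python
from collections import defaultdict


class TinyIndexedStore:
    """Hypothetical store that keeps a status index next to the primary dict."""

    def __init__(self) -> None:
        self._entries = {}
        # status -> set of entry ids, maintained on every write.
        self._by_status = defaultdict(set)

    def add(self, entry_id: str, domain: str, status: str = "pending") -> None:
        self._entries[entry_id] = {"domain": domain, "status": status}
        self._by_status[status].add(entry_id)

    def set_status(self, entry_id: str, status: str) -> None:
        entry = self._entries[entry_id]
        # Move the id between buckets so the index stays in sync with the entry.
        self._by_status[entry["status"]].discard(entry_id)
        entry["status"] = status
        self._by_status[status].add(entry_id)

    def ids_with_status(self, status: str) -> set:
        # One dict lookup finds the bucket; unrelated entries are never scanned.
        return set(self._by_status.get(status, ()))
```

The cost of the faster read is extra bookkeeping on every write, which is why status changes must update both the entry and the index in the same step.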
create
create(
domain: str,
failure_type: str,
error_message: str = "",
error_code: str = "",
entity_type: str | None = None,
entity_id: str | None = None,
entity_refs: dict[str, Any] | None = None,
user_id: int | None = None,
snapshot_data: dict[str, Any] | None = None,
request_data: dict[str, Any] | None = None,
response_data: dict[str, Any] | None = None,
metadata: dict[str, Any] | None = None,
retry_count: int = 0,
max_retries: int = 2,
next_action_hint: str = "",
recommended_action: str = "",
expires_at: datetime | None = None,
) -> FailedOperationData
Create a new failed operation record (domain-neutral).
get_by_id
get_by_id(id: str) -> FailedOperationData | None
Get a failed operation by ID.
get_pending_by_domain
get_pending_by_domain(
domain: str, limit: int = 100
) -> list[FailedOperationData]
Get pending operations for a specific domain.
get_pending_count_by_domain
get_pending_count_by_domain(domain: str) -> int
Get count of pending operations for a domain.
update_status
update_status(
id: str,
status: str,
resolution_type: str = "",
resolution_note: str = "",
resolved_by_id: int | None = None,
recommended_action: str = "",
) -> bool
Update the status of a failed operation.
increment_retry_count
increment_retry_count(id: str) -> bool
Increment retry count and update last_retry_at.
mark_as_resolved
mark_as_resolved(
id: str,
resolution_type: str,
resolution_note: str = "",
resolved_by_id: int | None = None,
) -> bool
Mark a failed operation as resolved.
get_expired_operations
get_expired_operations(
before_date: datetime, limit: int = 100
) -> list[FailedOperationData]
Get operations that have expired.
bulk_update_status
bulk_update_status(ids: list[str], status: str) -> int
Bulk update status for multiple operations.
find_by_status
find_by_status(
status: str,
domain: str | None = None,
failure_type: str | None = None,
limit: int = 100,
) -> list[FailedOperationData]
Find operations by status with optional filters.
find
find(
*,
status: str | None = None,
domain: str | None = None,
failure_type: str | None = None,
offset: int = 0,
limit: int = 100
) -> list[FailedOperationData]
Paginated cross-status query ordered by created_at DESC.
count
count(
*,
status: str | None = None,
domain: str | None = None,
failure_type: str | None = None
) -> int
Count operations matching filters (pre-slice set size).
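The relationship between find and count can be illustrated with a small sketch. The helpers find_page and count_matching and the row layout are hypothetical stand-ins, assuming only what the descriptions state: find sorts by created_at descending and then slices, while count reports the size of the filtered set before slicing.

```python
from __future__ import annotations

from datetime import datetime, timedelta


def _matches(row: dict, status: str | None, domain: str | None) -> bool:
    # A filter left as None means "do not filter on this field".
    return (status is None or row["status"] == status) and (
        domain is None or row["domain"] == domain
    )


def find_page(rows, *, status=None, domain=None, offset=0, limit=100):
    matched = [r for r in rows if _matches(r, status, domain)]
    matched.sort(key=lambda r: r["created_at"], reverse=True)  # newest first
    return matched[offset : offset + limit]


def count_matching(rows, *, status=None, domain=None) -> int:
    # Size of the filtered set before any offset/limit slice is applied.
    return sum(1 for r in rows if _matches(r, status, domain))


base = datetime(2024, 1, 1)
rows = [
    {"id": str(i), "status": "pending", "domain": "billing",
     "created_at": base + timedelta(minutes=i)}
    for i in range(5)
]
page = find_page(rows, status="pending", offset=2, limit=2)
total = count_matching(rows, status="pending")
```

A caller rendering a paginated list would typically pass the same filters to both calls, using the count to compute the number of pages.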
count_created_in_window
count_created_in_window(
start: datetime, end: datetime
) -> int
Count entries whose created_at falls within the inclusive range [start, end].
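A brief sketch of the inclusive bounds, using a hypothetical helper count_in_window over a plain list of timestamps (not the adapter's storage):

```python
from datetime import datetime


def count_in_window(created_ats, start: datetime, end: datetime) -> int:
    # The chained comparison keeps both bounds inclusive.
    return sum(1 for ts in created_ats if start <= ts <= end)


stamps = [datetime(2024, 1, 1, hour) for hour in (8, 9, 10, 11)]
in_window = count_in_window(
    stamps, datetime(2024, 1, 1, 9), datetime(2024, 1, 1, 11)
)
```

Entries stamped exactly at start or exactly at end are counted, so adjacent windows that share a boundary will both include an entry created on that boundary.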
find_replayable
find_replayable(
max_retries: int,
domain: str | None = None,
failure_type: str | None = None,
limit: int = 100,
) -> list[FailedOperationData]
Find operations that can be replayed.
find_sla_breached
find_sla_breached(
current_time: datetime,
sla_thresholds: dict[str, timedelta],
) -> list[FailedOperationData]
Find operations that have breached their SLA.
find_expired
find_expired(
current_time: datetime,
) -> list[FailedOperationData]
Find operations past their retention period.
get_statistics
get_statistics() -> dict[str, Any]
Get statistics about failed operations.
Adds pending-specific breakdowns for the daily report:
- pending_by_domain: {domain: pending_count} (required by update_dlq_pending_gauges; pre-existing bug fix)
- pending_by_domain_and_failure_type: {domain: {failure_type: count}} (powers DLQPendingBreakdown in daily report)
Memory adapter iterates the pending index (O(N) in-memory).
get_facet_counts
get_facet_counts(
*, status: str | None = None, domain: str | None = None
) -> dict[str, dict[str, int]]
Faceted status×domain counts via the 1D/2D indexes.
by_status is scoped by domain, and by_domain is scoped by
status (faceted-search semantics). Empty buckets are dropped
explicitly with an "if ids" check. The check is needed because
_remove_from_index discards ids without deleting an emptied set, so a
fully drained status/domain key lingers as an empty set and would
otherwise surface with a count of 0, breaking zero-drop parity with
the SQL/Redis adapters.
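The scoping and empty-bucket handling can be sketched as below. This is an assumption-laden illustration: the function facet_counts and its index arguments are hypothetical, and it uses set intersection over two 1D indexes rather than the 2D index the adapter mentions. Only the by_status and by_domain result keys come from the description above.

```python
from __future__ import annotations


def facet_counts(
    by_status: dict[str, set[str]],
    by_domain: dict[str, set[str]],
    *,
    status: str | None = None,
    domain: str | None = None,
) -> dict[str, dict[str, int]]:
    # Ids allowed by each filter; None means "no filter on this dimension".
    domain_ids = None if domain is None else by_domain.get(domain, set())
    status_ids = None if status is None else by_status.get(status, set())

    def scoped_counts(index, scope):
        counts = {}
        for key, ids in index.items():
            hits = ids if scope is None else ids & scope
            if hits:  # drop empty buckets instead of reporting a 0 count
                counts[key] = len(hits)
        return counts

    return {
        "by_status": scoped_counts(by_status, domain_ids),  # scoped by domain
        "by_domain": scoped_counts(by_domain, status_ids),  # scoped by status
    }


# "resolved" is a fully drained key that still lingers as an empty set.
status_index = {"pending": {"1", "2"}, "resolved": set()}
domain_index = {"billing": {"1"}, "auth": {"2"}}
facets = facet_counts(status_index, domain_index, domain="billing")
```

Each facet is filtered by the other dimension only, which is what lets a UI show how many results each alternative value of the currently selected filter would produce.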
try_acquire_for_replay
try_acquire_for_replay(
id: str, max_retries: int, force: bool = False
) -> FailedOperationData | None
Atomically acquire a DLQ entry for replay.
force=True bypasses the cap gate (operator cap-override): it accepts
an entry whose source status is PENDING or REQUIRES_REVIEW, resets
retry_count to a fresh budget (this redrive counts as attempt 1), and
stamps the metadata history scar before the reset. See
FailedOperationRepository.try_acquire_for_replay.
complete_replay
complete_replay(
id: str,
success: bool,
resolution_type: str = "",
note: str = "",
resolved_by_id: int | None = None,
error_details: dict[str, Any] | None = None,
) -> bool
Complete a replay operation by updating the final status.
release_stale_replaying
release_stale_replaying(
older_than_minutes: int = 30,
) -> int
Release DLQ entries stuck in REPLAYING state.
clear
clear() -> None
Clear all entries (for testing).
archive_old_resolved
archive_old_resolved(older_than_days: int = 30) -> int
Archive resolved entries older than N days.
purge_archived
purge_archived(
ids: list[str] | None = None,
older_than_days: int | None = None,
) -> int
Permanently delete archived entries.
count_archived_older_than
count_archived_older_than(older_than_days: int) -> int
Count archived entries older than N days.
count_all
count_all() -> int
Return active DLQ item count (excludes resolved/rejected/archived).
Matches Redis adapter semantics where resolved entries are removed from the PENDING_KEY sorted set.
count_by_domain
count_by_domain(domain: str) -> int
Return DLQ item count for a specific domain.
get_oldest_ids
get_oldest_ids(
count: int, domain: str | None = None
) -> list[str]
Return IDs of the oldest items (by created_at).
delete
delete(entry_id: str) -> bool
Delete a single entry by ID. Returns True if deleted.
evict_oldest
evict_oldest(count: int, domain: str | None = None) -> int
Delete the oldest items, skipping entries in protected statuses.
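One plausible reading of this behavior is sketched below. Everything in it is an assumption for illustration: the PROTECTED set, the entry layout, and the choice to keep walking past protected entries until count deletions have been made are not confirmed by this reference.

```python
from datetime import datetime, timedelta

# Hypothetical protected set; the statuses the adapter protects are not listed here.
PROTECTED = {"replaying"}


def evict_oldest(entries: dict, count: int) -> int:
    # Sort ids oldest-first; sorted() returns a new list, so deleting
    # from the dict while walking that list is safe.
    oldest_first = sorted(entries, key=lambda i: entries[i]["created_at"])
    deleted = 0
    for entry_id in oldest_first:
        if deleted >= count:
            break
        if entries[entry_id]["status"] in PROTECTED:
            continue  # skip without consuming the eviction budget
        del entries[entry_id]
        deleted += 1
    return deleted


base = datetime(2024, 1, 1)
entries = {
    "a": {"status": "replaying", "created_at": base},
    "b": {"status": "pending", "created_at": base + timedelta(minutes=1)},
    "c": {"status": "pending", "created_at": base + timedelta(minutes=2)},
    "d": {"status": "pending", "created_at": base + timedelta(minutes=3)},
}
evicted = evict_oldest(entries, 2)
```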
get_cleanup_stats
get_cleanup_stats() -> dict[str, Any]
Get statistics for cleanup operations.
compress_and_evict_oldest
compress_and_evict_oldest(
count: int, domain: str | None = None
) -> int
Compress then evict oldest entries (in-memory implementation).
Follows the same logical flow as the Redis adapter but uses Python dicts and lists.
store_compressed_entry
store_compressed_entry(entry: DLQCompressedEntry) -> bool
Store compressed entry in memory dict.
get_compressed_entries
get_compressed_entries(
domain: str | None = None,
status: str | None = None,
limit: int = 100,
) -> list[DLQCompressedEntry]
Query compressed entries from memory, newest first.
get_compressed_summary
get_compressed_summary() -> dict[str, Any]
Aggregate statistics of compressed entries.
update_compressed_status
update_compressed_status(
entry_id: str, new_status: str
) -> bool
Transition compressed entry lifecycle status.
InMemoryCircuitBreakerStateRepository
InMemoryCircuitBreakerStateRepository(
sliding_window_size: int = 100,
)
Bases: CircuitBreakerStateRepository
In-memory implementation of CircuitBreakerStateRepository.
Thread-safe storage for circuit breaker states in memory.
When sliding_window_size is specified, record_failure() and record_success() use ring-buffer-based sliding-window counting: failure_count and success_count reflect the counts within the most recent N calls, where N is sliding_window_size.
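A minimal sketch of ring-buffer counting, assuming a bounded deque as the buffer. The class SlidingWindowCounter is hypothetical and is not the repository's internal structure.

```python
from collections import deque


class SlidingWindowCounter:
    """Hypothetical counter over the most recent `size` call outcomes."""

    def __init__(self, size: int = 100) -> None:
        # True marks a failure, False a success. With maxlen set, appending
        # to a full deque silently evicts the oldest outcome.
        self._window = deque(maxlen=size)

    def record(self, failed: bool) -> None:
        self._window.append(failed)

    @property
    def failure_count(self) -> int:
        return sum(self._window)

    @property
    def success_count(self) -> int:
        return len(self._window) - sum(self._window)


counter = SlidingWindowCounter(size=3)
for failed in (True, True, False, False):
    counter.record(failed)
```

After the fourth call the first failure has been evicted, so the counts describe only the last three outcomes. Old failures therefore age out on their own as new calls arrive.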
get_by_service_name
get_by_service_name(
service_name: str,
) -> CircuitBreakerStateData | None
Get circuit breaker state by service name.
get_or_create
get_or_create(service_name: str) -> CircuitBreakerStateData
Get or create a circuit breaker state.
Uses double-checked locking: steady-state callers (entry exists) bypass
the RLock entirely and pay only a GIL-atomic dict.get. Only the
first-call create needs the lock. This mirrors the precedent in
protect.py and the project-wide policy in factory/base.py
("read-only dict operations rely on CPython GIL atomicity").
It removes the read-path lock acquisition that was the contention floor under high concurrency.
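The pattern can be sketched as follows. The class StateRegistry, the dict-based state, and the creations counter are hypothetical and exist only to make the behavior observable. As the description notes, the lock-free read relies on CPython's GIL making dict.get atomic.

```python
import threading


class StateRegistry:
    """Hypothetical registry showing double-checked locking around a dict."""

    def __init__(self) -> None:
        self._states = {}
        self._lock = threading.RLock()
        self.creations = 0  # instrumentation for the example only

    def get_or_create(self, service_name: str) -> dict:
        # First check: lock-free fast path for the steady state.
        state = self._states.get(service_name)
        if state is not None:
            return state
        with self._lock:
            # Second check: another thread may have created the entry
            # between the first check and acquiring the lock.
            state = self._states.get(service_name)
            if state is None:
                state = {"service_name": service_name, "state": "closed"}
                self._states[service_name] = state
                self.creations += 1
            return state
```

Without the second check inside the lock, two threads that both miss on the fast path would each create an entry, and the later write would replace the earlier one.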
update_state
update_state(
service_name: str,
state: str,
failure_count: int | None = None,
success_count: int | None = None,
opened_at: datetime | None = None,
last_failure_at: datetime | None = None,
half_open_request_count: int | None = None,
reset_half_open_count: bool = False,
) -> bool
Update circuit breaker state.
increment_failure_count
increment_failure_count(
service_name: str,
last_failure_at: datetime | None = None,
) -> int
Increment failure count.
reset_counts
reset_counts(service_name: str) -> bool
Reset failure/success counts and clear the OPEN-era timestamp.
opened_at is cleared alongside the counters so a CLOSED
DTO does not carry a stale OPEN-era timestamp. The L2-authoritative
close path in LayeredCircuitBreakerStateRepository invokes this
before transitioning L1 to CLOSED.
set_manual_control
set_manual_control(
service_name: str,
state: str,
controlled_by_id: int | None = None,
reason: str = "",
expires_at: datetime | None = None,
) -> bool
Set manual control override.
clear_manual_control
clear_manual_control(
service_name: str, preserve_reason: bool = False
) -> bool
Clear manual control override.
Only the manual-control flag is cleared. state and the counters (failure_count, success_count) are not modified. If a state transition is needed, the caller must invoke update_state first.
record_failure
record_failure(
service_name: str,
) -> CircuitBreakerStateData
Record a failure and return updated state.
Records a failure into the Sliding Window ring buffer, then updates state using the in-window failure/success counts.
record_success
record_success(
service_name: str,
) -> CircuitBreakerStateData
Record a success and return updated state.
Records a success into the Sliding Window ring buffer, then updates state using the in-window failure/success counts.
record_success_with_close_check
record_success_with_close_check(
service_name: str, success_threshold: int
) -> CircuitBreakerCloseAttempt
Atomic record-success + threshold-check + close transition.
The whole sequence executes under self._lock: the eviction-aware window
increment, the threshold check, and (if the threshold is crossed from
HALF_OPEN) the close transition with _clear_window form one critical
section. This closes the TOCTOU race in which multiple stale-view
callers each pass the threshold check and emit duplicate
CIRCUIT_BREAKER_CLOSED events for the same logical recovery.
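The single-critical-section idea can be sketched as below. This is a simplified, hypothetical model: the class HalfOpenBreaker, its state strings, the plain integer counter standing in for the window, and the bool return value (in place of CircuitBreakerCloseAttempt) are all assumptions made for illustration.

```python
import threading


class HalfOpenBreaker:
    """Hypothetical breaker that records and checks under one lock."""

    def __init__(self) -> None:
        self._lock = threading.RLock()
        self.state = "half_open"
        self.success_count = 0
        self.closed_events = 0  # how many times a close event was emitted

    def record_success_with_close_check(self, success_threshold: int) -> bool:
        # Increment, threshold check, and transition happen under one lock,
        # so no caller can act on a stale view of state or counter.
        with self._lock:
            self.success_count += 1
            if self.state == "half_open" and self.success_count >= success_threshold:
                self.state = "closed"
                self.success_count = 0  # stand-in for clearing the window
                self.closed_events += 1
                return True  # this caller performed the close
            return False
```

If the increment and the check were separate calls, several threads could each observe a count at or above the threshold and each report the close. Here only the caller that crosses the threshold while the state is still half-open sees True.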
get_all_states
get_all_states() -> list[CircuitBreakerStateData]
Get all circuit breaker states.
reset
reset(service_name: str) -> bool
Reset circuit breaker to initial closed state.
atomic_force_open
atomic_force_open(
service_name: str,
reason: str = "",
controlled_by_id: int | None = None,
ttl_minutes: int = 90,
) -> tuple[bool, str, str]
Atomically force open a circuit breaker.
atomic_force_close
atomic_force_close(
service_name: str,
reason: str = "",
controlled_by_id: int | None = None,
) -> tuple[bool, str, str]
Atomically force close a circuit breaker.
atomic_reset
atomic_reset(
service_name: str,
reason: str = "",
controlled_by_id: int | None = None,
) -> tuple[bool, str, str]
Atomically reset a circuit breaker to initial state.
try_acquire_half_open_slot
try_acquire_half_open_slot(
service_name: str,
limit: int,
stuck_timeout_seconds: int,
) -> tuple[bool, str, str]
Atomic HALF_OPEN slot acquisition under RLock.
reset_half_open_count
reset_half_open_count(service_name: str) -> None
Reset HALF_OPEN counter and clear window watermark.
get_open_states
get_open_states(
limit: int | None = None,
) -> list[CircuitBreakerStateData]
Get OPEN circuit breaker states, oldest-first.
get_all_open
get_all_open() -> list[CircuitBreakerStateData]
Get all open circuit breakers.
delete
delete(service_name: str) -> bool
Delete a circuit breaker state.
delete_state
delete_state(service_name: str) -> bool
Delete circuit breaker state (alias for delete).
clear
clear() -> None
Clear all entries (for testing).