baldur.adapters.memory — Layered, Incident & Archive Repositories
This module provides the layered (L1+L2) circuit-breaker state repository, which pairs in-memory storage with Redis or a database for high-throughput distributed access, along with the in-memory security-incident, postmortem, archive, and event-journal repositories.
LayeredCircuitBreakerStateRepository
LayeredCircuitBreakerStateRepository(
l2_repo: CircuitBreakerStateRepository | None = None,
sync_interval_seconds: float = 5.0,
adapter_type: str = "unknown",
drift_reconciler: DriftReconciler | None = None,
sliding_window_size: int = 100,
)
Bases: L2LoadMixin, ErrorHandlingMixin, DriftOperationsMixin, L2SyncMixin, RepositoryOperationsMixin, MonitoringMixin, AuditHelpersMixin, LayeredRepositoryBase, CircuitBreakerStateRepository
Hybrid layered repository (L1 Memory + L2 Shared Storage).
Advantages:
- The system keeps running on L1 alone if an external dependency (Redis/DB) is briefly unavailable.
- Eventual consistency is maintained even in distributed environments.
- Does not intrude on the host DB (L2 is opt-in).
Usage
Memory only (default, single server)
repo = LayeredCircuitBreakerStateRepository()
Add Redis as L2 (distributed environment)
from baldur.adapters.redis import RedisCircuitBreakerStateRepository

repo = LayeredCircuitBreakerStateRepository(
    l2_repo=RedisCircuitBreakerStateRepository(),
    sync_interval_seconds=5,
)
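The L1/L2 behaviour described above can be illustrated with a small self-contained sketch. It does not use the real class: `FlakyStore` and `LayeredStateSketch` are hypothetical stand-ins, and the "dirty key" bookkeeping is an assumption about how a periodic sync might work, not a description of the library's internals.

```python
from __future__ import annotations


class FlakyStore:
    """Hypothetical stand-in for a shared L2 store (e.g. Redis)."""

    def __init__(self) -> None:
        self.data: dict[str, str] = {}
        self.available = True

    def get(self, key: str) -> str | None:
        if not self.available:
            raise ConnectionError("L2 unavailable")
        return self.data.get(key)

    def set(self, key: str, value: str) -> None:
        if not self.available:
            raise ConnectionError("L2 unavailable")
        self.data[key] = value


class LayeredStateSketch:
    """Toy L1 (dict) plus optional L2 store; not the real repository."""

    def __init__(self, l2: FlakyStore | None = None) -> None:
        self._l1: dict[str, str] = {}
        self._l2 = l2
        self._dirty: set[str] = set()  # keys written to L1 but not yet pushed to L2

    def set_state(self, name: str, state: str) -> None:
        # Writes always succeed locally; L2 is updated later by sync().
        self._l1[name] = state
        self._dirty.add(name)

    def get_state(self, name: str) -> str | None:
        if name in self._l1:
            return self._l1[name]
        if self._l2 is None:
            return None
        try:
            value = self._l2.get(name)
        except ConnectionError:
            return None  # L2 is down: answer from L1 only
        if value is not None:
            self._l1[name] = value  # warm L1 from L2
        return value

    def sync(self) -> int:
        """Push dirty keys to L2 and return how many were synced."""
        if self._l2 is None:
            return 0
        synced = 0
        for name in sorted(self._dirty):
            try:
                self._l2.set(name, self._l1[name])
            except ConnectionError:
                break  # keep remaining keys dirty and retry on the next sync
            self._dirty.discard(name)
            synced += 1
        return synced
```

In this sketch a write made while L2 is down stays readable from L1 and is pushed on the first `sync()` after L2 recovers, which is one way to get the eventual consistency the docstring mentions.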
InMemorySecurityIncidentRepository
InMemorySecurityIncidentRepository()
Bases: SecurityIncidentRepository
In-memory implementation of SecurityIncidentRepository.
Thread-safe storage for security incidents in memory.
create
create(
incident_type: str,
severity: str,
description: str = "",
source_ip: str | None = None,
user_agent: str = "",
user_id: int | None = None,
entity_refs: dict[str, int] | None = None,
raw_payload: dict[str, Any] | None = None,
) -> SecurityIncidentData
Create a new security incident (domain-neutral).
get_by_id
get_by_id(id: int) -> SecurityIncidentData | None
Get a security incident by ID.
update_status
update_status(
id: int,
status: str,
investigation_notes: str = "",
assigned_to_id: int | None = None,
) -> bool
Update incident status.
find_by_type
find_by_type(
incident_type: str,
status: str | None = None,
limit: int = 100,
) -> list[SecurityIncidentData]
Find incidents by type.
find_by_source_ip
find_by_source_ip(
source_ip: str, since: datetime | None = None
) -> list[SecurityIncidentData]
Find incidents by source IP.
count_by_source_ip
count_by_source_ip(source_ip: str, since: datetime) -> int
Count incidents by source IP since a given time.
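A typical use of `count_by_source_ip` is a threshold check over a recent time window. The sketch below is a self-contained stand-in rather than the real repository: `IncidentSketch`, `IncidentStoreSketch`, `should_block`, the `created_at` field, and the inclusive `>= since` comparison are all assumptions made for illustration.

```python
from __future__ import annotations

import threading
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass(frozen=True)
class IncidentSketch:
    """Hypothetical record; the real SecurityIncidentData may differ."""

    id: int
    incident_type: str
    severity: str
    source_ip: str | None
    created_at: datetime


class IncidentStoreSketch:
    """Toy thread-safe store mirroring two of the documented methods."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._items: list[IncidentSketch] = []

    def create(
        self,
        incident_type: str,
        severity: str,
        source_ip: str | None = None,
        created_at: datetime | None = None,  # assumption: injected for determinism
    ) -> IncidentSketch:
        with self._lock:
            incident = IncidentSketch(
                id=len(self._items) + 1,
                incident_type=incident_type,
                severity=severity,
                source_ip=source_ip,
                created_at=created_at or datetime.now(timezone.utc),
            )
            self._items.append(incident)
            return incident

    def count_by_source_ip(self, source_ip: str, since: datetime) -> int:
        with self._lock:
            return sum(
                1
                for item in self._items
                if item.source_ip == source_ip and item.created_at >= since
            )


def should_block(
    store: IncidentStoreSketch, ip: str, now: datetime, threshold: int = 3
) -> bool:
    """Flag an IP once it reaches `threshold` incidents in the last 10 minutes."""
    window_start = now - timedelta(minutes=10)
    return store.count_by_source_ip(ip, since=window_start) >= threshold
```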
get_open_incidents
get_open_incidents(
limit: int = 100,
) -> list[SecurityIncidentData]
Get all open incidents.
get_by_type
get_by_type(
incident_type: str, limit: int = 100
) -> list[SecurityIncidentData]
Get incidents by type.
get_by_severity
get_by_severity(
severity: str, limit: int = 100
) -> list[SecurityIncidentData]
Get incidents by severity.
mark_as_resolved
mark_as_resolved(
id: int, investigation_notes: str = ""
) -> bool
Mark incident as resolved.
get_recent_by_ip
get_recent_by_ip(
source_ip: str, hours: int = 24, limit: int = 100
) -> list[SecurityIncidentData]
Get recent incidents from a specific IP.
count_by_type_since
count_by_type_since(
incident_type: str, since: datetime
) -> int
Count incidents of a type since a given time.
get_statistics
get_statistics() -> dict[str, Any]
Get statistics about security incidents.
clear
clear() -> None
Clear all entries (for testing).
InMemoryPostmortemRepository
InMemoryPostmortemRepository()
Bases: PostmortemRepository
In-memory implementation of PostmortemRepository.
Thread-safe. Suitable for testing and non-Django environments.
save
save(data: PostmortemData) -> bool
Persist a postmortem record.
get_by_incident_id
get_by_incident_id(
incident_id: str,
) -> PostmortemData | None
Retrieve a single postmortem by incident ID.
find
find(
*,
start_date: datetime | None = None,
end_date: datetime | None = None,
service: str | None = None,
min_duration: float | None = None,
offset: int = 0,
limit: int = 100
) -> list[PostmortemData]
Query postmortems with optional filters.
count
count(
*,
start_date: datetime | None = None,
end_date: datetime | None = None,
service: str | None = None,
min_duration: float | None = None
) -> int
Count postmortems matching filters.
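Because `find` and `count` accept the same filters, they can be combined to page through results. The following is a minimal stand-alone sketch of that pattern, not the repository itself: `PostmortemSketch`, its field names (`started_at`, `duration_seconds`), the inclusive date bounds, and the insertion-order results are assumptions.

```python
from __future__ import annotations

from collections.abc import Iterator
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class PostmortemSketch:
    """Hypothetical record; the real PostmortemData may differ."""

    incident_id: str
    service: str
    started_at: datetime
    duration_seconds: float


def _matches(
    item: PostmortemSketch,
    *,
    start_date: datetime | None = None,
    end_date: datetime | None = None,
    service: str | None = None,
    min_duration: float | None = None,
) -> bool:
    # Each filter is skipped when left as None.
    if start_date is not None and item.started_at < start_date:
        return False
    if end_date is not None and item.started_at > end_date:
        return False
    if service is not None and item.service != service:
        return False
    if min_duration is not None and item.duration_seconds < min_duration:
        return False
    return True


def find(items, *, offset: int = 0, limit: int = 100, **filters) -> list[PostmortemSketch]:
    matched = [item for item in items if _matches(item, **filters)]
    return matched[offset : offset + limit]


def count(items, **filters) -> int:
    return sum(1 for item in items if _matches(item, **filters))


def iter_pages(items, page_size: int, **filters) -> Iterator[list[PostmortemSketch]]:
    """Yield successive pages until every matching record has been returned."""
    total = count(items, **filters)
    for offset in range(0, total, page_size):
        yield find(items, offset=offset, limit=page_size, **filters)
```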
update_fields
update_fields(
incident_id: str, fields: dict[str, Any]
) -> bool
Partial update of specific fields.
clear
clear() -> None
Clear all entries (for test cleanup).
InMemoryCascadeEventArchiveRepository
InMemoryCascadeEventArchiveRepository()
Bases: CascadeEventArchiveRepository
In-memory implementation of CascadeEventArchiveRepository.
Thread-safe with RLock. Suitable for testing and non-Django environments.
save
save(data: CascadeEventData) -> bool
Persist a cascade event record (overwrite on duplicate).
get_by_cascade_id
get_by_cascade_id(
cascade_id: str,
) -> CascadeEventData | None
Retrieve a single cascade event by ID.
find
find(
*,
namespace: str | None = None,
trigger_type: str | None = None,
start_date: datetime | None = None,
end_date: datetime | None = None,
is_test: bool | None = None,
offset: int = 0,
limit: int = 100
) -> list[CascadeEventData]
Query with optional filters. Results ordered by timestamp DESC.
count
count(
*,
namespace: str | None = None,
trigger_type: str | None = None,
start_date: datetime | None = None,
end_date: datetime | None = None
) -> int
Count cascade events matching filters.
delete_older_than
delete_older_than(cutoff: datetime) -> int
Delete archived events older than cutoff.
get_chain
get_chain(
namespace: str,
*,
start_date: datetime | None = None,
end_date: datetime | None = None
) -> list[CascadeEventData]
Retrieve hash chain for integrity verification. Ordered by timestamp ASC.
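Since `get_chain` returns entries in ascending timestamp order, a verifier can walk the list once and check that each entry links to its predecessor. The sketch below shows one generic way to do that with SHA-256. The record layout (`prev_hash`, `entry_hash`), the `GENESIS_HASH` sentinel, and the hashed material are assumptions for illustration; the actual fields and hashing scheme of `CascadeEventData` are not specified here.

```python
from __future__ import annotations

import hashlib
from dataclasses import dataclass

GENESIS_HASH = "0" * 64  # assumption: sentinel "previous hash" for the first entry


@dataclass
class ChainEntrySketch:
    """Hypothetical chain record; not the real CascadeEventData."""

    cascade_id: str
    payload: str
    prev_hash: str
    entry_hash: str


def compute_hash(cascade_id: str, payload: str, prev_hash: str) -> str:
    material = f"{cascade_id}|{payload}|{prev_hash}".encode("utf-8")
    return hashlib.sha256(material).hexdigest()


def append_entry(chain: list[ChainEntrySketch], cascade_id: str, payload: str) -> None:
    """Append an entry whose hash covers its content and the previous hash."""
    prev_hash = chain[-1].entry_hash if chain else GENESIS_HASH
    entry_hash = compute_hash(cascade_id, payload, prev_hash)
    chain.append(ChainEntrySketch(cascade_id, payload, prev_hash, entry_hash))


def first_broken_index(chain: list[ChainEntrySketch]) -> int | None:
    """Return the index of the first invalid entry, or None if the chain is intact.

    `chain` must be in ascending (oldest first) order.
    """
    expected_prev = GENESIS_HASH
    for index, entry in enumerate(chain):
        if entry.prev_hash != expected_prev:
            return index  # link to predecessor is broken
        if entry.entry_hash != compute_hash(entry.cascade_id, entry.payload, entry.prev_hash):
            return index  # content no longer matches its recorded hash
        expected_prev = entry.entry_hash
    return None
```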
clear
clear() -> None
Clear all entries (for test cleanup).
InMemoryRecoverySessionArchiveRepository
InMemoryRecoverySessionArchiveRepository()
Bases: RecoverySessionArchiveRepository
In-memory implementation of RecoverySessionArchiveRepository.
Thread-safe with RLock. Suitable for testing and non-Django environments.
save
save(data: RecoverySessionData) -> bool
Persist a recovery session record (overwrite on duplicate).
get_by_session_id
get_by_session_id(
session_id: str,
) -> RecoverySessionData | None
Retrieve a single session by ID.
find
find(
*,
namespace: str | None = None,
status: str | None = None,
start_date: datetime | None = None,
end_date: datetime | None = None,
offset: int = 0,
limit: int = 100
) -> list[RecoverySessionData]
Query with optional filters. Results ordered by started_at DESC.
count
count(
*,
namespace: str | None = None,
status: str | None = None
) -> int
Count sessions matching filters.
update
update(data: RecoverySessionData) -> bool
Full update of an existing session record.
delete_older_than
delete_older_than(cutoff: datetime) -> int
Delete archived sessions older than cutoff.
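`delete_older_than` lends itself to a periodic retention job. A minimal sketch of that idea follows, using a stand-in archive rather than the real class: `ArchiveSketch`, `purge`, the 30-day default, and the strict `< cutoff` comparison are assumptions.

```python
from __future__ import annotations

import threading
from datetime import datetime, timedelta


class ArchiveSketch:
    """Toy RLock-guarded archive keyed by session ID; not the real repository."""

    def __init__(self) -> None:
        self._lock = threading.RLock()
        self._records: dict[str, datetime] = {}  # session_id -> started_at

    def save(self, session_id: str, started_at: datetime) -> bool:
        with self._lock:
            self._records[session_id] = started_at  # overwrite on duplicate
            return True

    def delete_older_than(self, cutoff: datetime) -> int:
        """Remove records that started strictly before `cutoff`; return the count."""
        with self._lock:
            stale = [key for key, ts in self._records.items() if ts < cutoff]
            for key in stale:
                del self._records[key]
            return len(stale)

    def __len__(self) -> int:
        with self._lock:
            return len(self._records)


def purge(archive: ArchiveSketch, now: datetime, retention_days: int = 30) -> int:
    """Apply a retention window relative to `now`."""
    return archive.delete_older_than(now - timedelta(days=retention_days))
```

Passing `now` explicitly, instead of reading the clock inside `purge`, keeps the retention logic easy to test.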
clear
clear() -> None
Clear all entries (for test cleanup).
InMemoryEventJournalRepository
InMemoryEventJournalRepository(
max_entries: int = 10000, max_query_limit: int = 10000
)
Bases: EventJournalRepository
Thread-safe in-memory implementation. For tests and single-process use.
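The two constructor parameters suggest a bounded buffer with a capped query size. The sketch below shows one way such a journal could behave, assuming `max_entries` evicts the oldest events once the buffer is full and `max_query_limit` clamps how many events a single query may return. Both behaviours, and the `JournalSketch`, `append`, and `latest` names, are assumptions and are not confirmed by the reference above.

```python
from __future__ import annotations

import threading
from collections import deque
from typing import Any


class JournalSketch:
    """Toy bounded, thread-safe event journal; not the real repository."""

    def __init__(self, max_entries: int = 10000, max_query_limit: int = 10000) -> None:
        self._lock = threading.Lock()
        # A deque with maxlen silently drops the oldest item when full.
        self._entries: deque[dict[str, Any]] = deque(maxlen=max_entries)
        self._max_query_limit = max_query_limit

    def append(self, event: dict[str, Any]) -> None:
        with self._lock:
            self._entries.append(event)

    def latest(self, limit: int) -> list[dict[str, Any]]:
        """Return up to `limit` newest events, oldest first, capped by max_query_limit."""
        effective = max(0, min(limit, self._max_query_limit))
        with self._lock:
            if effective == 0:
                return []
            return list(self._entries)[-effective:]
```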