baldur.adapters.memory — Layered, Incident & Archive Repositories

The layered (L1+L2) circuit-breaker variant — Memory + Redis/DB for high-throughput distributed access — plus the in-memory security-incident, postmortem, archive, and event-journal repositories.

LayeredCircuitBreakerStateRepository

LayeredCircuitBreakerStateRepository(
    l2_repo: CircuitBreakerStateRepository | None = None,
    sync_interval_seconds: float = 5.0,
    adapter_type: str = "unknown",
    drift_reconciler: DriftReconciler | None = None,
    sliding_window_size: int = 100,
)

Bases: L2LoadMixin, ErrorHandlingMixin, DriftOperationsMixin, L2SyncMixin, RepositoryOperationsMixin, MonitoringMixin, AuditHelpersMixin, LayeredRepositoryBase, CircuitBreakerStateRepository

Hybrid layered repository (L1 Memory + L2 Shared Storage).

Advantages:

- If external dependencies (Redis/DB) fail briefly, the system keeps running on L1 alone
- Maintains eventual consistency in distributed environments
- Does not intrude on the host DB (L2 is opt-in)

Usage

Memory only (default, single server)

repo = LayeredCircuitBreakerStateRepository()

Add Redis as L2 (distributed environment)

from baldur.adapters.redis import RedisCircuitBreakerStateRepository

repo = LayeredCircuitBreakerStateRepository(
    l2_repo=RedisCircuitBreakerStateRepository(),
    sync_interval_seconds=5,
)
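
The L1-plus-optional-L2 idea described above can be illustrated with a small, self-contained sketch. It does not use baldur and is not its implementation: `FlakyL2` and `LayeredStore` are hypothetical names, and the "swallow the L2 error and keep serving from L1" policy is an assumption made for illustration.

```python
from __future__ import annotations


class FlakyL2:
    """Hypothetical stand-in for a shared store (Redis/DB) that can go down."""

    def __init__(self) -> None:
        self.data: dict[str, str] = {}
        self.available = True

    def get(self, key: str) -> str | None:
        if not self.available:
            raise ConnectionError("L2 unavailable")
        return self.data.get(key)

    def set(self, key: str, value: str) -> None:
        if not self.available:
            raise ConnectionError("L2 unavailable")
        self.data[key] = value


class LayeredStore:
    """Toy L1 (dict) + optional L2 store; an illustration only."""

    def __init__(self, l2: FlakyL2 | None = None) -> None:
        self._l1: dict[str, str] = {}
        self._l2 = l2

    def set(self, key: str, value: str) -> None:
        self._l1[key] = value  # the L1 write does not depend on L2
        if self._l2 is not None:
            try:
                self._l2.set(key, value)
            except ConnectionError:
                pass  # degrade to L1-only; a periodic sync could retry later

    def get(self, key: str) -> str | None:
        if key in self._l1:
            return self._l1[key]
        if self._l2 is None:
            return None
        try:
            value = self._l2.get(key)
        except ConnectionError:
            return None
        if value is not None:
            self._l1[key] = value  # warm L1 from L2
        return value


l2 = FlakyL2()
store = LayeredStore(l2)
store.set("payments", "open")      # reaches both layers
l2.available = False               # simulate a brief L2 outage
store.set("search", "closed")      # only L1 sees this write
state_during_outage = store.get("search")
```

In this sketch the write made during the outage never reaches L2, which is the gap a periodic sync (compare `sync_interval_seconds`) would presumably close.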

InMemorySecurityIncidentRepository

InMemorySecurityIncidentRepository()

Bases: SecurityIncidentRepository

In-memory implementation of SecurityIncidentRepository.

Thread-safe storage for security incidents in memory.

create

create(
    incident_type: str,
    severity: str,
    description: str = "",
    source_ip: str | None = None,
    user_agent: str = "",
    user_id: int | None = None,
    entity_refs: dict[str, int] | None = None,
    raw_payload: dict[str, Any] | None = None,
) -> SecurityIncidentData

Create a new security incident (domain-neutral).

get_by_id

get_by_id(id: int) -> SecurityIncidentData | None

Get a security incident by ID.

update_status

update_status(
    id: int,
    status: str,
    investigation_notes: str = "",
    assigned_to_id: int | None = None,
) -> bool

Update incident status.

find_by_type

find_by_type(
    incident_type: str,
    status: str | None = None,
    limit: int = 100,
) -> list[SecurityIncidentData]

Find incidents by type.

find_by_source_ip

find_by_source_ip(
    source_ip: str, since: datetime | None = None
) -> list[SecurityIncidentData]

Find incidents by source IP.

count_by_source_ip

count_by_source_ip(source_ip: str, since: datetime) -> int

Count incidents by source IP since a given time.
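
A "count since" query of this kind is typically used for threshold checks such as "more than N incidents from one IP in the last hour". The sketch below is a minimal stand-in, not baldur's code: `Incident`, `TinyIncidentStore`, and the inclusive `created_at >= since` boundary are assumptions.

```python
from __future__ import annotations

import threading
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class Incident:
    """Hypothetical stand-in for SecurityIncidentData."""

    source_ip: str
    created_at: datetime


class TinyIncidentStore:
    """Lock-protected list of incidents; an illustration only."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._items: list[Incident] = []

    def add(self, source_ip: str, created_at: datetime) -> None:
        with self._lock:
            self._items.append(Incident(source_ip, created_at))

    def count_by_source_ip(self, source_ip: str, since: datetime) -> int:
        # Assumption: the boundary is inclusive (created_at >= since).
        with self._lock:
            return sum(
                1
                for item in self._items
                if item.source_ip == source_ip and item.created_at >= since
            )


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
incidents = TinyIncidentStore()
incidents.add("203.0.113.7", now - timedelta(minutes=5))
incidents.add("203.0.113.7", now - timedelta(minutes=50))
incidents.add("203.0.113.7", now - timedelta(hours=3))   # outside the window
incidents.add("198.51.100.2", now - timedelta(minutes=1))  # different IP

recent_count = incidents.count_by_source_ip(
    "203.0.113.7", since=now - timedelta(hours=1)
)
```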

get_open_incidents

get_open_incidents(
    limit: int = 100,
) -> list[SecurityIncidentData]

Get all open incidents.

get_by_type

get_by_type(
    incident_type: str, limit: int = 100
) -> list[SecurityIncidentData]

Get incidents by type.

get_by_severity

get_by_severity(
    severity: str, limit: int = 100
) -> list[SecurityIncidentData]

Get incidents by severity.

mark_as_resolved

mark_as_resolved(
    id: int, investigation_notes: str = ""
) -> bool

Mark incident as resolved.

get_recent_by_ip

get_recent_by_ip(
    source_ip: str, hours: int = 24, limit: int = 100
) -> list[SecurityIncidentData]

Get recent incidents from a specific IP.

count_by_type_since

count_by_type_since(
    incident_type: str, since: datetime
) -> int

Count incidents of a type since a given time.

get_statistics

get_statistics() -> dict[str, Any]

Get statistics about security incidents.

clear

clear() -> None

Clear all entries (for testing).

InMemoryPostmortemRepository

InMemoryPostmortemRepository()

Bases: PostmortemRepository

In-memory implementation for PostmortemRepository.

Thread-safe. Suitable for testing and non-Django environments.

save

save(data: PostmortemData) -> bool

Persist a postmortem record.

get_by_incident_id

get_by_incident_id(
    incident_id: str,
) -> PostmortemData | None

Retrieve a single postmortem by incident ID.

find

find(
    *,
    start_date: datetime | None = None,
    end_date: datetime | None = None,
    service: str | None = None,
    min_duration: float | None = None,
    offset: int = 0,
    limit: int = 100
) -> list[PostmortemData]

Query postmortems with optional filters.

count

count(
    *,
    start_date: datetime | None = None,
    end_date: datetime | None = None,
    service: str | None = None,
    min_duration: float | None = None
) -> int

Count postmortems matching filters.
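
The relationship between `find` and `count` can be sketched as follows: both apply the same optional filters, while only `find` applies `offset` and `limit`. This is an illustration under assumptions, not the library's code: the `Postmortem` dataclass, its field names (`started_at`, `duration_seconds`), and the inclusive date bounds are hypothetical.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime


@dataclass
class Postmortem:
    """Hypothetical stand-in for PostmortemData."""

    incident_id: str
    service: str
    started_at: datetime
    duration_seconds: float


def _matches(pm, start_date, end_date, service, min_duration) -> bool:
    # A filter set to None is treated as "no constraint".
    if start_date is not None and pm.started_at < start_date:
        return False
    if end_date is not None and pm.started_at > end_date:
        return False
    if service is not None and pm.service != service:
        return False
    if min_duration is not None and pm.duration_seconds < min_duration:
        return False
    return True


def find(records, *, start_date=None, end_date=None, service=None,
         min_duration=None, offset=0, limit=100):
    matched = [
        pm for pm in records
        if _matches(pm, start_date, end_date, service, min_duration)
    ]
    return matched[offset:offset + limit]  # pagination applies after filtering


def count(records, *, start_date=None, end_date=None, service=None,
          min_duration=None):
    return sum(
        1 for pm in records
        if _matches(pm, start_date, end_date, service, min_duration)
    )


records = [
    Postmortem("inc-1", "api", datetime(2024, 1, 1), 30.0),
    Postmortem("inc-2", "api", datetime(2024, 1, 2), 120.0),
    Postmortem("inc-3", "db", datetime(2024, 1, 3), 600.0),
    Postmortem("inc-4", "api", datetime(2024, 1, 4), 300.0),
]

long_api = find(records, service="api", min_duration=60.0)
second_page = find(records, service="api", min_duration=60.0, offset=1, limit=1)
total_long_api = count(records, service="api", min_duration=60.0)
```

Because `count` ignores pagination, its result can be used to compute how many pages a given `limit` produces.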

update_fields

update_fields(
    incident_id: str, fields: dict[str, Any]
) -> bool

Partial update of specific fields.

clear

clear() -> None

Clear all entries (for test cleanup).

InMemoryCascadeEventArchiveRepository

InMemoryCascadeEventArchiveRepository()

Bases: CascadeEventArchiveRepository

In-memory implementation for CascadeEventArchiveRepository.

Thread-safe with RLock. Suitable for testing and non-Django environments.
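
One common reason to choose an `RLock` over a plain `Lock` is that a locked method can call another locked method on the same object without deadlocking. The sketch below shows that pattern with standard-library `threading.RLock`; `TinyArchive` and its `put`/`get` methods are hypothetical and do not mirror the repository's internals.

```python
from __future__ import annotations

import threading


class TinyArchive:
    """Illustrative keyed store guarded by a re-entrant lock."""

    def __init__(self) -> None:
        self._lock = threading.RLock()
        self._records: dict[str, dict] = {}

    def get(self, key: str) -> dict | None:
        with self._lock:
            return self._records.get(key)

    def put(self, key: str, record: dict) -> bool:
        """Store a record, overwriting any duplicate.

        Returns True if an existing record was replaced (an assumption
        made for this sketch).
        """
        with self._lock:
            # get() acquires the same lock again; this is safe with RLock
            # but would deadlock with a non-re-entrant threading.Lock.
            replaced = self.get(key) is not None
            self._records[key] = record
            return replaced


archive = TinyArchive()
first = archive.put("cascade-1", {"trigger_type": "manual"})
second = archive.put("cascade-1", {"trigger_type": "auto"})
latest = archive.get("cascade-1")
```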

save

save(data: CascadeEventData) -> bool

Persist a cascade event record (overwrite on duplicate).

get_by_cascade_id

get_by_cascade_id(
    cascade_id: str,
) -> CascadeEventData | None

Retrieve a single cascade event by ID.

find

find(
    *,
    namespace: str | None = None,
    trigger_type: str | None = None,
    start_date: datetime | None = None,
    end_date: datetime | None = None,
    is_test: bool | None = None,
    offset: int = 0,
    limit: int = 100
) -> list[CascadeEventData]

Query with optional filters. Results ordered by timestamp DESC.

count

count(
    *,
    namespace: str | None = None,
    trigger_type: str | None = None,
    start_date: datetime | None = None,
    end_date: datetime | None = None
) -> int

Count cascade events matching filters.

delete_older_than

delete_older_than(cutoff: datetime) -> int

Delete archived events older than cutoff.
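
A retention sweep of this shape can be sketched in a few lines. The example is a simplification under assumptions: the archive is reduced to a hypothetical mapping of ID to timestamp, and "older than" is read as strictly earlier than `cutoff`, so a record stamped exactly at the cutoff is kept.

```python
from __future__ import annotations

from datetime import datetime, timedelta


def delete_older_than(archive: dict[str, datetime], cutoff: datetime) -> int:
    """Remove entries with a timestamp strictly before cutoff; return how many."""
    # Collect keys first so the dict is not mutated while iterating over it.
    stale = [key for key, ts in archive.items() if ts < cutoff]
    for key in stale:
        del archive[key]
    return len(stale)


now = datetime(2024, 1, 31, 12, 0, 0)
archive = {
    "c-old": now - timedelta(days=45),
    "c-edge": now - timedelta(days=30),  # exactly at the cutoff: kept
    "c-new": now - timedelta(days=1),
}

removed = delete_older_than(archive, cutoff=now - timedelta(days=30))
```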

get_chain

get_chain(
    namespace: str,
    *,
    start_date: datetime | None = None,
    end_date: datetime | None = None
) -> list[CascadeEventData]

Retrieve hash chain for integrity verification. Ordered by timestamp ASC.
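
Integrity verification over a hash chain usually walks the events in ascending order and checks that each one commits to its predecessor. The sketch below shows that general technique with `hashlib.sha256`. The reference does not specify how the chain is encoded, so `ChainedEvent`, its `prev_hash`/`event_hash` fields, the all-zero genesis value, and the hashed material are all assumptions.

```python
from __future__ import annotations

import hashlib
from dataclasses import dataclass

GENESIS = "0" * 64  # assumed placeholder for "no predecessor"


@dataclass
class ChainedEvent:
    """Hypothetical chained record; not the CascadeEventData schema."""

    cascade_id: str
    payload: str
    prev_hash: str
    event_hash: str


def compute_hash(cascade_id: str, payload: str, prev_hash: str) -> str:
    material = f"{cascade_id}|{payload}|{prev_hash}".encode("utf-8")
    return hashlib.sha256(material).hexdigest()


def append_event(chain: list[ChainedEvent], cascade_id: str, payload: str) -> None:
    prev_hash = chain[-1].event_hash if chain else GENESIS
    chain.append(
        ChainedEvent(cascade_id, payload, prev_hash,
                     compute_hash(cascade_id, payload, prev_hash))
    )


def verify_chain(chain: list[ChainedEvent]) -> bool:
    """Return True if every event links to and hashes over its predecessor."""
    expected_prev = GENESIS
    for event in chain:  # must be iterated oldest-first (timestamp ASC)
        if event.prev_hash != expected_prev:
            return False
        if event.event_hash != compute_hash(
            event.cascade_id, event.payload, event.prev_hash
        ):
            return False
        expected_prev = event.event_hash
    return True


chain: list[ChainedEvent] = []
append_event(chain, "c-1", "breaker opened")
append_event(chain, "c-2", "downstream degraded")
append_event(chain, "c-3", "recovered")

intact = verify_chain(chain)
chain[1].payload = "edited after the fact"  # simulate tampering
tampered = verify_chain(chain)
```

The ascending order matters here: verification depends on seeing each event after the one it links to.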

clear

clear() -> None

Clear all entries (for test cleanup).

InMemoryRecoverySessionArchiveRepository

InMemoryRecoverySessionArchiveRepository()

Bases: RecoverySessionArchiveRepository

In-memory implementation for RecoverySessionArchiveRepository.

Thread-safe with RLock. Suitable for testing and non-Django environments.

save

save(data: RecoverySessionData) -> bool

Persist a recovery session record (overwrite on duplicate).

get_by_session_id

get_by_session_id(
    session_id: str,
) -> RecoverySessionData | None

Retrieve a single session by ID.

find

find(
    *,
    namespace: str | None = None,
    status: str | None = None,
    start_date: datetime | None = None,
    end_date: datetime | None = None,
    offset: int = 0,
    limit: int = 100
) -> list[RecoverySessionData]

Query with optional filters. Results ordered by started_at DESC.

count

count(
    *,
    namespace: str | None = None,
    status: str | None = None
) -> int

Count sessions matching filters.

update

update(data: RecoverySessionData) -> bool

Full update of an existing session record.

delete_older_than

delete_older_than(cutoff: datetime) -> int

Delete archived sessions older than cutoff.

clear

clear() -> None

Clear all entries (for test cleanup).

InMemoryEventJournalRepository

InMemoryEventJournalRepository(
    max_entries: int = 10000, max_query_limit: int = 10000
)

Bases: EventJournalRepository

Thread-safe in-memory implementation. For tests and single-process use.
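
The reference does not say how `max_entries` and `max_query_limit` are enforced. One plausible reading, sketched below as an assumption only, is that the journal evicts its oldest entries once `max_entries` is reached and clamps any requested query limit to `max_query_limit`. `BoundedJournal`, `append`, and `query` are hypothetical names.

```python
from __future__ import annotations

import threading
from collections import deque


class BoundedJournal:
    """Illustrative bounded, thread-safe journal; not baldur's implementation."""

    def __init__(self, max_entries: int = 10000, max_query_limit: int = 10000) -> None:
        # deque(maxlen=...) silently drops the oldest item when full.
        self._entries: deque[str] = deque(maxlen=max_entries)
        self._max_query_limit = max_query_limit
        self._lock = threading.RLock()

    def append(self, entry: str) -> None:
        with self._lock:
            self._entries.append(entry)

    def query(self, limit: int = 100) -> list[str]:
        """Return up to `limit` most recent entries, oldest first."""
        effective = max(0, min(limit, self._max_query_limit))
        if effective == 0:
            return []
        with self._lock:
            items = list(self._entries)
        return items[-effective:]

    def __len__(self) -> int:
        with self._lock:
            return len(self._entries)


journal = BoundedJournal(max_entries=3, max_query_limit=2)
for name in ["e1", "e2", "e3", "e4", "e5"]:
    journal.append(name)

retained = len(journal)
latest_entries = journal.query(limit=10)  # clamped to max_query_limit
```

Bounding both the stored entries and the query size keeps memory use and response size predictable in a long-running single process.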