Skip to content

baldur — Types & Service Access

Core public data types and the entry points for resolving services: the ProviderRegistry, the circuit-breaker service accessor, and the replay service.

Core types

CircuitState

Bases: str, Enum

Circuit breaker states

FailedOperationData dataclass

FailedOperationData(
    id: str,
    domain: str,
    failure_type: str,
    status: str,
    entity_type: str | None = None,
    entity_id: str | None = None,
    entity_refs: dict[str, Any] = dict(),
    user_id: int | None = None,
    snapshot_data: dict[str, Any] = dict(),
    error_code: str = "",
    error_message: str = "",
    retry_count: int = 0,
    max_retries: int = 2,
    last_retry_at: datetime | None = None,
    request_data: dict[str, Any] = dict(),
    response_data: dict[str, Any] = dict(),
    metadata: dict[str, Any] = dict(),
    resolved_at: datetime | None = None,
    resolved_by_id: int | None = None,
    resolution_type: str = "",
    resolution_note: str = "",
    next_action_hint: str = "",
    recommended_action: str = "",
    created_at: datetime | None = None,
    updated_at: datetime | None = None,
    expires_at: datetime | None = None,
)

Data transfer object for FailedOperation model.

Contains all necessary fields for DLQ operations without Django model dependencies.

is_pending property

is_pending: bool

Check if operation is pending review

is_resolved property

is_resolved: bool

Check if operation is resolved

can_retry property

can_retry: bool

Check if operation can be retried

Service access

ProviderRegistry

Central registry for all pluggable components.

Each adapter/repository/strategy type is a GenericProviderRegistry class attribute. Thread-safe singleton creation is handled by GenericProviderRegistry's DCL pattern.

Sub-registries

cache, queue, async_queue — adapters audit, traffic_routing — adapters notification, alert — adapters rate_limit_storage — adapters failed_op_repo, circuit_breaker_repo, security_repo — repositories event_journal_repo, mesh_override_store — repositories correlation_strategy, root_cause_strategy — strategies graph_build_strategy — strategies anomaly_detection, forecast, classification — ML strategies optimization — ML strategies finops_service, learning_service — feature services compliance_engine, predictive_forecaster_service — feature services worker_background_starts — start callables shutdown_integrations, startup_integrations — lifecycle callables

Special (non-generic) registries: statistics — singleton adapter, not name-based

get_cache classmethod

get_cache(
    name: str | None = None, singleton: bool = True
) -> CacheProviderInterface

Get cache provider instance, wrapped with metrics decorator.

Parameters:

Name Type Description Default
name str | None

Provider name (e.g., 'redis', 'memory')

None
singleton bool

If True, return cached instance

True

Returns:

Type Description
CacheProviderInterface

CacheProviderInterface instance (metrics-wrapped)

get_audit_adapter classmethod

get_audit_adapter(
    name: str | None = None, singleton: bool = True
) -> AuditLogAdapter

Get audit adapter instance.

Parameters:

Name Type Description Default
name str | None

Adapter name (e.g., 'file', 'stdout', 'null')

None
singleton bool

If True, return cached instance

True

Returns:

Type Description
AuditLogAdapter

AuditLogAdapter instance

get_event_journal_repo classmethod

get_event_journal_repo(
    name: str | None = None, singleton: bool = True
) -> EventJournalRepository

Get event journal repository instance.

When name is None the registry default is used. The default is "memory" at module load and is rewired by init() to "redis"/"sql"/"memory" via the event_journal_repo PRIORITY_CHAIN row, honoring BALDUR_REDIS_URL and the BALDUR_EVENT_JOURNAL_BACKEND operator override. Passing an explicit name bypasses that wired default.

Parameters:

Name Type Description Default
name str | None

Repository name (e.g., 'memory', 'redis', 'sql')

None
singleton bool

If True, return cached instance

True

Returns:

Type Description
EventJournalRepository

EventJournalRepository instance

register_statistics_adapter classmethod

register_statistics_adapter(
    adapter: StatisticsRepositoryInterface,
) -> None

Register a statistics adapter.

Should be called during app initialization (e.g., Django's AppConfig.ready()). Only one statistics adapter can be registered at a time.

Parameters:

Name Type Description Default
adapter StatisticsRepositoryInterface

StatisticsRepositoryInterface implementation

required

get_statistics_repo classmethod

get_statistics_repo() -> StatisticsRepositoryInterface

Get statistics repository instance.

Returns the registered statistics adapter, or NullStatisticsRepository if no adapter is registered.

Returns:

Type Description
StatisticsRepositoryInterface

StatisticsRepositoryInterface instance

has_statistics_adapter classmethod

has_statistics_adapter() -> bool

Check if a statistics adapter is registered.

register_postmortem_repo classmethod

register_postmortem_repo(
    name: str, repo_class: type
) -> None

Register a postmortem repository.

get_postmortem_repo classmethod

get_postmortem_repo(
    name: str | None = None, singleton: bool = True
) -> PostmortemRepository

Get postmortem repository instance.

has_postmortem_repo classmethod

has_postmortem_repo() -> bool

Check if any postmortem repository is registered.

register_cascade_event_repo classmethod

register_cascade_event_repo(
    name: str, repo_class: type
) -> None

Register a cascade event archive repository.

get_cascade_event_repo classmethod

get_cascade_event_repo(
    name: str | None = None, singleton: bool = True
) -> CascadeEventArchiveRepository

Get cascade event archive repository instance.

register_recovery_session_repo classmethod

register_recovery_session_repo(
    name: str, repo_class: type
) -> None

Register a recovery session archive repository.

get_recovery_session_repo classmethod

get_recovery_session_repo(
    name: str | None = None, singleton: bool = True
) -> RecoverySessionArchiveRepository

Get recovery session archive repository instance.

register_cache classmethod

register_cache(name: str, provider_class: type) -> None

Register a cache provider adapter.

register_queue classmethod

register_queue(name: str, provider_class: type) -> None

Register a task queue adapter.

register_async_queue classmethod

register_async_queue(
    name: str, adapter_class: type
) -> None

Register an async task queue adapter.

register_failed_operation_repo classmethod

register_failed_operation_repo(
    name: str, repo_class: type
) -> None

Register a failed operation repository.

register_circuit_breaker_repo classmethod

register_circuit_breaker_repo(
    name: str, repo_class: type
) -> None

Register a circuit breaker state repository.

register_security_repo classmethod

register_security_repo(name: str, repo_class: type) -> None

Register a security incident repository.

register_event_journal_repo classmethod

register_event_journal_repo(
    name: str, repo_class: type
) -> None

Register an event journal repository.

register_audit_adapter classmethod

register_audit_adapter(
    name: str, adapter_class: type
) -> None

Register an audit log adapter.

register_traffic_routing classmethod

register_traffic_routing(
    name: str, adapter_class: type
) -> None

Register a traffic routing adapter.

register_notification classmethod

register_notification(
    name: str, adapter_class: type | Callable[..., Any]
) -> None

Register a notification adapter (class or zero-arg factory).

register_alert classmethod

register_alert(
    name: str, factory: type | Callable[..., Any]
) -> None

Register an alert adapter (class or zero-arg factory).

register_correlation_strategy classmethod

register_correlation_strategy(
    name: str, strategy_class: type
) -> None

Register a correlation strategy.

register_root_cause_strategy classmethod

register_root_cause_strategy(
    name: str, strategy_class: type
) -> None

Register a root cause strategy.

register_graph_build_strategy classmethod

register_graph_build_strategy(
    name: str, strategy_class: type
) -> None

Register a graph build strategy.

register_mesh_override_store classmethod

register_mesh_override_store(
    name: str, store_class: type
) -> None

Register a mesh override store implementation.

get_queue classmethod

get_queue(
    name: str | None = None, singleton: bool = True
) -> TaskQueueInterface

Get task queue instance.

get_async_queue classmethod

get_async_queue(
    name: str | None = None, singleton: bool = True
) -> AsyncTaskQueueInterface

Get async task queue instance.

get_failed_operation_repo classmethod

get_failed_operation_repo(
    name: str | None = None, singleton: bool = True
) -> FailedOperationRepository

Get failed operation repository instance.

get_circuit_breaker_repo classmethod

get_circuit_breaker_repo(
    name: str | None = None, singleton: bool = True
) -> CircuitBreakerStateRepository

Get circuit breaker state repository instance.

get_security_repo classmethod

get_security_repo(
    name: str | None = None, singleton: bool = True
) -> SecurityIncidentRepository

Get security incident repository instance.

get_traffic_routing classmethod

get_traffic_routing(
    name: str | None = None, singleton: bool = True
) -> TrafficRoutingAdapter

Get traffic routing adapter instance.

get_notification classmethod

get_notification(
    name: str | None = None,
) -> NotificationAdapter

Get notification adapter instance.

get_alert classmethod

get_alert(name: str | None = None) -> AlertAdapter

Get alert adapter instance.

get_mesh_override_store classmethod

get_mesh_override_store(
    name: str | None = None, singleton: bool = True
) -> Any

Get a mesh override store instance.

Vestigial API-compat accessor: the store implementation moved to the private distribution with the circuit_mesh feature, so the slot is empty on OSS installs and this raises AdapterNotFoundError unless a provider was registered via register_mesh_override_store.

get_correlation_strategy classmethod

get_correlation_strategy(
    name: str,
) -> type | Callable[..., Any]

Get registered correlation strategy class (not instance).

Strategy registries store class references that callers instantiate themselves, so we return the provider class directly rather than calling GenericProviderRegistry.get() which would instantiate it.

get_root_cause_strategy classmethod

get_root_cause_strategy(
    name: str,
) -> type | Callable[..., Any]

Get registered root cause strategy class (not instance).

get_graph_build_strategy classmethod

get_graph_build_strategy(
    name: str,
) -> type | Callable[..., Any]

Get registered graph build strategy class (not instance).

set_defaults classmethod

set_defaults(
    cache: str | None = None,
    queue: str | None = None,
    repo: str | None = None,
) -> None

Set default providers.

Parameters:

Name Type Description Default
cache str | None

Default cache provider name

None
queue str | None

Default task queue name

None
repo str | None

Default repository name (applied to all repo registries)

None

get_defaults classmethod

get_defaults() -> dict[str, str | None]

Get current default provider names.

list_providers classmethod

list_providers() -> dict[str, Any]

List all registered providers.

clear_instances classmethod

clear_instances() -> None

Clear all cached instances (providers remain registered).

reset classmethod

reset() -> None

Reset registry to initial state (all providers and instances cleared).

health_check_all classmethod

health_check_all() -> dict[str, bool]

Run health checks on all default providers.

Returns:

Type Description
dict[str, bool]

Dict mapping provider type to health status

get_circuit_breaker_service

get_circuit_breaker_service() -> CircuitBreakerService

Return the runtime-scoped CircuitBreakerService singleton.

Delegates to the active :class:~baldur.runtime.BaldurRuntime — test isolation, copy_context() scoping, and runtime swap-in work transparently through the runtime's singleton store.

Explicit-def wrapper (instead of tuple-unpacking the make_singleton_factory return value directly) so the symbol is statically discoverable by docs tooling (mkdocstrings/griffe) and carries a Public-surface docstring per the reference page contract.

Replay

ReplayService

ReplayService(
    repository: FailedOperationRepository | None = None,
    cache: CacheProviderInterface | None = None,
)

Bases: EventEmitterMixin

DLQ Replay Service.

Orchestrates replay operations for failed operations.

Usage

service = ReplayService()

Single replay

result = service.replay_single(dlq_id=123)

Batch replay

batch_result = service.replay_batch( failure_type="PG_TIMEOUT", max_items=50 )

For testing with mock repository

mock_repo = Mock(spec=FailedOperationRepository) service = ReplayService(repository=mock_repo)

Initialize the replay service.

Parameters:

Name Type Description Default
repository FailedOperationRepository | None

Optional repository for DI, uses Django adapter if None

None
cache CacheProviderInterface | None

Optional cache provider for the per-service inflight lock guarding replay_on_circuit_close. If omitted, the provider is lazy-resolved via ProviderRegistry on first use. If resolution fails or the resolved provider does not support get_lock(), the guard fails open with a WARNING log.

None

repository property

repository: FailedOperationRepository

Get the repository using ProviderRegistry with fallback policy.

cache property

cache: CacheProviderInterface | None

Lazy-resolve the cache provider for the circuit-close inflight lock.

Returns None if no provider can be resolved — caller falls open in that case. The inflight lock uses cache.get_lock() (owner-fenced DistributedLock), so adapter-level lock support is validated at acquire-time rather than via a separate setnx gate.

getattr with defaults handles test fixtures that bypass __init__ via ReplayService.__new__(...). A bypassed-init instance is observationally identical to a fresh instance with cache=None for this fail-open guard.

replay_single

replay_single(dlq_id: str) -> ReplayResult

Replay a single DLQ entry.

This method uses atomic acquisition to prevent race conditions when multiple workers try to replay the same entry simultaneously.

Safety Checks (via check_all_governance): 1. Kill Switch - system-wide deactivation check 2. Emergency Level - blocked at LEVEL_2+ to protect resources 3. ErrorBudgetGate - automation blocked when the error budget is exhausted

Audit Logging: - Blocks are automatically recorded in the AuditLog

Parameters:

Name Type Description Default
dlq_id str

ID of the FailedOperation to replay

required

Returns:

Type Description
ReplayResult

ReplayResult indicating success or failure

replay_batch

replay_batch(
    domain: str | None = None,
    failure_type: str | None = None,
    max_items: int = 100,
    use_adaptive: bool | None = None,
    use_priority: bool | None = None,
) -> BatchReplayResult

Replay multiple DLQ entries matching criteria.

Safety Checks (via check_all_governance): 1. Kill Switch - system-wide deactivation check 2. Emergency Level - blocked at LEVEL_2+ to protect resources 3. ErrorBudgetGate - automation blocked when the error budget is exhausted

Adaptive Mode: - When adaptive_enabled=True in RuntimeConfig, batch size is dynamic - High failure rate (>=20%) reduces batch size by 20% - 3 consecutive perfect batches increases batch size by 5

Priority Mode: - When priority_enabled=True in RuntimeConfig, domains are processed by priority - Critical domains are processed first, then normal, then low - Respects domain-specific max_retries overrides

Audit Logging: - Blocks are automatically recorded in the AuditLog

Parameters:

Name Type Description Default
domain str | None

Filter by domain (optional, ignored in priority mode)

None
failure_type str | None

Filter by failure type (optional)

None
max_items int

Maximum number of items to replay (ignored in adaptive mode)

100
use_adaptive bool | None

Override adaptive mode setting (None = use RuntimeConfig)

None
use_priority bool | None

Override priority mode setting (None = use RuntimeConfig)

None

Returns:

Type Description
BatchReplayResult

BatchReplayResult with summary and individual results

replay_on_circuit_close

replay_on_circuit_close(
    service_name: str,
    max_items: int = 50,
    escalate_failures: bool = True,
    service_failure_type_map: (
        dict[str, list[str]] | None
    ) = None,
) -> BatchReplayResult

Replay entries when circuit breaker closes.

This is triggered when an external service recovers. Only replays entries related to the recovered service.

IMPORTANT: When triggered by force_close with trigger_replay=True, any replay failures are escalated to REQUIRES_REVIEW status. This is because operator-initiated recovery implies the operator intended to resolve these items, so failures need explicit attention.

Parameters:

Name Type Description Default
service_name str

Name of the service that recovered

required
max_items int

Maximum number of items to replay

50
escalate_failures bool

If True, mark failed replays as REQUIRES_REVIEW

True
service_failure_type_map dict[str, list[str]] | None

Custom mapping of service names to failure types. If None, uses RuntimeConfig fallback. Example: {"my_service": ["TIMEOUT", "CONNECTION_ERROR"]}

None

Returns:

Type Description
BatchReplayResult

BatchReplayResult with summary. inflight_skipped=True indicates

BatchReplayResult

the per-service inflight lock rejected this call as a duplicate.