baldur.services — Service Access
The re-export facade for service getters and the core service classes. Resolve the circuit-breaker and replay services here; the SLA-threshold helper exposes the configured breach thresholds.
Service getters
get_circuit_breaker_service
get_circuit_breaker_service() -> CircuitBreakerService
Return the runtime-scoped CircuitBreakerService singleton.
Delegates to the active BaldurRuntime (baldur.runtime.BaldurRuntime): test isolation, copy_context() scoping, and runtime swap-in all work transparently through the runtime's singleton store.
Defined as an explicit def wrapper (instead of tuple-unpacking the make_singleton_factory return value directly) so the symbol is statically discoverable by docs tooling (mkdocstrings/griffe) and carries a public-surface docstring, as the reference page contract requires.
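The runtime-scoped singleton behaviour described above can be illustrated with the standard library's contextvars module. This is a simplified model under stated assumptions, not Baldur's implementation: Runtime, _active_runtime, WidgetService and get_widget_service are hypothetical names, and BaldurRuntime's real API is not documented on this page.

```python
import contextvars
from typing import Callable, TypeVar

T = TypeVar("T")


class Runtime:
    """Hypothetical runtime owning a per-runtime singleton store."""

    def __init__(self) -> None:
        self._singletons: dict[str, object] = {}

    def get_or_create(self, key: str, factory: Callable[[], T]) -> T:
        # Build on first use, then reuse within this runtime only.
        if key not in self._singletons:
            self._singletons[key] = factory()
        return self._singletons[key]  # type: ignore[return-value]


# The active runtime travels with the execution context.
_active_runtime: contextvars.ContextVar[Runtime] = contextvars.ContextVar(
    "active_runtime", default=Runtime()
)


class WidgetService:
    """Hypothetical service standing in for CircuitBreakerService."""


def get_widget_service() -> WidgetService:
    """Explicit-def getter: statically discoverable, with its own docstring."""
    return _active_runtime.get().get_or_create("widget", WidgetService)


def _in_fresh_runtime() -> WidgetService:
    # Runs in a copied context, so the new runtime is invisible to the caller.
    _active_runtime.set(Runtime())
    return get_widget_service()


outer = get_widget_service()
isolated = contextvars.copy_context().run(_in_fresh_runtime)
```

Because the active runtime lives in a ContextVar, code run through copy_context().run() can install its own runtime and receive fresh singletons while the caller keeps its original instance.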
get_replay_service
get_replay_service() -> ReplayService
Get the singleton replay service instance.
get_sla_thresholds
get_sla_thresholds() -> SLASettings
Get SLA thresholds configuration.
Core service classes
CircuitBreakerService
CircuitBreakerService(
config: CircuitBreakerConfig | None = None,
repository: CircuitBreakerStateRepository | None = None,
)
Bases: EventEmitterMixin, ProtectionMixin, ManualControlMixin
Circuit Breaker Service.
Provides management operations for circuit breaker states. Designed for manual (toggle-based) control by operators.
Usage
```python
from unittest.mock import Mock

from baldur.services import CircuitBreakerService

service = CircuitBreakerService()

# Force open (block requests); admin_user is the operator performing the action
result = service.force_open(service_name="external_api", reason="External service maintenance", controlled_by=admin_user)

# Force close (allow requests)
result = service.force_close(service_name="external_api", reason="Service recovered", controlled_by=admin_user, trigger_replay=True)

# Check if requests should be allowed
if service.should_allow("external_api"):
    ...  # proceed with the request

# For testing with a mock repository (import CircuitBreakerStateRepository from its defining module)
mock_repo = Mock(spec=CircuitBreakerStateRepository)
service = CircuitBreakerService(repository=mock_repo)
```
Initialize the circuit breaker service.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | CircuitBreakerConfig \| None | Optional configuration; loaded from settings if None | None |
| repository | CircuitBreakerStateRepository \| None | Optional repository for DI; uses the Django adapter if None | None |
repository
property
repository: CircuitBreakerStateRepository
Get the repository using ProviderRegistry with fallback policy.
is_enabled
property
is_enabled: bool
Check if circuit breaker is enabled.
register_downstream_checker
register_downstream_checker(
checker: Callable[[str], bool],
) -> None
Register a pre-check hook for should_allow().
If checker(service_name) returns False, a preemptive Fallback is triggered. The checker MUST perform only local in-memory lookups (no external I/O).
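A checker that respects the no-I/O rule can be as simple as a set lookup against state that another component maintains in memory. DegradedServiceRegistry and its methods are hypothetical; only register_downstream_checker and the Callable[[str], bool] contract come from this page.

```python
import threading


class DegradedServiceRegistry:
    """Hypothetical in-memory registry fed by some local health monitor."""

    def __init__(self) -> None:
        self._degraded: set[str] = set()
        self._lock = threading.Lock()

    def mark_degraded(self, service_name: str) -> None:
        with self._lock:
            self._degraded.add(service_name)

    def mark_healthy(self, service_name: str) -> None:
        with self._lock:
            self._degraded.discard(service_name)

    def checker(self, service_name: str) -> bool:
        # Pure in-memory lookup: no network, disk, or cache round-trips.
        # Returning False asks should_allow() for a preemptive fallback.
        with self._lock:
            return service_name not in self._degraded


registry = DegradedServiceRegistry()
registry.mark_degraded("payments_api")
# Wiring (requires Baldur; shown for illustration only):
# get_circuit_breaker_service().register_downstream_checker(registry.checker)
```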
apply_threshold_override
apply_threshold_override(
service_name: str, override: Any
) -> None
Apply a threshold override set by the mesh coordinator.
While the override is active, the service's failure_threshold and recovery_timeout use the override values.
remove_threshold_override
remove_threshold_override(service_name: str) -> None
Remove the threshold override and revert to the original config.
get_effective_config
get_effective_config(
service_name: str,
) -> CircuitBreakerConfig
Return the effective config with overrides applied.
Lookup happens in the L1 local cache, so there is no external I/O. Without an override, returns the base config; otherwise replaces only the overridden fields.
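The override semantics (base config when no override exists, otherwise only the overridden fields replaced) map naturally onto dataclasses.replace. BreakerConfig, ThresholdOverride and OverrideStore are hypothetical stand-ins; the real override parameter is typed Any, so the override shape here is an assumption.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class BreakerConfig:
    """Hypothetical stand-in for CircuitBreakerConfig."""
    failure_threshold: int = 5
    recovery_timeout: float = 60.0


@dataclass(frozen=True)
class ThresholdOverride:
    """Hypothetical override shape; None means "not overridden"."""
    failure_threshold: Optional[int] = None
    recovery_timeout: Optional[float] = None


class OverrideStore:
    def __init__(self, base: BreakerConfig) -> None:
        self._base = base
        self._overrides: dict[str, ThresholdOverride] = {}  # local, in-memory

    def apply(self, service_name: str, override: ThresholdOverride) -> None:
        self._overrides[service_name] = override

    def remove(self, service_name: str) -> None:
        self._overrides.pop(service_name, None)

    def effective(self, service_name: str) -> BreakerConfig:
        override = self._overrides.get(service_name)
        if override is None:
            return self._base
        # Copy the base config, replacing only the fields the override sets.
        changes = {k: v for k, v in vars(override).items() if v is not None}
        return replace(self._base, **changes)


store = OverrideStore(BreakerConfig())
store.apply("external_api", ThresholdOverride(failure_threshold=2))
```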
get_or_create_state
get_or_create_state(
service_name: str,
) -> CircuitBreakerStateData
Get or create a circuit breaker state for a service.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service_name | str | Name of the external service | required |
Returns:
| Type | Description |
|---|---|
| CircuitBreakerStateData | CircuitBreakerStateData instance |
get_state
get_state(service_name: str) -> str
Get the current state of a circuit breaker.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service_name | str | Name of the external service | required |
Returns:
| Type | Description |
|---|---|
| str | Current state (closed, open, half_open) |
should_allow
should_allow(service_name: str) -> bool
Check if requests should be allowed through the circuit breaker.
Post-476: HALF_OPEN slot acquisition is delegated to the repository's atomic try_acquire_half_open_slot, so the per-service counter is accurate cluster-wide (Redis Lua) rather than best-effort per process.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service_name | str | Name of the external service | required |
Returns:
| Type | Description |
|---|---|
| bool | True if requests should be allowed, False if blocked |
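The HALF_OPEN gate amounts to an atomic check-and-increment of a per-service counter. This sketch models it in a single process with a threading.Lock; it does not reproduce the Redis Lua script the repository uses, InProcessSlotCounter and max_slots are hypothetical names, and downstream checkers and the is_enabled flag are omitted.

```python
import threading


class InProcessSlotCounter:
    """In-process stand-in for try_acquire_half_open_slot.

    A lock makes check-and-increment atomic within one process only; the
    documented repository does it in Redis so the count is cluster-wide.
    """

    def __init__(self, max_slots: int) -> None:
        self._max_slots = max_slots
        self._used: dict[str, int] = {}
        self._lock = threading.Lock()

    def try_acquire_half_open_slot(self, service_name: str) -> bool:
        with self._lock:
            used = self._used.get(service_name, 0)
            if used >= self._max_slots:
                return False
            self._used[service_name] = used + 1
            return True


def should_allow(state: str, service_name: str, slots: InProcessSlotCounter) -> bool:
    if state == "closed":
        return True
    if state == "open":
        return False
    # half_open: only a bounded number of trial requests get through.
    return slots.try_acquire_half_open_slot(service_name)


slots = InProcessSlotCounter(max_slots=2)
trial = [should_allow("half_open", "external_api", slots) for _ in range(3)]
```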
should_allow_with_state
should_allow_with_state(
service_name: str,
) -> CircuitBreakerDecision
Companion API to should_allow() that returns the admission decision and the resolved state in a single call.
This removes the redundant get_or_create_state lookup that CircuitBreakerPolicy.execute() previously performed on the reject path: the policy can now read decision.allowed for branching and decision.state.state for the rejection metadata without acquiring the repository RLock a second time.
When is_enabled is False, the method returns a CircuitBreakerDecision with allowed=True and a freshly fetched state, so direct callers of the companion API always receive a non-None state regardless of feature-flag posture. CircuitBreakerPolicy short-circuits on is_enabled before invoking this method, so the disabled-CB fetch is off the hot path.
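The benefit of the companion API is that one lookup serves both the branch and the rejection metadata. Decision and StateData are simplified stand-ins for CircuitBreakerDecision and CircuitBreakerStateData (only the allowed, state and state.state attributes come from this page), and execute is a hypothetical reduction of CircuitBreakerPolicy.execute().

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class StateData:
    """Hypothetical stand-in for CircuitBreakerStateData."""
    service_name: str
    state: str  # "closed", "open" or "half_open"


@dataclass(frozen=True)
class Decision:
    """Mirrors the two fields named on this page: allowed and state."""
    allowed: bool
    state: StateData


def execute(
    decide: Callable[[str], Decision],
    service_name: str,
    call: Callable[[], Any],
) -> dict[str, Any]:
    decision = decide(service_name)  # a single lookup serves both uses
    if not decision.allowed:
        # Rejection metadata comes from the same decision: no second fetch.
        return {"rejected": True, "circuit_state": decision.state.state}
    return {"rejected": False, "value": call()}
```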
should_allow_with_fallback
should_allow_with_fallback(
service_name: str,
cache_key: str | None = None,
default_response: Any | None = None,
request_data: dict[str, Any] | None = None,
) -> CircuitBreakerFallbackResult
Check if requests should be allowed with fallback strategy support.
Deprecated: use a CircuitBreakerPolicy + FallbackPolicy combination instead.
When the circuit breaker is open, instead of simply blocking, this method can:
1. Return cached (stale) data
2. Queue the request to the DLQ for later retry
3. Return a default/static response
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service_name | str | Name of the external service | required |
| cache_key | str \| None | Optional Redis key for cached data lookup | None |
| default_response | Any \| None | Optional default response to return | None |
| request_data | dict[str, Any] \| None | Optional request data for DLQ queueing | None |
Returns:
| Type | Description |
|---|---|
| CircuitBreakerFallbackResult | CircuitBreakerFallbackResult with the decision and optional fallback data |
get_total_calls
get_total_calls(service_name: str) -> int
Get total call count for a service (success + failure).
Used for the minimum_calls check, to prevent false positives.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service_name | str | Name of the external service | required |
Returns:
| Type | Description |
|---|---|
| int | Total number of calls tracked |
get_all_states
get_all_states() -> list[dict[str, Any]]
Get all circuit breaker states.
Returns:
| Type | Description |
|---|---|
| list[dict[str, Any]] | List of state dictionaries |
get_open_states
get_open_states(
limit: int | None = None,
) -> list[CircuitBreakerStateData]
Get circuit breaker states currently in OPEN state.
More efficient than get_all_states() for watchdog recovery, which only needs OPEN states. Delegates to repository.get_open_states(), which uses SCAN instead of KEYS in Redis.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| limit | int \| None | Maximum number of results. None means no limit. | None |
Returns:
| Type | Description |
|---|---|
| list[CircuitBreakerStateData] | List of CircuitBreakerStateData with state == OPEN, ordered by opened_at ascending (oldest first). |
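The documented contract (OPEN only, oldest opened_at first, optional limit) can be expressed over an in-memory list. StateRow and open_states are hypothetical; the real method delegates to the repository, which scans Redis rather than filtering a Python list.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class StateRow:
    """Hypothetical stand-in for CircuitBreakerStateData."""
    service_name: str
    state: str
    opened_at: Optional[datetime] = None


def open_states(rows: list[StateRow], limit: Optional[int] = None) -> list[StateRow]:
    # Keep only OPEN circuits, oldest opened_at first, then apply the limit.
    opened = [r for r in rows if r.state == "open"]
    opened.sort(key=lambda r: r.opened_at or datetime.min)
    return opened if limit is None else opened[:limit]


rows = [
    StateRow("billing", "open", datetime(2024, 1, 2)),
    StateRow("search", "closed"),
    StateRow("payments", "open", datetime(2024, 1, 1)),
]
```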
get_aggregate_failure_rate
get_aggregate_failure_rate() -> float
Return the system-wide circuit-breaker failure fraction (0.0-1.0).
Aggregates over every tracked circuit-breaker state as sum(failure_count) / sum(failure_count + success_count), returning 0.0 when no calls have been recorded across any circuit.
This is a system-wide mean error fraction, not a fixed-time-window or per-service rate: a single failing service among many healthy ones is averaged below threshold, while a broad multi-service failure raises the mean. That makes it suited to a system-wide stability gate, where each individual service is still protected by its own circuit breaker.
Returns:
| Type | Description |
|---|---|
| float | Failure fraction in the range 0.0-1.0. |
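A worked example makes the averaging effect concrete. With nine healthy services at 0 failures / 100 successes and one service at 50 / 50, the aggregate is 50 / 1000 = 0.05; ten services each failing 30 of 100 calls give 300 / 1000 = 0.3. The helper below is a hypothetical reimplementation of the documented formula over (failure_count, success_count) pairs.

```python
def aggregate_failure_rate(counts: list[tuple[int, int]]) -> float:
    """counts holds one (failure_count, success_count) pair per circuit."""
    failures = sum(f for f, _ in counts)
    total = sum(f + s for f, s in counts)
    return failures / total if total else 0.0  # 0.0 when nothing recorded


# Nine healthy services plus one failing half of its calls:
one_bad = [(0, 100)] * 9 + [(50, 50)]
# Every service failing 30% of its calls:
broad = [(30, 70)] * 10
```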
record_failure
record_failure(
service_name: str,
error_context: dict[str, Any] | None = None,
hint_state: CircuitBreakerStateData | None = None,
) -> None
Record a failure for a service.
This is used for automatic circuit breaker mode. If the threshold is exceeded AND minimum_calls is met, the circuit opens automatically.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service_name | str | Name of the external service | required |
| error_context | dict[str, Any] \| None | Optional context about the failure (for the snapshot) | None |
| hint_state | CircuitBreakerStateData \| None | Optional pre-fetched state, when the caller already loaded the state via … | None |
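The opening condition combines two checks, and get_total_calls() supplies the second. This sketch assumes failure_threshold is a failure count and reads "exceeded" as "reached"; neither detail is specified on this page, and should_open is a hypothetical helper.

```python
def should_open(
    failure_count: int,
    success_count: int,
    failure_threshold: int,
    minimum_calls: int,
) -> bool:
    # Assumption: failure_threshold counts failures; boundary uses >=.
    total_calls = failure_count + success_count  # as get_total_calls() reports
    if total_calls < minimum_calls:
        return False  # too little traffic to judge: avoid a false positive
    return failure_count >= failure_threshold
```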
record_success
record_success(
service_name: str,
hint_state: CircuitBreakerStateData | None = None,
) -> None
Record a success for a service.
This is used for automatic circuit breaker mode. In half-open state, enough successes will close the circuit.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service_name | str | Name of the external service | required |
| hint_state | CircuitBreakerStateData \| None | Optional pre-fetched state, when the caller already loaded the state via … | None |
check_recovery_transitions
check_recovery_transitions() -> dict
Check for circuit breakers that should transition from OPEN to HALF_OPEN.
This method should be called periodically (e.g., every minute) to check if any OPEN circuits have exceeded the recovery timeout and should transition to HALF_OPEN for testing.
Returns:
| Type | Description |
|---|---|
| dict | Dictionary with transitioned service names and count |
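The periodic OPEN to HALF_OPEN sweep can be modelled as a scan over tracked circuits. Circuit, the epoch-second timestamps, a single shared recovery_timeout, the >= boundary and the returned key names are all assumptions for illustration; the page only states that the result reports transitioned service names and a count.

```python
import time
from dataclasses import dataclass
from typing import Optional


@dataclass
class Circuit:
    """Hypothetical circuit record."""
    state: str
    opened_at: Optional[float] = None  # epoch seconds


def check_recovery_transitions(
    circuits: dict[str, Circuit],
    recovery_timeout: float,
    now: Optional[float] = None,
) -> dict:
    now = time.time() if now is None else now
    transitioned = []
    for name, circuit in circuits.items():
        if circuit.state != "open" or circuit.opened_at is None:
            continue
        if now - circuit.opened_at >= recovery_timeout:
            circuit.state = "half_open"  # let trial requests probe recovery
            transitioned.append(name)
    # Key names are illustrative only.
    return {"transitioned": transitioned, "count": len(transitioned)}


circuits = {
    "billing": Circuit("open", opened_at=1000.0),
    "search": Circuit("open", opened_at=1050.0),
    "payments": Circuit("closed"),
}
report = check_recovery_transitions(circuits, recovery_timeout=60.0, now=1070.0)
```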
manual_control
manual_control(
service_name: str,
action: str,
reason: str = "",
controlled_by: Any = None,
) -> CircuitBreakerResult
Manually control a circuit breaker state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service_name | str | Name of the service | required |
| action | str | 'open', 'close', or 'auto' | required |
| reason | str | Reason for the control action | '' |
| controlled_by | Any | User who initiated the action | None |
Returns:
| Type | Description |
|---|---|
| CircuitBreakerResult | CircuitBreakerResult with operation details |
reconcile_cb_cell_mapping
reconcile_cb_cell_mapping() -> dict[str, Any]
Reconcile CB-to-Cell mapping consistency after a Ring Resize.
- Iterate over all CBs and extract cell_id from the Composite Key
- Compare against the correct cell_id per the current Hash Ring
- On mismatch: archive + delete the orphan CB (no state transition)
- CBs for new Cells are lazily created by get_or_create()
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | |
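Reconciliation depends on recomputing each circuit's owning cell from the current hash ring. The ring below is a generic consistent-hashing sketch, and the <service_name>@<cell_id> composite key format is hypothetical; this page documents neither Baldur's key layout nor its ring implementation.

```python
import bisect
import hashlib


class HashRing:
    """Minimal consistent-hash ring with virtual nodes (hypothetical)."""

    def __init__(self, cells: list[str], replicas: int = 50) -> None:
        self._points = sorted(
            (self._hash(f"{cell}#{i}"), cell)
            for cell in cells
            for i in range(replicas)
        )
        self._keys = [point for point, _ in self._points]

    @staticmethod
    def _hash(value: str) -> int:
        return int.from_bytes(hashlib.sha256(value.encode()).digest()[:8], "big")

    def cell_for(self, service_name: str) -> str:
        # First ring point clockwise from the key's hash, wrapping around.
        idx = bisect.bisect(self._keys, self._hash(service_name)) % len(self._keys)
        return self._points[idx][1]


def find_orphans(composite_keys: list[str], ring: HashRing) -> list[str]:
    orphans = []
    for key in composite_keys:
        service_name, _, cell_id = key.rpartition("@")
        if ring.cell_for(service_name) != cell_id:
            orphans.append(key)  # to archive + delete; no state transition
    return orphans


ring = HashRing(["cell-a", "cell-b", "cell-c"])
keys = [f"{name}@{ring.cell_for(name)}" for name in ("orders", "payments")]
keys.append("orders@cell-retired")  # stale key left over from before a resize
```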
ReplayService
ReplayService(
repository: FailedOperationRepository | None = None,
cache: CacheProviderInterface | None = None,
)
Bases: EventEmitterMixin
DLQ Replay Service.
Orchestrates replay operations for failed operations.
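Usage
```python
from unittest.mock import Mock

from baldur.services import ReplayService

service = ReplayService()

# Single replay (dlq_id is a str per the replay_single signature)
result = service.replay_single(dlq_id="123")

# Batch replay
batch_result = service.replay_batch(failure_type="PG_TIMEOUT", max_items=50)

# For testing with a mock repository (import FailedOperationRepository from its defining module)
mock_repo = Mock(spec=FailedOperationRepository)
service = ReplayService(repository=mock_repo)
```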
Initialize the replay service.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| repository | FailedOperationRepository \| None | Optional repository for DI; uses the Django adapter if None | None |
| cache | CacheProviderInterface \| None | Optional cache provider for the per-service inflight lock guarding … | None |
repository
property
repository: FailedOperationRepository
Get the repository using ProviderRegistry with fallback policy.
cache
property
cache: CacheProviderInterface | None
Lazily resolve the cache provider for the circuit-close inflight lock.
Returns None if no provider can be resolved; the caller fails open in that case. The inflight lock uses cache.get_lock() (an owner-fenced DistributedLock), so adapter-level lock support is validated at acquire time rather than through a separate setnx gate.
getattr with defaults handles test fixtures that bypass __init__ via ReplayService.__new__(...). For this fail-open guard, a bypassed-init instance behaves identically to a fresh instance constructed with cache=None.
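The fail-open guard can be sketched with a non-blocking lock acquire: no cache means proceed, a held lock means skip as a duplicate. LocalLockCache and the lock key format are hypothetical, and threading.Lock stands in for the owner-fenced DistributedLock returned by cache.get_lock().

```python
import threading
from typing import Callable, Optional


class LocalLockCache:
    """Hypothetical cache exposing get_lock(); a real one would be distributed."""

    def __init__(self) -> None:
        self._locks: dict[str, threading.Lock] = {}

    def get_lock(self, key: str) -> threading.Lock:
        return self._locks.setdefault(key, threading.Lock())


def replay_guarded(
    cache: Optional[LocalLockCache],
    service_name: str,
    replay: Callable[[], str],
) -> str:
    if cache is None:
        return replay()  # no provider resolved: fail open and proceed
    lock = cache.get_lock(f"replay-inflight:{service_name}")
    if not lock.acquire(blocking=False):
        return "inflight_skipped"  # another replay for this service is running
    try:
        return replay()
    finally:
        lock.release()


cache = LocalLockCache()
```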
replay_single
replay_single(dlq_id: str) -> ReplayResult
Replay a single DLQ entry.
This method uses atomic acquisition to prevent race conditions when multiple workers try to replay the same entry simultaneously.
Safety checks (via check_all_governance):
1. Kill Switch: system-wide deactivation check
2. Emergency Level: blocked at LEVEL_2+ to protect resources
3. ErrorBudgetGate: automation blocked when the error budget is exhausted
Audit logging: blocks are automatically recorded in the AuditLog.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| dlq_id | str | ID of the FailedOperation to replay | required |
Returns:
| Type | Description |
|---|---|
| ReplayResult | ReplayResult indicating success or failure |
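Atomic acquisition means checking an entry's status and moving it to in-progress happen in one indivisible step, so only one worker can win. This in-memory sketch uses a lock; Entry, InMemoryDLQ and the status names are hypothetical, and a database-backed repository would more typically use a conditional update.

```python
import threading
from dataclasses import dataclass


@dataclass
class Entry:
    """Hypothetical DLQ row; status names are illustrative."""
    dlq_id: str
    status: str = "pending"


class InMemoryDLQ:
    def __init__(self, entries: list[Entry]) -> None:
        self._entries = {e.dlq_id: e for e in entries}
        self._lock = threading.Lock()

    def try_claim(self, dlq_id: str) -> bool:
        # Check-and-set in one critical section: exactly one caller wins.
        with self._lock:
            entry = self._entries.get(dlq_id)
            if entry is None or entry.status != "pending":
                return False
            entry.status = "processing"
            return True


dlq = InMemoryDLQ([Entry("123")])
wins: list[bool] = []
workers = [threading.Thread(target=lambda: wins.append(dlq.try_claim("123"))) for _ in range(8)]
for w in workers:
    w.start()
for w in workers:
    w.join()
```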
replay_batch
replay_batch(
domain: str | None = None,
failure_type: str | None = None,
max_items: int = 100,
use_adaptive: bool | None = None,
use_priority: bool | None = None,
) -> BatchReplayResult
Replay multiple DLQ entries matching criteria.
Safety checks (via check_all_governance):
1. Kill Switch: system-wide deactivation check
2. Emergency Level: blocked at LEVEL_2+ to protect resources
3. ErrorBudgetGate: automation blocked when the error budget is exhausted
Adaptive mode:
- When adaptive_enabled=True in RuntimeConfig, the batch size is dynamic
- A high failure rate (>=20%) reduces the batch size by 20%
- 3 consecutive perfect batches increase the batch size by 5
Priority mode:
- When priority_enabled=True in RuntimeConfig, domains are processed by priority
- Critical domains are processed first, then normal, then low
- Domain-specific max_retries overrides are respected
Audit logging: blocks are automatically recorded in the AuditLog.
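The adaptive rules can be written as a pure function of the current size, the batch's failure rate and the streak of perfect batches. The 20% threshold, the 20% reduction and the +5 after three perfect batches come from this page; the size bounds, integer rounding, resetting the streak after growth, and leaving the size unchanged for moderately failing batches are assumptions.

```python
def next_batch_size(
    current: int,
    failure_rate: float,
    perfect_streak: int,
    min_size: int = 1,
    max_size: int = 500,
) -> tuple[int, int]:
    """Return (new_batch_size, new_perfect_streak). Bounds are hypothetical."""
    if failure_rate >= 0.20:
        # High failure rate: shrink by 20% and break the streak.
        return max(min_size, int(current * 0.8)), 0
    if failure_rate == 0.0:
        perfect_streak += 1
        if perfect_streak >= 3:
            # Three perfect batches in a row: grow by 5, start a new streak.
            return min(max_size, current + 5), 0
        return current, perfect_streak
    # Some failures but below 20%: size unchanged, streak reset (assumption).
    return current, 0
```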
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| domain | str \| None | Filter by domain (optional; ignored in priority mode) | None |
| failure_type | str \| None | Filter by failure type (optional) | None |
| max_items | int | Maximum number of items to replay (ignored in adaptive mode) | 100 |
| use_adaptive | bool \| None | Override the adaptive mode setting (None = use RuntimeConfig) | None |
| use_priority | bool \| None | Override the priority mode setting (None = use RuntimeConfig) | None |
Returns:
| Type | Description |
|---|---|
| BatchReplayResult | BatchReplayResult with summary and individual results |
replay_on_circuit_close
replay_on_circuit_close(
service_name: str,
max_items: int = 50,
escalate_failures: bool = True,
service_failure_type_map: (
dict[str, list[str]] | None
) = None,
) -> BatchReplayResult
Replay entries when circuit breaker closes.
This is triggered when an external service recovers. Only replays entries related to the recovered service.
IMPORTANT: When triggered by force_close with trigger_replay=True, any replay failures are escalated to REQUIRES_REVIEW status. This is because operator-initiated recovery implies the operator intended to resolve these items, so failures need explicit attention.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service_name | str | Name of the service that recovered | required |
| max_items | int | Maximum number of items to replay | 50 |
| escalate_failures | bool | If True, mark failed replays as REQUIRES_REVIEW | True |
| service_failure_type_map | dict[str, list[str]] \| None | Custom mapping of service names to failure types. If None, the RuntimeConfig fallback is used. Example: {"my_service": ["TIMEOUT", "CONNECTION_ERROR"]} | None |
Returns:
| Type | Description |
|---|---|
| BatchReplayResult | BatchReplayResult with summary. … the per-service inflight lock rejected this call as a duplicate. |
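Selecting the DLQ entries that belong to the recovered service comes down to resolving failure types from either the explicit map or the configured fallback. DEFAULT_SERVICE_FAILURE_TYPES and failure_types_for are hypothetical stand-ins for the RuntimeConfig lookup, and returning an empty list for unmapped services is an assumption.

```python
from typing import Optional

# Hypothetical stand-in for the RuntimeConfig fallback mapping.
DEFAULT_SERVICE_FAILURE_TYPES: dict[str, list[str]] = {
    "external_api": ["EXTERNAL_API_TIMEOUT"],
}


def failure_types_for(
    service_name: str,
    service_failure_type_map: Optional[dict[str, list[str]]] = None,
) -> list[str]:
    # An explicit map replaces the fallback entirely; None means "use config".
    mapping = (
        service_failure_type_map
        if service_failure_type_map is not None
        else DEFAULT_SERVICE_FAILURE_TYPES
    )
    return mapping.get(service_name, [])
```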
BatchReplayResult
dataclass
BatchReplayResult(
total: int = 0,
success_count: int = 0,
failed_count: int = 0,
skipped_count: int = 0,
results: list[ReplayResult] = list(),
governance_blocked: bool = False,
governance_block_reason: str = "",
inflight_skipped: bool = False,
priority_used: bool = False,
domains_processed: list[str] | None = None,
)
Result of a batch replay operation.
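The count fields of BatchReplayResult summarize its results list. This sketch uses trimmed stand-ins with the same field names; counting a skipped result only in skipped_count is an assumption, not something this page states.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Result:
    """Trimmed stand-in for ReplayResult (same field names)."""
    success: bool
    dlq_id: str
    error: Optional[str] = None
    skipped: bool = False


@dataclass
class Batch:
    """Trimmed stand-in for BatchReplayResult (same count fields)."""
    total: int = 0
    success_count: int = 0
    failed_count: int = 0
    skipped_count: int = 0
    results: list[Result] = field(default_factory=list)

    def add(self, result: Result) -> None:
        self.results.append(result)
        self.total += 1
        if result.skipped:
            self.skipped_count += 1  # assumption: skipped is its own bucket
        elif result.success:
            self.success_count += 1
        else:
            self.failed_count += 1


batch = Batch()
for r in [Result(True, "1"), Result(False, "2", error="timeout"), Result(True, "3", skipped=True)]:
    batch.add(r)
```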
ReplayResult
dataclass
ReplayResult(
success: bool,
dlq_id: str,
message: str = "",
error: str | None = None,
data: dict[str, Any] | None = None,
skipped: bool = False,
)
Result of a replay operation.
succeeded
classmethod
succeeded(
dlq_id: str, message: str = "", data: dict | None = None
) -> ReplayResult
Factory for successful replay.
failed
classmethod
failed(dlq_id: str, error: str) -> ReplayResult
Factory for failed replay.
skipped_result
classmethod
skipped_result(
dlq_id: str, reason: str = ""
) -> ReplayResult
Factory for idempotency-skipped replay.
blocked
classmethod
blocked(
dlq_id: str, governance_result: GovernanceCheckResult
) -> ReplayResult
Factory for governance-blocked replay.