baldur.interfaces — Eventing, Notification & Audit
The unified event-bus protocol (with its Kafka-shaped sub-protocols and NoOp defaults), the notification and alert adapter contracts, the audit-log adapter surface, and the traffic-routing adapter for multi-region failover.
Event bus
EventBusProtocol
Bases: Protocol
Protocol for event bus implementations.
Both BaldurEventBus (L1 in-memory) and RedisEventBus (L2 distributed) implement this protocol. Used as the return type of the unified get_event_bus() factory.
ConsumedEventProtocol
Bases: Protocol
Value-shape Protocol for events consumed from a Kafka topic.
Mirrors the field set on the Dormant ConsumedEvent class. A pure attribute Protocol with no methods, used by OSS callers that pattern-match on event payload shape without importing the Dormant concrete class.
KafkaConsumerProtocol
Bases: Protocol
Protocol for Kafka audit consumers.
Implementations: the Dormant KafkaAuditConsumer class. Methods cover only the OSS-caller usage axis (start in a background thread, stop cleanly).
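Below is a minimal sketch of the "start in a background thread, stop cleanly" contract using only the standard library. The method names start/stop, the PollingConsumer class and the poll_once callable (a stand-in for a real broker poll) are assumptions, not the KafkaAuditConsumer API.

```python
import threading
from typing import Callable, Protocol


class BackgroundConsumer(Protocol):
    # Hypothetical method names for the documented usage axis.
    def start(self) -> None: ...
    def stop(self) -> None: ...


class PollingConsumer:
    """Illustrative consumer; poll_once stands in for a real broker poll."""

    def __init__(self, poll_once: Callable[[], None], interval: float = 0.01) -> None:
        self._poll_once = poll_once
        self._interval = interval
        self._stop = threading.Event()
        self._thread: threading.Thread | None = None

    def start(self) -> None:
        # Run the poll loop on a daemon thread so start() returns immediately.
        self._stop.clear()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self) -> None:
        while not self._stop.is_set():
            self._poll_once()
            # wait() doubles as an interruptible sleep between polls.
            self._stop.wait(self._interval)

    def stop(self, timeout: float = 5.0) -> None:
        # Signal the loop, then join so no poll is in flight after stop() returns.
        self._stop.set()
        if self._thread is not None:
            self._thread.join(timeout)
            self._thread = None
```

Using threading.Event as both the stop flag and the sleep means stop() takes effect promptly instead of waiting out a full poll interval.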
KafkaEventBusProtocol
Bases: Protocol
Protocol for the Kafka-backed event bus (Producer + Consumer combo).
Implementations: the Dormant KafkaEventBus class. The OSS-facing surface is publish/subscribe plus the lifecycle methods (start/stop/close/flush).
KafkaProducerProtocol
Bases: Protocol
Protocol for Kafka audit producers (publishes events to Kafka).
Implementations: the Dormant KafkaAuditProducer class. OSS callers obtain instances via ProviderRegistry.kafka_eventbus.get("kafka_producer") or the equivalent factory.
NoOpKafkaEventBus
No-op fallback for the kafka_eventbus registry slot (OSS-safe).
Returned by ProviderRegistry.kafka_eventbus.get() when baldur_dormant is not installed. publish/subscribe silently no-op, so OSS callers can use the registry result unconditionally; nothing is ever sent to a broker. Logs at DEBUG to surface accidental wiring on clean-OSS installs (the typical Baldur pattern: NoOp logs are quiet).
Satisfies the "NoOp default registration requirement".
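The NoOp-default pattern can be sketched as a registry slot that returns a fallback object when no provider is registered. RegistrySlot, NoOpEventBus and the publish/subscribe signatures are hypothetical stand-ins for ProviderRegistry.kafka_eventbus and NoOpKafkaEventBus. In this sketch an optional package would call register() to supply a real provider.

```python
import logging
from typing import Any, Callable

logger = logging.getLogger("baldur_sketch")


class NoOpEventBus:
    """Hypothetical stand-in for NoOpKafkaEventBus: accepts calls, sends nothing."""

    def publish(self, topic: str, event: dict[str, Any]) -> None:
        # DEBUG keeps clean-OSS logs quiet but still surfaces accidental wiring.
        logger.debug("NoOp publish dropped (topic=%s)", topic)

    def subscribe(self, topic: str, handler: Callable[[dict[str, Any]], None]) -> None:
        logger.debug("NoOp subscribe ignored (topic=%s)", topic)


class RegistrySlot:
    """Hypothetical registry slot: registered factory if present, else the fallback."""

    def __init__(self, fallback: Callable[[], Any]) -> None:
        self._providers: dict[str, Callable[[], Any]] = {}
        self._fallback = fallback

    def register(self, name: str, factory: Callable[[], Any]) -> None:
        self._providers[name] = factory

    def get(self, name: str = "default") -> Any:
        factory = self._providers.get(name, self._fallback)
        return factory()


kafka_eventbus = RegistrySlot(fallback=NoOpEventBus)

# Callers use the result unconditionally; with nothing registered this is a no-op.
kafka_eventbus.get("kafka_producer").publish("audit", {"action": "api_error"})
```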
Notification
NotificationAdapter
Bases: ABC
ABC for notification adapters.
Implement this class to send notifications to your preferred channel. Duck-typed adapters are also accepted via register_notification_adapter(), which auto-registers them as virtual subclasses.
Example
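```python
import os

import requests

# Import path assumed from this page's module.
from baldur.interfaces import NotificationAdapter, NotificationChannel, NotificationPayload

SLACK_WEBHOOK_URL = os.environ["SLACK_WEBHOOK_URL"]  # hypothetical configuration source


class SlackNotificationAdapter(NotificationAdapter):
    def send(self, payload: NotificationPayload) -> bool:
        response = requests.post(
            SLACK_WEBHOOK_URL,
            json={"text": f"{payload.title}\n{payload.message}"},
            timeout=5,  # never block the notification path indefinitely
        )
        return response.ok

    def send_batch(self, payloads: list[NotificationPayload]) -> int:
        # Count only the payloads that were delivered.
        return sum(1 for p in payloads if self.send(p))

    @property
    def channel(self) -> NotificationChannel:
        return NotificationChannel.SLACK
```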
channel
abstractmethod
property
channel: NotificationChannel
Return the channel this adapter handles.
send
abstractmethod
send(payload: NotificationPayload) -> bool
Send a single notification.
Returns:
| Type | Description |
|---|---|
| bool | True if sent successfully, False otherwise |
send_batch
abstractmethod
send_batch(payloads: list[NotificationPayload]) -> int
Send multiple notifications.
Returns:
| Type | Description |
|---|---|
| int | Number of successfully sent notifications |
is_available
is_available() -> bool
Check if this adapter is available (configuration validity).
Non-abstract concrete method: adapters that inherit from the ABC get the default, which returns True. Network-based adapters should override it with I/O-free self-diagnosis (e.g., URL format validation).
Duck-typed adapters (virtual subclasses) do NOT inherit this method, so callers must use getattr(adapter, 'is_available', lambda: True)().
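The standard abc module shows why virtual subclasses miss is_available. This sketch assumes register_notification_adapter() relies on ABC.register(); NotificationAdapterBase and WebhookAdapter are hypothetical stand-ins.

```python
from abc import ABC, abstractmethod


class NotificationAdapterBase(ABC):
    """Stand-in for NotificationAdapter, reduced to one abstract method."""

    @abstractmethod
    def send(self, payload: dict) -> bool: ...

    def is_available(self) -> bool:
        # Concrete default: inherited only by real subclasses.
        return True


class WebhookAdapter:
    """Duck-typed adapter: never inherits from the ABC."""

    def send(self, payload: dict) -> bool:
        return True


# ABC.register() makes WebhookAdapter a *virtual* subclass: isinstance() passes,
# but no methods are copied onto it.
NotificationAdapterBase.register(WebhookAdapter)

adapter = WebhookAdapter()
print(isinstance(adapter, NotificationAdapterBase))  # True
print(hasattr(adapter, "is_available"))              # False

# Safe call pattern: treat a missing is_available as "available".
available = getattr(adapter, "is_available", lambda: True)()
print(available)  # True
```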
NotificationChannel
module-attribute
NotificationChannel = MessageChannel
NotificationSeverity
module-attribute
NotificationSeverity = MessageSeverity
Alerting
AlertSeverity
module-attribute
AlertSeverity = MessageSeverity
AlertCategory
Bases: str, Enum
Alert categories for routing.
Alert
dataclass
Alert(
title: str,
description: str,
severity: AlertSeverity = AlertSeverity.WARNING,
category: AlertCategory = AlertCategory.AVAILABILITY,
timestamp: datetime = (lambda: utc_now())(),
source: str = "baldur",
service_name: str | None = None,
domain: str | None = None,
slo_name: str | None = None,
slo_target: float | None = None,
slo_current: float | None = None,
details: dict[str, Any] = dict(),
runbook_url: str | None = None,
alert_key: str | None = None,
)
Bases: SerializableMixin
Alert containing all relevant context.
Captures:
- What happened (title, description)
- How severe (severity)
- What category (category)
- Where (source, service_name)
- Additional context (details)
key
property
key: str
Generate alert key for deduplication.
to_json
to_json() -> str
Convert to JSON string.
AlertAdapter
Bases: ABC
Abstract interface for alerting.
Implementations can send alerts to:
- stdout (StdoutAlertAdapter)
- Files (FileAlertAdapter)
- Slack/Teams (user implements)
- PagerDuty/OpsGenie (user implements)
- Email (user implements)
- Nowhere (NullAlertAdapter)
send
abstractmethod
send(alert: Alert) -> None
Send an alert.
resolve
abstractmethod
resolve(alert_key: str) -> None
Resolve/close an alert.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| alert_key | str | The key of the alert to resolve | required |
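Below is a minimal in-memory sketch of the send/resolve lifecycle. Alerts are keyed so that repeated sends collapse into one open alert. MiniAlert, MiniAlertAdapter, InMemoryAlertAdapter and the "cb:payment-api" key format are hypothetical. The real Alert carries more fields and exposes its own deduplication key via the key property.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass
class MiniAlert:
    """Reduced stand-in for Alert with only the fields this sketch needs."""

    title: str
    description: str
    alert_key: str


class MiniAlertAdapter(ABC):
    """Stand-in for AlertAdapter with its two abstract methods."""

    @abstractmethod
    def send(self, alert: MiniAlert) -> None: ...

    @abstractmethod
    def resolve(self, alert_key: str) -> None: ...


class InMemoryAlertAdapter(MiniAlertAdapter):
    """Tracks open alerts by key: repeated sends deduplicate, resolve closes."""

    def __init__(self) -> None:
        self.open: dict[str, MiniAlert] = {}
        self.resolved: list[str] = []

    def send(self, alert: MiniAlert) -> None:
        # The same key replaces the open alert instead of creating a duplicate.
        self.open[alert.alert_key] = alert

    def resolve(self, alert_key: str) -> None:
        # Design choice for this sketch: resolving an unknown key is a no-op.
        if self.open.pop(alert_key, None) is not None:
            self.resolved.append(alert_key)


adapter = InMemoryAlertAdapter()
adapter.send(MiniAlert("CB open", "payment-api tripped", "cb:payment-api"))
adapter.send(MiniAlert("CB open", "payment-api still failing", "cb:payment-api"))
adapter.resolve("cb:payment-api")
```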
alert_cb_opened
alert_cb_opened(
service_name: str,
failure_count: int,
threshold: int,
is_manual: bool = False,
) -> None
Convenience method for Circuit Breaker open alert.
alert_cb_closed
alert_cb_closed(
service_name: str, is_manual: bool = False
) -> None
Convenience method for Circuit Breaker close (resolves alert).
alert_dlq_threshold
alert_dlq_threshold(
domain: str, pending_count: int, threshold: int
) -> None
Convenience method for DLQ threshold alert.
alert_slo_violation
alert_slo_violation(
slo_name: str,
target: float,
current: float,
service_name: str | None = None,
) -> None
Convenience method for SLO violation alert.
alert_high_error_rate
alert_high_error_rate(
service_name: str, error_rate: float, threshold: float
) -> None
Convenience method for high error rate alert.
alert_failsafe_activated
alert_failsafe_activated(
component: str,
error_message: str,
fallback_action: str = "PROCEED",
) -> None
CRITICAL: Fail-Safe mode activation alert.
Sent when part of the Baldur system fails and transitions into Fail-Safe mode. This alert requires immediate attention.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| component | str | Component that failed (e.g., "error_budget", "circuit_breaker") | required |
| error_message | str | Failure cause message | required |
| fallback_action | str | Fallback action taken (e.g., "PROCEED", "ALLOW") | 'PROCEED' |
Note
This alert is designed to prevent "silent failures". When Fail-Safe activates, the system continues operating, but the operations team must be notified immediately to address the root cause.
resolve_failsafe
resolve_failsafe(component: str) -> None
Resolve the alert when Fail-Safe recovers.
alert_failsafe_recovered
alert_failsafe_recovered(
component: str,
downtime_seconds: float,
recovery_reason: str = "System recovered automatically",
) -> None
Recovery notification: sent when normal operation is restored after Fail-Safe mode.
Like a PagerDuty/OpsGenie "resolved" event, it actively announces that the failure has been cleared.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| component | str | Component that recovered (e.g., "error_budget", "circuit_breaker") | required |
| downtime_seconds | float | Failure duration (seconds) | required |
| recovery_reason | str | Description of the recovery cause | 'System recovered automatically' |
Note
This alert prevents "silent recoveries". When a failure is cleared it is explicitly announced, so the operations team does not need to keep tracking the failure state.
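The downtime_seconds argument has to be measured by whoever calls these methods. The sketch below uses a hypothetical FailSafeTracker that records when each component entered Fail-Safe and pairs each activation alert with exactly one recovery alert. Only the two AlertAdapter method names and their parameters come from this page; the tracker itself is an assumption.

```python
import time
from typing import Callable, Protocol


class FailSafeAlerts(Protocol):
    # The two documented AlertAdapter methods this sketch relies on.
    def alert_failsafe_activated(
        self, component: str, error_message: str, fallback_action: str = "PROCEED"
    ) -> None: ...

    def alert_failsafe_recovered(
        self,
        component: str,
        downtime_seconds: float,
        recovery_reason: str = "System recovered automatically",
    ) -> None: ...


class FailSafeTracker:
    """Hypothetical helper pairing each activation alert with one recovery alert."""

    def __init__(
        self, alerts: FailSafeAlerts, clock: Callable[[], float] = time.monotonic
    ) -> None:
        self._alerts = alerts
        self._clock = clock
        self._since: dict[str, float] = {}

    def activated(self, component: str, error: Exception) -> None:
        # Alert once per outage; repeated failures while in Fail-Safe stay quiet.
        if component not in self._since:
            self._since[component] = self._clock()
            self._alerts.alert_failsafe_activated(component, str(error))

    def recovered(self, component: str) -> None:
        started = self._since.pop(component, None)
        if started is not None:
            # A monotonic clock is unaffected by wall-clock jumps.
            self._alerts.alert_failsafe_recovered(component, self._clock() - started)
```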
alert_override_escalation
alert_override_escalation(
override_type: str,
requester: str,
reason: str,
service_name: str | None = None,
escalation_channel: str = "#governance",
escalation_mention: str = "@cto @security",
) -> None
Override escalation alert: sent when a deployment is forced through via an override while the Error Budget is exhausted.
Netflix CAB (Change Advisory Board) style — when a deployment is forced through while the Error Budget is exhausted, escalates to a senior decision-maker / governance channel.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| override_type | str | Override type (hotfix, security_patch, business_critical, etc.) | required |
| requester | str | Override requester | required |
| reason | str | Override reason | required |
| service_name | str \| None | Target service name | None |
| escalation_channel | str | Escalation channel (e.g., #governance) | '#governance' |
| escalation_mention | str | Mention target (e.g., @cto @security) | '@cto @security' |
Note
This alert provides visibility into "risky actions". Every action that bypasses the Error Budget policy must be tracked.
Audit
AuditAction
Bases: str, Enum
Standard audit action types.
DLQ_FORCE_REDRIVE
class-attribute
instance-attribute
DLQ_FORCE_REDRIVE = 'dlq_force_redrive'
Operator cap-override re-drive of an at-cap (REQUIRES_REVIEW) entry — a privileged, audited action distinct from a normal replay.
API_ERROR
class-attribute
instance-attribute
API_ERROR = 'api_error'
Error raised while handling an API request.
VALIDATION_FAILED
class-attribute
instance-attribute
VALIDATION_FAILED = 'validation_failed'
Input validation failed.
AUTHORIZATION_DENIED
class-attribute
instance-attribute
AUTHORIZATION_DENIED = 'authorization_denied'
Authorization denied (no permission).
FORENSIC_CAPTURE_COMPLETED
class-attribute
instance-attribute
FORENSIC_CAPTURE_COMPLETED = 'forensic_capture_completed'
Forensic context recorded after a captured exception.
AuditEntry
dataclass
AuditEntry(
action: AuditAction | str,
timestamp: datetime = (lambda: utc_now())(),
actor_id: str | None = None,
actor_type: str = "system",
actor_roles: list[str] = list(),
context_type: ContextType = ContextType.UNKNOWN,
target_type: str | None = None,
target_id: str | None = None,
service_name: str | None = None,
domain: str | None = None,
reason: str | None = None,
details: dict[str, Any] = dict(),
success: bool = True,
error_message: str | None = None,
)
Audit log entry containing all relevant context.
Captures:
- What happened (action)
- Who did it (actor_id, actor_type), sourced automatically from ActorContext
- What was affected (target_type, target_id)
- Why (reason)
- Additional context (details)
Note
When actor_id and actor_type are not set explicitly, they are pulled from ActorContext automatically. This auto-tracks "who set this and when".
to_dict
to_dict() -> dict[str, Any]
Convert to dictionary for serialization.
to_json
to_json() -> str
Convert to JSON string.
from_dict
classmethod
from_dict(data: dict[str, Any]) -> AuditEntry
Inverse of to_dict(); round-trip safe.
Unknown fields (e.g. integrity, checksum, audit_id added by ResilientContinuousAuditRecorder) are preserved in details under their original keys to maintain forensic completeness.
Forward-compatibility design (intentional; no schema_version field needed):
- New first-class field added later: an old from_dict() running on new data puts the unknown key into details without raising. A new from_dict() running on old data gets None from data.get(key), so the default applies. Round-trip is preserved at every point in time.
- Field promoted from details to first-class: requires a one-time migration script. A schema_version field cannot solve this automatically.
- Field semantic change: cannot be auto-detected without a version field, and could not be handled automatically even with one. It always requires coordinated reader/writer updates.
Conclusion: a schema_version meta field would add ceremony without solving the cases that actually need solving. The known-set + details-overflow design covers the only case that benefits from automation, and does so without ceremony.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data | dict[str, Any] | Dict produced by | required |
Returns:
| Type | Description |
|---|---|
| AuditEntry | |
AuditLogAdapter
Bases: ABC
Abstract interface for audit logging.
Implementations can store audit logs in:
- Files (FileAuditLogAdapter)
- stdout (StdoutAuditLogAdapter)
- Database (user implements)
- External services like Loki, Datadog (user implements)
- Nowhere (NullAuditLogAdapter)
log
abstractmethod
log(entry: AuditEntry) -> None
Log an audit entry.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| entry | AuditEntry | The audit entry to log | required |
query
abstractmethod
query(
action: AuditAction | str | None = None,
target_type: str | None = None,
target_id: str | None = None,
start_time: datetime | None = None,
end_time: datetime | None = None,
limit: int = 100,
) -> list[AuditEntry]
Query audit logs (optional; not every adapter supports querying).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| action | AuditAction \| str \| None | Filter by action type | None |
| target_type | str \| None | Filter by target type | None |
| target_id | str \| None | Filter by target ID | None |
| start_time | datetime \| None | Filter from this time | None |
| end_time | datetime \| None | Filter until this time | None |
| limit | int | Maximum entries to return | 100 |
Returns:
| Type | Description |
|---|---|
| list[AuditEntry] | List of matching audit entries |
log_cb_open
log_cb_open(
service_name: str,
reason: str,
actor_id: str | None = None,
is_manual: bool = True,
) -> None
Convenience method for Circuit Breaker open.
log_cb_close
log_cb_close(
service_name: str,
reason: str,
actor_id: str | None = None,
is_manual: bool = True,
trigger_replay: bool = False,
) -> None
Convenience method for Circuit Breaker close.
log_dlq_store
log_dlq_store(
dlq_id: int,
domain: str,
failure_type: str,
error_message: str | None = None,
) -> None
Convenience method for DLQ storage.
log_dlq_replay
log_dlq_replay(
dlq_id: int,
domain: str,
success: bool,
actor_id: str | None = None,
error_message: str | None = None,
) -> None
Convenience method for DLQ replay.
log_retry
log_retry(
domain: str,
func_name: str,
attempt: int,
max_attempts: int,
success: bool,
error_message: str | None = None,
) -> None
Convenience method for retry attempts.
log_governance_blocked
log_governance_blocked(
block_reason: str,
operation_name: str,
details: dict[str, Any] | None = None,
service_name: str | None = None,
domain: str | None = None,
) -> None
Record an entry when automation was blocked by governance.
This method answers the "why didn't the operation run at this time?" question concretely.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
block_reason
|
str
|
Block reason (kill_switch, emergency_mode, error_budget) |
required |
operation_name
|
str
|
Name of the blocked operation |
required |
details
|
dict[str, Any] | None
|
Additional details (emergency_level, budget_percent, etc.) |
None
|
service_name
|
str | None
|
Related service name |
None
|
domain
|
str | None
|
Domain (payment, point, etc.) |
None
|
NoOpKafkaAuditAdapter
Bases: AuditLogAdapter
Kafka-shaped audit NoOp returned by the audit_kafka_adapter slot.
When baldur_dormant is not installed, the ProviderRegistry.audit_kafka_adapter slot returns this adapter, so OSS callers can request the Kafka audit path unconditionally without checking for the underlying broker. log() silently drops the entry (logging at DEBUG so operational tracing remains possible); query() returns an empty list because there is no backing topic.
NoOpWormAdapter
Bases: AuditLogAdapter
WORM-shaped audit NoOp returned by the audit_worm_adapter slot.
The real WORM adapters (S3 Object Lock, Loki, sidecar) live in the Dormant package. When the Dormant package is absent, this NoOp answers the registry slot so callers don't need to guard against missing providers. log() drops entries silently, so the compliance/non-repudiation property the real WORM adapters provide is not met; operators relying on it must install baldur-pro[dormant,aws] explicitly.
verify
verify() -> bool
Verify the WORM store integrity (NoOp: trivially True).
Real implementations check Object Lock retention, hash-chain continuity, etc. The NoOp returns True because there is nothing stored to violate.
Traffic routing
RoutingChange
dataclass
RoutingChange(
success: bool,
from_region: str,
to_region: str,
details: dict[str, Any] = dict(),
rollback_info: dict[str, Any] | None = None,
)
Traffic-routing change result.
Captures the result of a switch_primary() call and carries the information needed by rollback() to restore the previous state.
Attributes:
| Name | Type | Description |
|---|---|---|
| success | bool | Whether the switch succeeded |
| from_region | str | Previous Primary region |
| to_region | str | New Primary region |
| details | dict[str, Any] | Switch details |
| rollback_info | dict[str, Any] \| None | Previous-state info for rollback |
success
instance-attribute
success: bool
Whether the switch succeeded.
from_region
instance-attribute
from_region: str
Previous Primary region.
to_region
instance-attribute
to_region: str
New Primary region.
details
class-attribute
instance-attribute
details: dict[str, Any] = field(default_factory=dict)
Switch details.
rollback_info
class-attribute
instance-attribute
rollback_info: dict[str, Any] | None = None
Previous-state info for rollback.
TrafficRoutingAdapter
Bases: ABC
Traffic routing adapter interface.
Abstract interface for shifting traffic at the DNS/LB layer during a regional outage. The baldur package does not include any external cloud SDK, so production adapters are implemented in the host app and registered with the ProviderRegistry.
Default implementation (LoggingTrafficRoutingAdapter): Operates at the application level only, without DNS/LB changes. Publishes a REGION_PRIMARY_CHANGED event via RedisEventBus so that ServiceLocalityRouter can refresh its routing table.
Example (AWS Route53):
```python
from typing import Any

import boto3

# Import path assumed from this page's module.
from baldur.interfaces import RoutingChange, TrafficRoutingAdapter


class Route53TrafficRouter(TrafficRoutingAdapter):
    def __init__(self, hosted_zone_id: str) -> None:
        self._client = boto3.client("route53")
        self._zone_id = hosted_zone_id

    def switch_primary(self, from_region: str, to_region: str) -> RoutingChange:
        self._client.change_resource_record_sets(
            HostedZoneId=self._zone_id,
            ChangeBatch={...},  # record-set changes elided in the original example
        )
        return RoutingChange(
            success=True,
            from_region=from_region,
            to_region=to_region,
            details={"dns_updated": True},
        )

    def rollback(self, routing_change: RoutingChange) -> bool:
        # Undo by switching back in the opposite direction.
        return self.switch_primary(
            routing_change.to_region,
            routing_change.from_region,
        ).success

    def get_current_routing(self) -> dict[str, Any]:
        return {"hosted_zone": self._zone_id}
```
Example (K8s Ingress):
```python
class K8sIngressTrafficRouter(TrafficRoutingAdapter):
    def switch_primary(self, from_region: str, to_region: str) -> RoutingChange:
        # kubectl patch ingress ...
        ...
```
Registration:
```python
ProviderRegistry.register_traffic_routing("route53", Route53TrafficRouter)
```
switch_primary
abstractmethod
switch_primary(
from_region: str, to_region: str
) -> RoutingChange
Switch the Primary region.
Shifts traffic to to_region at the DNS/LB layer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| from_region | str | Current Primary region | required |
| to_region | str | New Primary region | required |
Returns:
| Type | Description |
|---|---|
| RoutingChange | RoutingChange result (includes rollback info) |
rollback
abstractmethod
rollback(routing_change: RoutingChange) -> bool
Roll back a routing change.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| routing_change | RoutingChange | The return value of switch_primary() | required |
Returns:
| Type | Description |
|---|---|
| bool | True if the rollback succeeded |
get_current_routing
abstractmethod
get_current_routing() -> dict[str, Any]
Query the current routing state.