baldur.interfaces — Statistics & Event Journal
The statistics repository surface (summary DTOs + audit-trail records) and the event-journal interface that backs replay and forensics.
Statistics DTOs
StatusCounts
dataclass
StatusCounts(
total: int = 0,
pending: int = 0,
resolved: int = 0,
failed: int = 0,
archived: int = 0,
reviewing: int = 0,
replayed: int = 0,
requires_review: int = 0,
rejected: int = 0,
expired: int = 0,
)
Counts of DLQ entries by status.
DomainDistribution
dataclass
DomainDistribution(
domain: str, count: int, percentage: float
)
Distribution of DLQ entries by domain.
FailureTypeDistribution
dataclass
FailureTypeDistribution(
failure_type: str, count: int, percentage: float
)
Distribution of DLQ entries by failure type.
RecentActivity
dataclass
RecentActivity(
new_in_24h: int = 0,
resolved_in_24h: int = 0,
new_in_7d: int = 0,
resolved_in_7d: int = 0,
trend: str = "stable",
)
Recent activity statistics.
CleanupStats
dataclass
CleanupStats(
total: int = 0,
by_status: dict[str, int] = dict(),
resolved_older_than_30_days: int = 0,
archived_older_than_90_days: int = 0,
)
Statistics for DLQ cleanup operations.
PaginatedResult
dataclass
PaginatedResult(
items: list[Any] = list(),
total: int = 0,
page: int = 1,
page_size: int = 20,
has_next: bool = False,
has_prev: bool = False,
)
Paginated query result.
total_pages
property
total_pages: int
Calculate total number of pages.
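The source does not show how `total_pages` is computed. A minimal sketch, assuming ceiling division of `total` by `page_size`; `PageSketch` is a hypothetical stand-in that mirrors the documented fields, not the library's class:

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class PageSketch:
    """Hypothetical stand-in mirroring the documented PaginatedResult fields."""

    items: list[Any] = field(default_factory=list)
    total: int = 0
    page: int = 1
    page_size: int = 20
    has_next: bool = False
    has_prev: bool = False

    @property
    def total_pages(self) -> int:
        # Assumption: ceiling division; an empty result has zero pages.
        if self.page_size <= 0:
            return 0
        return (self.total + self.page_size - 1) // self.page_size


result = PageSketch(items=list(range(20)), total=45, page=1, has_next=True)
print(result.total_pages)
```

With 45 entries and the default page size of 20, the last page holds the remaining 5 items.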
CircuitBreakerSummary
dataclass
CircuitBreakerSummary(
total: int = 0,
closed: int = 0,
open: int = 0,
half_open: int = 0,
)
Summary of all circuit breakers.
CircuitBreakerInfo
dataclass
CircuitBreakerInfo(
service_name: str,
state: str,
failure_count: int = 0,
success_count: int = 0,
last_failure_time: datetime | None = None,
last_state_change: datetime | None = None,
)
Information about a single circuit breaker.
Audit-trail DTOs
AuditTrailEntry
dataclass
AuditTrailEntry(
timestamp: datetime,
action: str,
actor_id: str | None = None,
status: str | None = None,
details: str | None = None,
hash_chain: str | None = None,
previous_hash: str | None = None,
)
Single audit trail entry for an entity.
EntityAuditTrail
dataclass
EntityAuditTrail(
entity_id: str,
entity_type: str,
domain: str,
created_at: datetime | None = None,
resolved_at: datetime | None = None,
current_status: str = "unknown",
entries: list[AuditTrailEntry] = list(),
)
Complete audit trail for a DLQ entity.
Provides end-to-end traceability from creation to resolution.
total_entries
property
total_entries: int
Total number of audit entries.
is_chain_valid
property
is_chain_valid: bool
Verify hash chain integrity.
Returns True if every entry has a valid hash chain.
Interfaces
StatisticsRepositoryInterface
Bases: ABC
Statistics/Dashboard Repository Interface.
This interface defines operations for dashboards and analytics. Unlike runtime repositories (optimized for speed), this interface is designed for complex aggregate queries.
Implementations:
- DjangoStatisticsAdapter: Uses Django ORM
- SQLAlchemyStatisticsAdapter: Uses SQLAlchemy
- NullStatisticsRepository: Returns empty results (default)
Usage
from baldur.factory import ProviderRegistry

stats_repo = ProviderRegistry.get_statistics_repo()
counts = stats_repo.get_status_counts()
get_status_counts
abstractmethod
get_status_counts() -> StatusCounts
Get count of DLQ entries by status.
Returns:
| Type | Description |
|---|---|
| StatusCounts | StatusCounts with counts for each status |
get_domain_distribution
abstractmethod
get_domain_distribution(
limit: int = 10,
) -> list[DomainDistribution]
Get distribution of DLQ entries by domain.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| limit | int | Maximum number of domains to return (top N) | 10 |
Returns:
| Type | Description |
|---|---|
| list[DomainDistribution] | List of DomainDistribution sorted by count descending |
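As an illustration of the top-N shape this method returns, the following sketch aggregates raw domain labels in memory. It assumes `percentage` is on a 0 to 100 scale and is computed against all entries, not only the top N; neither detail is confirmed by the source. `DomainShare` and `top_domains` are hypothetical names:

```python
from collections import Counter
from dataclasses import dataclass


@dataclass
class DomainShare:
    """Hypothetical stand-in mirroring DomainDistribution's fields."""

    domain: str
    count: int
    percentage: float


def top_domains(domains: list[str], limit: int = 10) -> list[DomainShare]:
    counts = Counter(domains)
    total = sum(counts.values())
    # most_common() already sorts by count descending and applies the limit.
    return [
        DomainShare(domain, count, round(100.0 * count / total, 2))
        for domain, count in counts.most_common(limit)
    ]


shares = top_domains(["payment", "payment", "payment", "email"], limit=1)
print(shares)
```

A database-backed adapter would push the grouping and ordering into the query instead of loading every row.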
get_failure_type_distribution
abstractmethod
get_failure_type_distribution(
limit: int = 10,
) -> list[FailureTypeDistribution]
Get distribution of DLQ entries by failure type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| limit | int | Maximum number of failure types to return (top N) | 10 |
Returns:
| Type | Description |
|---|---|
| list[FailureTypeDistribution] | List of FailureTypeDistribution sorted by count descending |
get_recent_activity
abstractmethod
get_recent_activity(
hours: int = 24, days: int = 7
) -> RecentActivity
Get recent activity statistics.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| hours | int | Hours to look back for hourly stats | 24 |
| days | int | Days to look back for daily stats | 7 |
Returns:
| Type | Description |
|---|---|
| RecentActivity | RecentActivity with new/resolved counts and trend |
get_resolution_rate
abstractmethod
get_resolution_rate(days: int = 30) -> float
Calculate resolution success rate.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| days | int | Number of days to look back | 30 |
Returns:
| Type | Description |
|---|---|
| float | Resolution rate as a float (0.0 to 1.0) |
get_avg_retry_count
abstractmethod
get_avg_retry_count() -> float
Get average retry count across all DLQ entries.
Returns:
| Type | Description |
|---|---|
| float | Average retry count as a float |
list_entries
abstractmethod
list_entries(
page: int = 1,
page_size: int = 20,
status: str | None = None,
domain: str | None = None,
failure_type: str | None = None,
order_by: str = "-created_at",
) -> PaginatedResult
List DLQ entries with pagination and filtering.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| page | int | Page number (1-indexed) | 1 |
| page_size | int | Number of items per page | 20 |
| status | str \| None | Filter by status (optional) | None |
| domain | str \| None | Filter by domain (optional) | None |
| failure_type | str \| None | Filter by failure type (optional) | None |
| order_by | str | Sort order (prefix with - for descending) | '-created_at' |
Returns:
| Type | Description |
|---|---|
| PaginatedResult | PaginatedResult containing DLQ entries |
get_entry_detail
abstractmethod
get_entry_detail(entry_id: str) -> dict[str, Any] | None
Get detailed information about a specific DLQ entry.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| entry_id | str | Unique identifier of the entry | required |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] \| None | Dict with entry details or None if not found |
get_sla_breaches
abstractmethod
get_sla_breaches(
sla_threshold_hours: int = 4,
statuses: list[str] | None = None,
) -> dict[str, int]
Get count of SLA breaches by domain.
Finds DLQ entries that have exceeded the SLA threshold for resolution.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| sla_threshold_hours | int | SLA threshold in hours (default: 4) | 4 |
| statuses | list[str] \| None | List of statuses to check (default: pending, reviewing, requires_review) | None |
Returns:
| Type | Description |
|---|---|
| dict[str, int] | Dictionary mapping domain to breach count |
get_cleanup_stats
abstractmethod
get_cleanup_stats() -> CleanupStats
Get statistics for cleanup operations.
Returns:
| Type | Description |
|---|---|
| CleanupStats | CleanupStats with counts of entries eligible for cleanup |
archive_old_entries
abstractmethod
archive_old_entries(older_than_days: int = 30) -> int
Archive old resolved entries.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| older_than_days | int | Archive entries resolved more than N days ago | 30 |
Returns:
| Type | Description |
|---|---|
| int | Number of entries archived |
purge_archived
abstractmethod
purge_archived(
ids: list[str] | None = None,
older_than_days: int | None = None,
) -> int
Permanently delete archived entries.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| ids | list[str] \| None | Specific entry IDs to purge (optional) | None |
| older_than_days | int \| None | Purge archived entries older than N days (optional) | None |
Returns:
| Type | Description |
|---|---|
| int | Number of entries purged |
get_circuit_breaker_summary
abstractmethod
get_circuit_breaker_summary() -> CircuitBreakerSummary
Get summary of all circuit breakers.
Returns:
| Type | Description |
|---|---|
| CircuitBreakerSummary | CircuitBreakerSummary with counts by state |
list_circuit_breakers
abstractmethod
list_circuit_breakers() -> list[CircuitBreakerInfo]
List all circuit breakers with their current state.
Returns:
| Type | Description |
|---|---|
| list[CircuitBreakerInfo] | List of CircuitBreakerInfo for all registered circuit breakers |
persist_entry
abstractmethod
persist_entry(entry_data: dict[str, Any]) -> str | None
Persist a DLQ entry to the statistics store.
Called by the runtime layer to sync data to the ORM for statistics. Can be called asynchronously.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| entry_data | dict[str, Any] | Entry data from runtime repository | required |
Returns:
| Type | Description |
|---|---|
| str \| None | Entry ID if persisted, None otherwise |
sync_from_runtime
abstractmethod
sync_from_runtime(entries: list[dict[str, Any]]) -> int
Bulk sync entries from runtime repository.
Used for periodic synchronization.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| entries | list[dict[str, Any]] | List of entry data from runtime repository | required |
Returns:
| Type | Description |
|---|---|
| int | Number of entries synced |
get_audit_trail_by_entity
abstractmethod
get_audit_trail_by_entity(
entity_id: str, entity_type: str = "dlq_entry"
) -> EntityAuditTrail
Get complete audit trail for a specific entity.
This method provides end-to-end traceability showing all actions from creation to resolution with hash chain verification.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| entity_id | str | Unique identifier of the entity (e.g., DLQ ID) | required |
| entity_type | str | Type of entity (default: "dlq_entry") | 'dlq_entry' |
Returns:
| Type | Description |
|---|---|
| EntityAuditTrail | EntityAuditTrail with all audit entries and chain verification |
Example
trail = stats_repo.get_audit_trail_by_entity("dlq-123")
print(f"Total actions: {trail.total_entries}")
print(f"Chain valid: {trail.is_chain_valid}")
for entry in trail.entries:
    print(f"{entry.timestamp}: {entry.action} by {entry.actor_id}")
link_audit_entry
abstractmethod
link_audit_entry(
entity_id: str,
entity_type: str,
action: str,
actor_id: str | None = None,
status: str | None = None,
details: str | None = None,
audit_record_hash: str | None = None,
) -> bool
Link an audit record to an entity.
Called when audit events are recorded to maintain the relationship between DLQ entries and their audit trail.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| entity_id | str | Entity identifier | required |
| entity_type | str | Entity type | required |
| action | str | Action performed (store, replay, resolve, etc.) | required |
| actor_id | str \| None | Who performed the action | None |
| status | str \| None | New status after the action | None |
| details | str \| None | Additional details | None |
| audit_record_hash | str \| None | Hash from the audit system | None |
Returns:
| Type | Description |
|---|---|
| bool | True if linked successfully |
should_persist_async
should_persist_async() -> bool
Determine if persistence should be done asynchronously.
Override in implementations to enable async persistence. Default returns False for synchronous persistence.
Returns:
| Type | Description |
|---|---|
| bool | True if async persistence is preferred |
get_async_persist_task_name
get_async_persist_task_name() -> str | None
Get the Celery task name for async persistence.
Returns:
| Type | Description |
|---|---|
| str \| None | Task name string, or None if persistence is synchronous |
EventJournalRepository
Bases: ABC
Baldur event journal storage interface.
Append-only storage. Recorded entries cannot be modified or deleted. Sequence numbers increase monotonically to guarantee order. Gaps may exist (e.g., 1, 2, 4, 5); consumers must not assume contiguity.
Implementations:
- InMemoryEventJournalRepository: for tests and single-process use
- RedisEventJournalRepository: for multi-worker environments
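To make the ordering contract concrete, here is a small in-memory sketch of an append-only journal whose sequence numbers increase but may skip values. `Entry` and `MemoryJournal` are hypothetical stand-ins, and the `skip` argument exists only to simulate gaps for the demonstration:

```python
from dataclasses import dataclass, field, replace
from datetime import datetime, timezone
from typing import Any


@dataclass(frozen=True)
class Entry:
    """Hypothetical stand-in for JournalEntry (subset of fields)."""

    sequence: int
    event_type: str
    timestamp: datetime
    context: dict[str, Any] = field(default_factory=dict)


class MemoryJournal:
    def __init__(self) -> None:
        self._entries: list[Entry] = []
        self._latest = 0

    def append(self, entry: Entry, skip: int = 0) -> int:
        # `skip` is a demo-only knob that burns sequence numbers to
        # simulate the gaps a real backend may leave.
        self._latest += 1 + skip
        # The entry is frozen, so store a copy carrying the assigned sequence.
        self._entries.append(replace(entry, sequence=self._latest))
        return self._latest

    def get_latest_sequence(self) -> int:
        return self._latest

    def entries_after(self, cursor: int) -> list[Entry]:
        # Compare sequences instead of computing cursor + 1, so gaps are harmless.
        return [e for e in self._entries if e.sequence > cursor]


journal = MemoryJournal()
now = datetime.now(timezone.utc)
journal.append(Entry(0, "circuit_opened", now))
journal.append(Entry(0, "circuit_closed", now), skip=1)
print([e.sequence for e in journal.entries_after(0)])
```

A consumer that tracks the last sequence it saw and asks for everything greater than it stays correct whether or not the numbering is contiguous.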
append
abstractmethod
append(entry: JournalEntry) -> int
Append an event to the journal.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| entry | JournalEntry | Journal entry (the sequence field is assigned by the implementation) | required |
Returns:
| Type | Description |
|---|---|
| int | The assigned sequence number |
query
abstractmethod
query(
query_filter: JournalQueryFilter,
) -> JournalQueryResult
Return entries matching the filter, sorted by sequence (ascending).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| query_filter | JournalQueryFilter | Query criteria | required |
Returns:
| Type | Description |
|---|---|
| JournalQueryResult | JournalQueryResult with entries (sequence ascending), a truncated flag, and total_count |
get_sequence_range
abstractmethod
get_sequence_range(
start_sequence: int, end_sequence: int
) -> list[JournalEntry]
Look up entries by sequence range.
Used by simulations to replay a precise range.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| start_sequence | int | Start sequence (inclusive) | required |
| end_sequence | int | End sequence (exclusive) | required |
Returns:
| Type | Description |
|---|---|
| list[JournalEntry] | List of entries sorted by sequence ascending |
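The half-open range (start inclusive, end exclusive) can be illustrated on a plain sorted list of sequence numbers. This is a sketch of the range semantics only, using the standard `bisect` module; `sequence_range` is a hypothetical helper, not the repository method:

```python
from bisect import bisect_left


def sequence_range(sequences: list[int], start: int, end: int) -> list[int]:
    # `sequences` is sorted ascending; start is inclusive, end is exclusive.
    lo = bisect_left(sequences, start)
    hi = bisect_left(sequences, end)
    return sequences[lo:hi]


stored = [1, 2, 4, 5, 9]  # gaps are allowed
print(sequence_range(stored, 2, 5))
```

Because the end is exclusive, consecutive windows such as 1 to 5 and 5 to 10 cover every entry exactly once, which is convenient for replaying in batches.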
get_latest_sequence
abstractmethod
get_latest_sequence() -> int
Return the latest sequence number, or 0 when the journal is empty.
count
abstractmethod
count(query_filter: JournalQueryFilter) -> int
Return the number of entries matching the filter.
JournalEntry
dataclass
JournalEntry(
sequence: int,
event_type: str,
source: str,
timestamp: datetime,
service_name: str,
context: dict[str, Any] = dict(),
region: str = "",
tier_id: str = "",
)
Event journal entry. Immutable (frozen) data.
JournalQueryFilter
dataclass
JournalQueryFilter(
event_types: list[str] | None = None,
service_name: str | None = None,
start_time: datetime | None = None,
end_time: datetime | None = None,
region: str | None = None,
limit: int = 1000,
context_filters: dict[str, str] | None = None,
)
Journal query filter.
JournalQueryResult
dataclass
JournalQueryResult(
entries: list[JournalEntry],
truncated: bool,
total_count: int | None = None,
)
Journal query result. Includes whether the result was truncated.