
baldur_pro.services.dlq — Dead-Letter Queue

Durable capture and replay of failed operations: the DLQService, the store_to_dlq entry point, and the DLQ domain models.

🔒 PRO Feature — requires a baldur-pro license

These symbols ship in the baldur-pro distribution. PRO modules import normally — there is no ImportError. PRO features activate only when baldur.init() runs with a valid BALDUR_LICENSE_KEY; without it the system runs with OSS defaults and register_pro_services() logs entitlement.pro_registration_skipped.

dlq

Dead Letter Queue (DLQ) Service Package.

Provides centralized DLQ operations for the baldur layer. Handles storage, retrieval, and management of failed operations.

Features:
- Store failed operations with full forensic context
- Query and filter DLQ entries (via Repository pattern)
- Batch replay operations

Admin/Dashboard operations (cleanup, archive, purge, list, entry management) should be implemented in the host application using Django ORM directly. This package follows domain-free principles.

Status: Public

DLQConfig dataclass

DLQConfig(
    enabled: bool = True,
    retention_days: int = 30,
    max_replay_attempts: int = 2,
    retry_delay: int = 60,
    expiry_hours: int = 72,
    batch_size: int = 10,
)

DLQ runtime configuration.

Loaded from RuntimeConfigManager when available (PRO tier with hot reload), falling back to the static DLQSettings (Pydantic) defaults on OSS-only installs.

from_settings classmethod

from_settings() -> DLQConfig

Load configuration from RuntimeConfigManager (preferred) or DLQSettings.
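The fallback order described above can be sketched in isolation. This is a minimal illustration, not the package's implementation: `DLQConfigSketch` only mirrors the documented fields and defaults, and the `runtime_values` dict is a hypothetical stand-in for whatever RuntimeConfigManager actually exposes.

```python
from __future__ import annotations

from dataclasses import dataclass, fields
from typing import Any


@dataclass
class DLQConfigSketch:
    """Stand-in mirroring the documented DLQConfig fields and defaults."""

    enabled: bool = True
    retention_days: int = 30
    max_replay_attempts: int = 2
    retry_delay: int = 60
    expiry_hours: int = 72
    batch_size: int = 10


def load_config(runtime_values: dict[str, Any] | None) -> DLQConfigSketch:
    """Prefer runtime values when present, else fall back to static defaults.

    `runtime_values` is a hypothetical dict standing in for the runtime
    configuration source; None models an install without it.
    """
    if not runtime_values:
        return DLQConfigSketch()
    known = {f.name for f in fields(DLQConfigSketch)}
    # Ignore keys the config does not define instead of failing on them.
    overrides = {k: v for k, v in runtime_values.items() if k in known}
    return DLQConfigSketch(**overrides)


oss_config = load_config(None)
pro_config = load_config({"batch_size": 25, "unknown_key": "ignored"})
```

Fields that the runtime source does not override keep their static defaults, which is the behaviour the fallback description implies.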

DLQEntryResult dataclass

DLQEntryResult(
    success: bool,
    dlq_id: str | None = None,
    error: str | None = None,
    fallback_path: str | None = None,
)

Outcome of a single DLQ store/push operation.

fallback_path class-attribute instance-attribute

fallback_path: str | None = None

Local fallback path when the DB write failed but data was preserved.
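A caller typically has to tell several outcomes apart. The sketch below uses a local stand-in dataclass with the documented fields. The source does not say whether success is set when fallback_path is populated, so the sketch checks fallback_path first; the returned labels are purely illustrative.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class EntryResultSketch:
    """Stand-in mirroring the documented DLQEntryResult fields."""

    success: bool
    dlq_id: str | None = None
    error: str | None = None
    fallback_path: str | None = None


def describe(result: EntryResultSketch) -> str:
    """Classify a store outcome into one caller-facing label."""
    if result.fallback_path is not None:
        # The DB write failed, but the payload was preserved locally.
        return f"preserved:{result.fallback_path}"
    if result.success and result.dlq_id is not None:
        return f"stored:{result.dlq_id}"
    if result.success:
        # Documented for mode="async": success=True with dlq_id=None.
        return "queued"
    return f"failed:{result.error or 'unknown error'}"
```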

DLQBatchReplayStats dataclass

DLQBatchReplayStats(
    processed: int = 0,
    success: int = 0,
    failed: int = 0,
    skipped: int = 0,
    errors: list[str] = list(),
)

Result of a batch replay operation (statistics).

Note: This is for batch DLQ replay statistics, not to be confused with ReplayResult in replay_service.py which is for single replay outcomes.

DLQThrottleBatchReplayResult dataclass

DLQThrottleBatchReplayResult(
    total: int = 0,
    succeeded: int = 0,
    failed: int = 0,
    skipped: int = 0,
    early_stop_reason: str | None = None,
)

Result of a batch throttle-aware DLQ replay operation.

DLQThrottleReplayResult dataclass

DLQThrottleReplayResult(
    success: bool,
    entry_id: str | None = None,
    error: str | None = None,
    retry_after: float | None = None,
)

Result of a single throttle-aware DLQ replay operation.

DLQServiceBase

DLQServiceBase(
    config: DLQConfig | None = None,
    repository: FailedOperationRepository | None = None,
)

Base class for DLQ Service.

Provides initialization and common utilities.

Initialize the DLQ service.

Parameters:

Name Type Description Default
config DLQConfig | None

Optional configuration; loaded from settings if None

None
repository FailedOperationRepository | None

Optional repository for dependency injection; the Django adapter is used if None

None

repository property

repository: FailedOperationRepository

Get the repository using ProviderRegistry with fallback policy.

is_enabled property

is_enabled: bool

Check if DLQ is enabled.

EntryOperationsMixin

Mixin providing DLQ entry operations using Repository pattern.

retry_entry

retry_entry(pk: str) -> dict[str, Any]

Retry a single DLQ entry by re-executing it through its replay handler.

Applies the same per-entry pipeline as the batch replay() path to the single operator-selected entry: cap-gate, atomic acquire, handler execution, then a cap-aware terminal transition. success means the replay succeeded — not merely that the counter advanced.

Parameters:

Name Type Description Default
pk str

Entry primary key

required

Returns:

Type Description
dict[str, Any]

Dict with operation details:
- success: bool (True only if the replay handler succeeded)
- id: str
- retry_count: int (post-attempt count)
- previous_retry_count: int
- status: str (resulting entry status)
- message: str

Raises:

Type Description
DLQEntryNotFoundError

If entry not found (-> HTTP 404)

DLQStateConflictError

If entry is resolved/archived, has exhausted its replay attempts (at cap), or is not in a replayable state (-> HTTP 409)

DLQReplayError

If the replay raised an unexpected exception (-> HTTP 500)
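The HTTP hints in the Raises table suggest a direct exception-to-status mapping in the host application's view layer. A minimal sketch follows. The three exception classes are local stand-ins so the block runs on its own (the real classes and their import path are not shown in this section), and `http_status_for` is a hypothetical helper name.

```python
class DLQEntryNotFoundError(Exception):
    """Stand-in for the documented not-found error."""


class DLQStateConflictError(Exception):
    """Stand-in for the documented state-conflict error."""


class DLQReplayError(Exception):
    """Stand-in for the documented replay error."""


# Status codes as listed in the Raises table above.
_STATUS_BY_ERROR = (
    (DLQEntryNotFoundError, 404),
    (DLQStateConflictError, 409),
    (DLQReplayError, 500),
)


def http_status_for(exc: Exception) -> int:
    """Return the documented HTTP status for a DLQ error.

    Exceptions outside the table are re-raised so that unknown failures
    are not silently mapped to a status code.
    """
    for error_type, status in _STATUS_BY_ERROR:
        if isinstance(exc, error_type):
            return status
    raise exc
```

In a view, the call to retry_entry(pk) would sit inside a try block and the caught exception would be passed to this helper to pick the response status.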

force_redrive_entry

force_redrive_entry(
    pk: str,
    *,
    actor_id: str | None = None,
    reason: str = "",
    ticket_url: str | None = None
) -> dict[str, Any]

Force-redrive an at-cap DLQ entry past the cap gate (operator override).

A deliberate, ADMIN-gated escape hatch: after diagnosing and fixing a root cause, re-drive an entry that exhausted its cap because of that now-fixed cause. Mirrors retry_entry() but acquires with force=True (bypassing the cap gate) and grants a fresh retry budget. The poison-pill convergence guarantee is preserved: a still-broken entry re-converges to REQUIRES_REVIEW within max_replay_attempts further automatic attempts.

The normal retry_entry() hard block on at-cap entries is untouched — force is purely additive, and only this ADMIN-gated, audited path grants the fresh budget.
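The interaction between the cap gate, the forced path, and the convergence guarantee can be seen in a toy model. Everything here is an assumption made for illustration: the `Entry` shape, the status strings other than REQUIRES_REVIEW, and the single `attempt` function that folds both retry paths together. It is not the service's actual pipeline.

```python
from dataclasses import dataclass

MAX_REPLAY_ATTEMPTS = 2  # documented DLQConfig default


@dataclass
class Entry:
    retry_count: int = 0
    status: str = "PENDING"


class StateConflict(Exception):
    """Stand-in for DLQStateConflictError."""


def attempt(entry: Entry, handler_ok: bool, force: bool = False) -> bool:
    """One replay attempt: cap gate, acquire, handler, terminal transition."""
    if force:
        entry.retry_count = 0  # fresh budget, granted only on the forced path
    elif entry.retry_count >= MAX_REPLAY_ATTEMPTS:
        raise StateConflict("replay attempts exhausted")
    entry.retry_count += 1  # acquire
    if handler_ok:
        entry.status = "RESOLVED"  # assumed success status
    elif entry.retry_count >= MAX_REPLAY_ATTEMPTS:
        entry.status = "REQUIRES_REVIEW"  # cap reached again
    else:
        entry.status = "PENDING"
    return handler_ok


entry = Entry(retry_count=2, status="REQUIRES_REVIEW")  # already at cap

try:
    attempt(entry, handler_ok=False)  # normal retry is blocked at cap
    blocked = False
except StateConflict:
    blocked = True

attempt(entry, handler_ok=False, force=True)  # forced: count restarts at 1
count_after_force = entry.retry_count
attempt(entry, handler_ok=False)  # still broken: converges to review again
```

With the default cap of 2, a still-broken entry that was force-redriven returns to REQUIRES_REVIEW after one further automatic attempt, which is within the documented bound.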

Parameters:

Name Type Description Default
pk str

Entry primary key

required
actor_id str | None

Acting operator (recorded in the force-redrive audit)

None
reason str

Operator justification (recorded in the audit)

''
ticket_url str | None

Optional change/incident ticket reference (recorded)

None

Returns:

Type Description
dict[str, Any]

Dict mirroring retry_entry():
- success: bool (True only if the replay handler succeeded)
- id: str
- retry_count: int (post-acquire count: 1 under the fresh budget)
- previous_retry_count: int (pre-acquire count)
- status: str (resulting entry status)
- message: str

Raises:

Type Description
DLQEntryNotFoundError

If entry not found (-> HTTP 404)

DLQStateConflictError

If entry is resolved/archived or not in a force-redrivable state (-> HTTP 409)

DLQReplayError

If the replay raised an unexpected exception (-> HTTP 500)

resolve_entry

resolve_entry(
    pk: str,
    notes: str = "",
    resolution_type: str = "manual",
    status: str = "resolved",
) -> dict[str, Any]

Resolve a DLQ entry.

Parameters:

Name Type Description Default
pk str

Entry primary key

required
notes str

Resolution notes (optional)

''
resolution_type str

How the entry was resolved (default: "manual")

'manual'
status str

Target status (default: "resolved")

'resolved'

Returns:

Type Description
dict[str, Any]

Dict with operation details:
- success: bool
- id: str
- previous_status: str
- current_status: str
- resolved_at: str (ISO format)
- notes: str

Raises:

Type Description
DLQEntryNotFoundError

If entry not found (-> HTTP 404)

DLQStateConflictError

If entry is already resolved/archived (-> HTTP 409)

DLQError

If the resolve operation fails (-> HTTP 400)

resolve_entries_batch

resolve_entries_batch(
    pks: list[str],
    resolution_type: str = "manual",
    status: str = "resolved",
    notes: str = "",
    chunk_size: int | None = None,
) -> dict[str, Any]

Resolve multiple DLQ entries in chunks.

Parameters:

Name Type Description Default
pks list[str]

Entry primary keys to resolve

required
resolution_type str

How entries were resolved

'manual'
status str

Target status

'resolved'
notes str

Resolution notes

''
chunk_size int | None

Chunk size (default: from DLQSettings)

None

Returns:

Type Description
dict[str, Any]

Dict with batch operation summary
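Chunked resolution amounts to slicing the key list and resolving each slice in turn. The sketch below shows that shape only. The summary keys (`requested`, `resolved`, `chunks`) are hypothetical because the real summary dict is not specified here, and `resolve_chunk` is a caller-supplied callable standing in for the repository call.

```python
from collections.abc import Callable, Iterator


def chunked(pks: list[str], chunk_size: int) -> Iterator[list[str]]:
    """Yield consecutive slices of at most chunk_size keys."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be >= 1")
    for start in range(0, len(pks), chunk_size):
        yield pks[start:start + chunk_size]


def resolve_in_chunks(
    pks: list[str],
    chunk_size: int,
    resolve_chunk: Callable[[list[str]], int],
) -> dict[str, int]:
    """Resolve keys chunk by chunk and accumulate an illustrative summary.

    resolve_chunk returns how many entries in the chunk were resolved.
    """
    summary = {"requested": len(pks), "resolved": 0, "chunks": 0}
    for chunk in chunked(pks, chunk_size):
        summary["resolved"] += resolve_chunk(chunk)
        summary["chunks"] += 1
    return summary


# Using len as the resolver pretends every entry in a chunk was resolved.
summary = resolve_in_chunks(["a", "b", "c", "d", "e"], 2, len)
```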

get_entry

get_entry(pk: str) -> dict[str, Any] | None

Get detailed info for a single DLQ entry.

Parameters:

Name Type Description Default
pk str

Entry primary key

required

Returns:

Type Description
dict[str, Any] | None

Dictionary with entry details or None if not found

ListOperationsMixin

Mixin providing DLQ list operations using Repository pattern.

list_entries

list_entries(
    filters: dict[str, Any] | None = None,
    page: int = 1,
    page_size: int = 20,
) -> dict[str, Any]

Get paginated list of DLQ entries.

Pagination is delegated to the repository's native find/count primitive, so offset and limit are applied in the storage layer rather than by loading every row and slicing in memory. When no status filter is given, the query spans all statuses (escalated and terminal entries are visible by default) instead of a hardcoded subset.

Parameters:

Name Type Description Default
filters dict[str, Any] | None

Dictionary with filter conditions:
- status: Filter by status
- domain: Filter by domain
- failure_type: Filter by failure type

None
page int

Page number (default 1, clamped to >= 1)

1
page_size int

Items per page (default 20, max 100)

20

Returns:

Type Description
dict[str, Any]

Dict with entries and pagination info:
- results: List of entry dicts
- page: Current page
- page_size: Items per page
- total_pages: Total number of pages
- total_count: Total number of items
- has_next: Whether there's a next page
- has_previous: Whether there's a previous page
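The pagination fields follow from the total count and the clamped inputs. A minimal sketch of that arithmetic, assuming the documented clamps (page >= 1, page_size <= 100): the lower clamp of page_size to 1 and the extra `offset` key are assumptions added for illustration.

```python
import math

MAX_PAGE_SIZE = 100  # documented upper bound for page_size


def page_info(total_count: int, page: int = 1, page_size: int = 20) -> dict:
    """Compute pagination metadata for a storage-level offset/limit query."""
    page = max(page, 1)
    page_size = min(max(page_size, 1), MAX_PAGE_SIZE)
    total_pages = math.ceil(total_count / page_size)
    return {
        "page": page,
        "page_size": page_size,
        "total_pages": total_pages,
        "total_count": total_count,
        "has_next": page < total_pages,
        "has_previous": page > 1,
        # Offset the storage layer would skip before returning page_size rows.
        "offset": (page - 1) * page_size,
    }


clamped = page_info(45, page=0, page_size=500)
middle = page_info(45, page=2, page_size=20)
```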

MaintenanceOperationsMixin

DLQ maintenance: expire, archive, purge.

archive_old_entries

archive_old_entries(older_than_days: int = 30) -> int

Archive old RESOLVED entries. Called by CleanupService.

cleanup_old_entries

cleanup_old_entries(days_old: int = 30) -> dict

Compound: expire + archive. Called by cleanup_resolved_dlq_entries task.

purge_archived

purge_archived(
    ids: list[str] | None = None,
    older_than_days: int | None = None,
) -> int

Permanently delete old ARCHIVED entries. Called by CleanupService.

count_archived_older_than

count_archived_older_than(older_than_days: int) -> int

Count ARCHIVED entries older than N days. Called by CleanupService dry_run.

get_cleanup_stats

get_cleanup_stats() -> CleanupStats

Cleanup statistics as a CleanupStats value object.

Bridges the repository's dict-shaped cleanup stats to the canonical CleanupStats model (its can_archive/can_purge derive from the age-bucketed counts), matching the StatisticsProvider adapters that already return CleanupStats. Read by the admin handler dlq_cleanup_stats via attribute access.

expire_entries

expire_entries() -> int

Mark entries past expires_at as EXPIRED. Drains in batches.

QueryOperationsMixin

Mixin providing DLQ query operations.

get_pending_entries

get_pending_entries(
    domain: str | None = None,
    failure_type: str | None = None,
    limit: int = 100,
) -> list[FailedOperationData]

Get pending DLQ entries.

Parameters:

Name Type Description Default
domain str | None

Filter by domain (optional)

None
failure_type str | None

Filter by failure type (optional)

None
limit int

Maximum number of entries to return

100

Returns:

Type Description
list[FailedOperationData]

List of pending FailedOperationData entries

get_replayable_entries

get_replayable_entries(
    domain: str | None = None,
    failure_type: str | None = None,
    limit: int = 100,
) -> list[FailedOperationData]

Get entries that can be replayed.

Entries are replayable if:
- Status is PENDING
- retry_count < max_retries
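The two conditions above translate into a simple predicate. The sketch below applies it to an in-memory list. `EntrySketch` is a stand-in (the real FailedOperationData fields are not listed in this section), and treating max_retries as a per-entry attribute and "PENDING" as the literal status value are both assumptions.

```python
from dataclasses import dataclass


@dataclass
class EntrySketch:
    id: str
    status: str
    retry_count: int
    max_retries: int


def is_replayable(entry: EntrySketch) -> bool:
    """Both documented conditions must hold."""
    return entry.status == "PENDING" and entry.retry_count < entry.max_retries


def select_replayable(entries: list[EntrySketch], limit: int = 100) -> list[EntrySketch]:
    """Filter first, then cap the result at limit entries."""
    return [e for e in entries if is_replayable(e)][:limit]


entries = [
    EntrySketch("a", "PENDING", 0, 2),
    EntrySketch("b", "PENDING", 2, 2),   # retry budget exhausted
    EntrySketch("c", "RESOLVED", 0, 2),  # wrong status
    EntrySketch("d", "PENDING", 1, 2),
]
replayable_ids = [e.id for e in select_replayable(entries)]
```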

Parameters:

Name Type Description Default
domain str | None

Filter by domain (optional)

None
failure_type str | None

Filter by failure type (optional)

None
limit int

Maximum number of entries to return

100

Returns:

Type Description
list[FailedOperationData]

List of replayable FailedOperationData entries

get_sla_breached_entries

get_sla_breached_entries() -> list[FailedOperationData]

Get entries that have breached their SLA.

SLA thresholds are loaded from configuration (domain-free). Uses SLAConfig.get_all_thresholds() to support any configured domain.

Returns:

Type Description
list[FailedOperationData]

List of SLA-breached FailedOperationData entries

get_expired_entries

get_expired_entries() -> list[FailedOperationData]

Get entries that have passed their retention period.

Returns:

Type Description
list[FailedOperationData]

List of expired FailedOperationData entries

get_entry_by_id

get_entry_by_id(dlq_id: str) -> FailedOperationData | None

Get a single DLQ entry by ID.

Parameters:

Name Type Description Default
dlq_id str

The DLQ entry ID

required

Returns:

Type Description
FailedOperationData | None

FailedOperationData or None

get_stats

get_stats() -> dict[str, Any]

Get DLQ statistics.

Returns:

Type Description
dict[str, Any]

Dictionary with DLQ statistics

get_facet_counts

get_facet_counts(
    *, status: str | None = None, domain: str | None = None
) -> dict[str, dict[str, int]]

Get faceted status×domain DLQ counts for the admin-console filter.

Thin pass-through to the repository. The by_status map is scoped by domain and by_domain is scoped by status (standard faceted-search semantics); zero-count buckets are dropped. See FailedOperationRepository.get_facet_counts for the full contract.
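The cross-scoping of the two maps is easy to misread, so here is a small in-memory sketch of the faceted-search semantics described above. It operates on plain (status, domain) tuples rather than the repository, and the function name is hypothetical.

```python
from __future__ import annotations

from collections import Counter


def facet_counts(
    rows: list[tuple[str, str]],
    *,
    status: str | None = None,
    domain: str | None = None,
) -> dict[str, dict[str, int]]:
    """Count (status, domain) rows with each facet scoped by the other filter."""
    # by_status ignores the status filter and is narrowed by domain only.
    by_status = Counter(s for s, d in rows if domain is None or d == domain)
    # by_domain ignores the domain filter and is narrowed by status only.
    by_domain = Counter(d for s, d in rows if status is None or s == status)
    # A Counter built this way never holds zero-count buckets.
    return {"by_status": dict(by_status), "by_domain": dict(by_domain)}


rows = [("pending", "payment"), ("pending", "webhook"), ("resolved", "payment")]
scoped_by_status = facet_counts(rows, status="pending")
scoped_by_domain = facet_counts(rows, domain="webhook")
```

Scoping each map by the other filter keeps the alternative choices within a facet visible, so a console can still show counts for statuses other than the selected one.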

Parameters:

Name Type Description Default
status str | None

Filter by status (scopes the by_domain map).

None
domain str | None

Filter by domain (scopes the by_status map).

None

Returns:

Type Description
dict[str, dict[str, int]]

Dict with by_status and by_domain count maps.

ReplayOperationsMixin

Mixin providing DLQ replay operations.

replay

replay(
    domain: str | None = None,
    batch_size: int = 50,
    request: Any = None,
) -> DLQBatchReplayStats

Execute batch replay of pending DLQ entries.

Hybrid logic:
- request present -> buffer into RequestAuditBuffer (AuditMiddleware flushes in batch)
- request absent -> call the adapter directly (async contexts such as Celery)

Parameters:

Name Type Description Default
domain str | None

Filter by domain (optional)

None
batch_size int

Maximum number of entries to process (default 50)

50
request Any

Django HttpRequest object (buffered when present)

None

Returns:

Type Description
DLQBatchReplayStats

DLQBatchReplayStats with operation statistics

replay_throttle_aware

replay_throttle_aware(
    entry_id: str, throttle: AdaptiveThrottle
) -> DLQThrottleReplayResult

Throttle-aware safe single replay.

Validation order:
1. expires_at TTL expiry check (FailedOperationData.expires_at)
2. can_retry retry-limit check (retry_count < max_retries)
3. Throttle permit acquisition (store_rejection=False prevents DLQ re-storage)
4. _execute_replay() pipeline (can_replay -> replay)
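The point of the ordering is that cheap, local checks run before the throttle is consulted, and the handler runs last. A minimal sketch of that sequence follows. The dataclasses are stand-ins, the error strings are illustrative, and `acquire_permit` is a hypothetical callable returning None when a permit is granted or a wait time in seconds otherwise; it does not reflect the real AdaptiveThrottle API.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Callable


@dataclass
class EntrySketch:
    expires_at: datetime | None
    retry_count: int
    max_retries: int


@dataclass
class ReplayResultSketch:
    success: bool
    error: str | None = None
    retry_after: float | None = None


def replay_checked(
    entry: EntrySketch,
    acquire_permit: Callable[[], float | None],
    execute: Callable[[], bool],
    now: datetime | None = None,
) -> ReplayResultSketch:
    """Run the four documented steps in order, stopping at the first failure."""
    now = now or datetime.now(timezone.utc)
    if entry.expires_at is not None and entry.expires_at <= now:
        return ReplayResultSketch(False, error="expired")        # step 1
    if entry.retry_count >= entry.max_retries:
        return ReplayResultSketch(False, error="retry_limit")    # step 2
    wait = acquire_permit()                                       # step 3
    if wait is not None:
        return ReplayResultSketch(False, error="throttled", retry_after=wait)
    ok = execute()                                                # step 4
    return ReplayResultSketch(ok, error=None if ok else "replay_failed")


now = datetime(2024, 1, 1, tzinfo=timezone.utc)
expired = EntrySketch(now - timedelta(hours=1), retry_count=5, max_retries=2)
fresh = EntrySketch(now + timedelta(hours=1), retry_count=0, max_retries=2)

r_expired = replay_checked(expired, lambda: None, lambda: True, now=now)
r_throttled = replay_checked(fresh, lambda: 1.5, lambda: True, now=now)
r_ok = replay_checked(fresh, lambda: None, lambda: True, now=now)
```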

Parameters:

Name Type Description Default
entry_id str

DLQ entry ID to reprocess

required
throttle AdaptiveThrottle

AdaptiveThrottle instance

required

Returns:

Type Description
DLQThrottleReplayResult

DLQThrottleReplayResult

replay_all_throttle_aware

replay_all_throttle_aware(
    throttle: AdaptiveThrottle,
    domain: str | None = None,
    batch_size: int = 10,
    max_entries: int = 100,
) -> DLQThrottleBatchReplayResult

Throttle-aware batch replay.

Entries are fetched with get_replayable_entries(), which returns only those with retry_count < max_retries. Throttle health is checked once per batch, and the replay stops early when the throttle reports Emergency.
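The per-batch health check and early stop can be sketched as a plain loop. This is an illustration under stated assumptions: `is_emergency` and `replay_one` are hypothetical callables standing in for the throttle and the single-entry replay, the `"throttle_emergency"` reason string is invented, and counting the unprocessed remainder as skipped is a guess about how the real result is filled in.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable


@dataclass
class BatchResultSketch:
    """Stand-in mirroring the documented DLQThrottleBatchReplayResult fields."""

    total: int = 0
    succeeded: int = 0
    failed: int = 0
    skipped: int = 0
    early_stop_reason: str | None = None


def replay_batches(
    entry_ids: list[str],
    replay_one: Callable[[str], bool],
    is_emergency: Callable[[], bool],
    batch_size: int = 10,
    max_entries: int = 100,
) -> BatchResultSketch:
    """Replay in batches, checking throttle health before each batch."""
    result = BatchResultSketch()
    pending = entry_ids[:max_entries]
    for start in range(0, len(pending), batch_size):
        if is_emergency():
            result.early_stop_reason = "throttle_emergency"
            result.skipped += len(pending) - start  # assumption: remainder is skipped
            break
        for entry_id in pending[start:start + batch_size]:
            result.total += 1
            if replay_one(entry_id):
                result.succeeded += 1
            else:
                result.failed += 1
    return result


ids = [str(i) for i in range(25)]
health = iter([False, False, True])  # emergency reported before the third batch
outcome = replay_batches(ids, lambda _id: True, lambda: next(health))
```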

Parameters:

Name Type Description Default
throttle AdaptiveThrottle

AdaptiveThrottle instance

required
domain str | None

Domain filter (optional)

None
batch_size int

Batch size for Throttle health checks

10
max_entries int

Maximum number of entries to process

100

Returns:

Type Description
DLQThrottleBatchReplayResult

DLQThrottleBatchReplayResult

StoreOperationsMixin

Mixin providing DLQ store operations.

store_failure

store_failure(
    domain: str,
    failure_type: str,
    entity_type: str | None = None,
    entity_id: str | None = None,
    user_id: int | None = None,
    error_code: str = "",
    error_message: str = "",
    snapshot_data: dict[str, Any] | None = None,
    request_data: dict[str, Any] | None = None,
    response_data: dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
    next_action_hint: str = "",
    recommended_action: str = "",
    request: Any = None,
    mode: Literal["sync", "async"] | None = None,
) -> DLQEntryResult

Store a failed operation in the DLQ.

Hybrid logic:
- request present -> buffer into RequestAuditBuffer (AuditMiddleware flushes in batch)
- request absent -> call the adapter directly (async contexts such as Celery)

Parameters:

Name Type Description Default
domain str

Business domain (payment, point, inventory, webhook, notification)

required
failure_type str

Specific failure type (e.g., PG_TIMEOUT, AMOUNT_MISMATCH)

required
entity_type str | None

Type of related entity (e.g., "order", "payment", "product")

None
entity_id str | None

ID of related entity

None
user_id int | None

Related User ID

None
error_code str

Error code from external system

''
error_message str

Human-readable error message

''
snapshot_data dict[str, Any] | None

State snapshot for recovery

None
request_data dict[str, Any] | None

Original request payload

None
response_data dict[str, Any] | None

External system response

None
metadata dict[str, Any] | None

Additional debug context

None
next_action_hint str

Guidance for operators

''
recommended_action str

Suggested action (replay, manual_check, etc.)

''
request Any

Django HttpRequest object (buffered when present)

None
mode Literal['sync', 'async'] | None

Dispatch mode:
- "sync": execute on the calling thread and return the real dlq_id.
- "async": enqueue into the outbox and return DLQEntryResult(success=True, dlq_id=None).
- None (default): resolve against BALDUR_DLQ_OUTBOX_ENABLED.
- "async" with request != None raises ValueError (an HttpRequest cannot be safely shared across threads; this is a programmer error, so it fails fast).
- None with request != None silently coerces to "sync" (the safe env-default path).

None
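The mode rules above form a small decision table, sketched here as a pure function. `resolve_mode` is a hypothetical name, and `outbox_enabled` stands in for the BALDUR_DLQ_OUTBOX_ENABLED setting. That an enabled outbox resolves None to "async" is an inference from the description, not something this section states outright.

```python
from __future__ import annotations

from typing import Any, Literal

Mode = Literal["sync", "async"]


def resolve_mode(mode: Mode | None, request: Any, outbox_enabled: bool) -> Mode:
    """Resolve the effective dispatch mode from the documented rules."""
    if mode == "async" and request is not None:
        # Fail fast: a request object must not cross thread boundaries.
        raise ValueError("mode='async' cannot be combined with a request")
    if mode is not None:
        return mode  # explicit "sync" or request-free "async"
    if request is not None:
        return "sync"  # default mode with a request is coerced to sync
    return "async" if outbox_enabled else "sync"  # assumed env-default rule
```

Keeping the resolution in one pure function makes the invalid combination easy to test without constructing a real request object.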

Returns:

Type Description
DLQEntryResult

DLQEntryResult with creation status

create_test_entry

create_test_entry(
    healing_domain: str | None = None,
    failure_type: str | None = None,
    user_id: int | None = None,
    entity_type: str | None = "test",
    entity_id: str | None = "",
    error_message: str = "Test failure for load testing",
    snapshot_data: dict[str, Any] | None = None,
    request_data: dict[str, Any] | None = None,
    response_data: dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
    created_by: str = "",
) -> dict[str, Any]

Create a synthetic DLQ entry for debugging / load testing.

Thin wrapper over store_failure(mode="sync") so the result is a real, retrievable DLQ record on the same store / audit / metrics path as a production failure. Returns a JSON-safe dict carrying the real dlq_id — the admin POST /dlq/test/create handler returns it verbatim with HTTP 201. Debug-only: callers outside DEBUG gate this at the framework layer.

DLQService

DLQService(
    config: DLQConfig | None = None,
    repository: FailedOperationRepository | None = None,
)

Bases: StoreOperationsMixin, QueryOperationsMixin, ReplayOperationsMixin, EntryOperationsMixin, ListOperationsMixin, MaintenanceOperationsMixin, DLQServiceBase

Dead Letter Queue Service.

Provides centralized operations for managing failed operations.

Usage

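from baldur_pro.services.dlq import DLQService

service = DLQService()
result = service.store_failure(
    domain="payment",
    failure_type="PG_TIMEOUT",
    # store_failure() has no `order` parameter; pass the entity reference
    # instead (assumes the caller's `order` object exposes an `id`).
    entity_type="order",
    entity_id=str(order.id),
    error_message="Connection timed out",
)
if result.success:
    print(f"Stored as DLQ entry {result.dlq_id}")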

For testing with mock repository

from unittest.mock import Mock

mock_repo = Mock(spec=FailedOperationRepository)
service = DLQService(repository=mock_repo)

get_dlq_service

get_dlq_service() -> DLQService

Get the singleton DLQ service instance.

reset_dlq_service

reset_dlq_service() -> None

Reset the singleton DLQ service instance.

store_to_dlq

store_to_dlq(
    domain: str,
    failure_type: str,
    entity_type: str | None = None,
    entity_id: str | None = None,
    user_id: int | None = None,
    error_code: str = "",
    error_message: str = "",
    snapshot_data: dict[str, Any] | None = None,
    request_data: dict[str, Any] | None = None,
    response_data: dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
    next_action_hint: str = "",
    recommended_action: str = "",
    request: Any = None,
    mode: Literal["sync", "async"] | None = None,
) -> DLQEntryResult

Convenience function to store a failure in the DLQ.

This is a shortcut for get_dlq_service().store_failure(...). See DLQService.store_failure for mode= semantics.