baldur.adapters.sql — Framework-Free SQL Adapter (DB-API 2.0)
Generic repository base plus priority-1 SQL-backed implementations of Baldur's core repositories. Works with any DB-API 2.0 driver (psycopg2, mysql-connector-python, stdlib sqlite3) — selected by DSN scheme.
sql
Status: Public
GenericSQLRepository
GenericSQLRepository(
get_connection: Callable[[], Any],
*,
dialect: SQLDialect | None = None,
autocommit_delegated: bool | None = None,
schema: (
tuple[str, int, Callable[[SQLDialect], list[str]]]
| None
) = None
)
DB-API 2.0 repository helpers.
Subclasses inherit this class plus a domain-specific ABC
(FailedOperationRepository etc.). The base exposes helpers only
— no ABC method has a default implementation here, so there is no
MRO ambiguity.
Connection ownership: get_connection is a user-supplied
callable. Baldur does not own a pool. Each helper borrows a
connection via the callable and relies on that connection's
close() / return-to-pool semantics. Common implementations:
- get_connection = lambda: psycopg2.connect(DSN) -- direct.
- get_connection = engine.raw_connection -- SQLAlchemy pool.
- get_connection = lambda: pgbouncer_pool.getconn() -- external pooler.
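The ownership contract can be illustrated with the stdlib sqlite3 driver. This is a minimal sketch, not Baldur's internals: the borrow helper is a hypothetical name introduced here, and it only shows that each call obtains a connection from the callable and hands it back through close().

```python
import sqlite3
from contextlib import contextmanager
from typing import Any, Callable, Iterator


def get_connection() -> sqlite3.Connection:
    # "Direct" style: every call opens a fresh connection.
    return sqlite3.connect(":memory:")


@contextmanager
def borrow(factory: Callable[[], Any]) -> Iterator[Any]:
    # Hypothetical helper (not part of Baldur): borrow one connection,
    # then rely on close() to release it or return it to its pool.
    conn = factory()
    try:
        yield conn
    finally:
        conn.close()


with borrow(get_connection) as conn:
    row = conn.execute("SELECT 1").fetchone()
```

With a pooled factory, close() would typically return the connection to the pool instead of tearing it down, which is why the caller never needs to know which style is in use.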
SchemaVersionManager
SchemaVersionManager(
get_connection: Callable[[], Any],
*,
dialect: SQLDialect
)
Owns the baldur_schema_version bookkeeping table.
Repos call ensure(repo_name, version, ddl_statements) during
first use. DDL runs exactly once per (repo_name, version) pair per
database — subsequent calls are no-ops.
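The run-once behaviour can be sketched as follows, assuming SQLite and a bookkeeping table keyed on (repo_name, version). The column names and the standalone ensure function are assumptions made for illustration; the real table layout and locking strategy are not documented here.

```python
import sqlite3


def ensure(conn, repo_name, version, ddl_statements):
    """Run ddl_statements once per (repo_name, version); return True if they ran."""
    # Column names are assumptions; only the table name comes from the text.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS baldur_schema_version ("
        "repo_name TEXT NOT NULL, version INTEGER NOT NULL, "
        "PRIMARY KEY (repo_name, version))"
    )
    seen = conn.execute(
        "SELECT 1 FROM baldur_schema_version WHERE repo_name = ? AND version = ?",
        (repo_name, version),
    ).fetchone()
    if seen is not None:
        conn.commit()
        return False  # already applied: this call is a no-op
    for statement in ddl_statements:
        conn.execute(statement)
    conn.execute(
        "INSERT INTO baldur_schema_version (repo_name, version) VALUES (?, ?)",
        (repo_name, version),
    )
    conn.commit()
    return True


conn = sqlite3.connect(":memory:")
ddl = ["CREATE TABLE demo_items (id TEXT PRIMARY KEY)"]
first = ensure(conn, "demo", 1, ddl)
# Would raise "table already exists" if the DDL ran a second time.
second = ensure(conn, "demo", 1, ddl)
```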
SQLCascadeEventArchiveRepository
SQLCascadeEventArchiveRepository(
get_connection: Callable[[], Any],
*,
dialect: SQLDialect | None = None,
autocommit_delegated: bool | None = None
)
Bases: GenericSQLRepository, CascadeEventArchiveRepository
DB-API 2.0 backed cascade event archive repository.
SQLCircuitBreakerStateRepository
SQLCircuitBreakerStateRepository(
get_connection: Callable[[], Any],
*,
dialect: SQLDialect | None = None,
autocommit_delegated: bool | None = None
)
Bases: GenericSQLRepository, CircuitBreakerStateRepository
DB-API 2.0 backed circuit-breaker state repository.
try_acquire_half_open_slot
try_acquire_half_open_slot(
service_name: str,
limit: int,
stuck_timeout_seconds: int,
) -> tuple[bool, str, str]
Atomic HALF_OPEN slot acquisition.
Uses SELECT ... FOR UPDATE NOWAIT (PostgreSQL/MySQL 8+) to
serialize concurrent acquires; on lock contention, fails open with
(False, current_state, current_state). SQLite falls back
to its implicit single-writer model (no NOWAIT clause).
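The dialect split described above might look like the sketch below. The function name, the dialect strings, and the column names are assumptions for illustration; only the FOR UPDATE NOWAIT clause and the SQLite fallback come from the text.

```python
def half_open_select_sql(dialect: str) -> str:
    """Build the row-read statement for slot acquisition (illustrative only)."""
    # Dialect strings and column names are hypothetical.
    base = "SELECT state, half_open_count FROM baldur_cb_state WHERE service_name = "
    if dialect in ("postgresql", "mysql"):
        # psycopg2 and mysql-connector-python use %s placeholders.
        # NOWAIT raises immediately instead of blocking when another
        # transaction already holds the row lock.
        return base + "%s FOR UPDATE NOWAIT"
    # sqlite3 uses ? placeholders and has no row locks; its
    # single-writer model serializes concurrent writers instead.
    return base + "?"


pg_sql = half_open_select_sql("postgresql")
lite_sql = half_open_select_sql("sqlite")
```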
reset_half_open_count
reset_half_open_count(service_name: str) -> None
Reset HALF_OPEN counter and clear window watermark.
record_success_with_close_check
record_success_with_close_check(
service_name, success_threshold
)
Atomic HALF_OPEN -> CLOSED close-check via SELECT FOR UPDATE NOWAIT.
Mirrors try_acquire_half_open_slot's SQL pattern.
Concurrent transactions on the same row are serialized by the
row-level lock; on NOWAIT lock contention the driver exception
is re-raised so the Layered wrapper records the
degraded-mode metric and delegates to L1.
Branches mirror the Redis Lua:
- state='half_open': increment success_count; transition
to CLOSED + reset counters/watermarks when the threshold is
crossed (did_close=True). Otherwise persist the increment.
- state='closed': race-loser / post-crash convergence -- no
write, return (did_close=False, state='closed', count=0).
- state in {open, missing, unknown}: stale relative to the
caller's HALF_OPEN expectation; the wrapper's stale-L2 guard
falls back to L1's atomic close path.
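The three branches above can be sketched as a pure decision function over the row as read under the lock. This is an illustration only: the function name is hypothetical, "crossed" is assumed to mean the incremented count reaches the threshold, and the return value for the stale branch is an assumption since the text only says the wrapper falls back.

```python
from typing import Optional, Tuple


def close_check(
    state: Optional[str], success_count: int, success_threshold: int
) -> Tuple[bool, str, int]:
    """Return (did_close, state, count) for the locked row."""
    if state == "half_open":
        new_count = success_count + 1
        if new_count >= success_threshold:  # assumed meaning of "crossed"
            return (True, "closed", 0)  # transition and reset counters
        return (False, "half_open", new_count)  # persist the increment
    if state == "closed":
        return (False, "closed", 0)  # race-loser: no write
    # open, missing, or unknown: stale relative to the caller's
    # HALF_OPEN expectation; report it and let the wrapper fall back.
    return (False, state or "missing", 0)
```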
SQLEventJournalRepository
SQLEventJournalRepository(
get_connection: Callable[[], Any],
*,
dialect: SQLDialect | None = None,
autocommit_delegated: bool | None = None,
max_query_limit: int = 10000
)
SQLFailedOperationRepository
SQLFailedOperationRepository(
get_connection: Callable[[], Any],
*,
dialect: SQLDialect | None = None,
autocommit_delegated: bool | None = None
)
Bases: GenericSQLRepository, FailedOperationRepository
DB-API 2.0 backed DLQ repository.
count_created_in_window
count_created_in_window(
start: datetime, end: datetime
) -> int
Count rows whose created_at falls within the inclusive range [start, end].
Backed by idx_baldur_dlq_created_at — a range seek, not a scan.
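A minimal sketch of the inclusive window count on SQLite follows. The table and index names come from this page; the two-column layout and the ISO-8601 text storage of created_at are assumptions for illustration.

```python
import sqlite3
from datetime import datetime

conn = sqlite3.connect(":memory:")
# Reduced, assumed layout: only the columns this query needs.
conn.execute("CREATE TABLE baldur_dlq (id TEXT PRIMARY KEY, created_at TEXT NOT NULL)")
conn.execute("CREATE INDEX idx_baldur_dlq_created_at ON baldur_dlq (created_at)")
conn.executemany(
    "INSERT INTO baldur_dlq VALUES (?, ?)",
    [
        ("a", datetime(2024, 1, 1).isoformat()),
        ("b", datetime(2024, 1, 2).isoformat()),
        ("c", datetime(2024, 1, 3).isoformat()),
    ],
)
conn.commit()


def count_created_in_window(conn, start: datetime, end: datetime) -> int:
    # Both bounds use non-strict comparisons, so rows stamped exactly
    # at start or end are counted.
    sql = "SELECT COUNT(*) FROM baldur_dlq WHERE created_at >= ? AND created_at <= ?"
    return conn.execute(sql, (start.isoformat(), end.isoformat())).fetchone()[0]


both_endpoints = count_created_in_window(conn, datetime(2024, 1, 1), datetime(2024, 1, 2))
```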
get_facet_counts
get_facet_counts(
*, status: str | None = None, domain: str | None = None
) -> dict[str, dict[str, int]]
Faceted status×domain counts via GROUP BY.
by_status is scoped by domain; by_domain is scoped by
status. GROUP BY drops zero-count buckets structurally.
The domain-scoped by_status (WHERE domain GROUP BY status) is a
covering scan over idx_baldur_dlq_status_domain; the status-scoped
by_domain (WHERE status GROUP BY domain) is a prefix seek on the
same composite. Both exact, both fine on the cold operator read path.
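The cross-scoping of the two facets can be sketched on SQLite as below. The table and index names are from this page; the reduced column set, the status values, and the "by_status" / "by_domain" dictionary keys are assumptions for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE baldur_dlq (id TEXT PRIMARY KEY, status TEXT, domain TEXT)")
conn.execute("CREATE INDEX idx_baldur_dlq_status_domain ON baldur_dlq (status, domain)")
conn.executemany(
    "INSERT INTO baldur_dlq VALUES (?, ?, ?)",
    [
        ("1", "pending", "billing"),
        ("2", "pending", "email"),
        ("3", "resolved", "billing"),
    ],
)
conn.commit()


def get_facet_counts(conn, *, status=None, domain=None):
    def grouped(column, filter_column, filter_value):
        # column and filter_column are fixed internal names, never user
        # input, so interpolating them into the SQL text is safe here.
        sql = f"SELECT {column}, COUNT(*) FROM baldur_dlq"
        params = ()
        if filter_value is not None:
            sql += f" WHERE {filter_column} = ?"
            params = (filter_value,)
        sql += f" GROUP BY {column}"
        return dict(conn.execute(sql, params).fetchall())

    return {
        "by_status": grouped("status", "domain", domain),  # scoped by domain
        "by_domain": grouped("domain", "status", status),  # scoped by status
    }


facets = get_facet_counts(conn, status="pending", domain="billing")
```

Because each facet is filtered by the other dimension, selecting a status in a UI narrows the domain counts without collapsing the status counts to a single bucket.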
try_acquire_for_replay
try_acquire_for_replay(
id: str, max_retries: int, force: bool = False
) -> FailedOperationData | None
Atomically flip PENDING → REPLAYING if retry budget remains.
Uses a conditional UPDATE as the concurrency guard. cursor.rowcount
> 0 means this worker won the race; the row is then re-read within the
same transaction so no other writer can delete or mutate it between
the claim and the DTO return.
force=True is the operator cap-override: it widens the WHERE status
set to {PENDING, REQUIRES_REVIEW}, drops the retry_count < max_retries
bound, resets retry_count to a fresh budget (1), and stamps the
metadata history scar into the JSON payload — all inside the same
transaction so the SELECT→UPDATE claim stays race-free against a
concurrent sweep. See FailedOperationRepository.try_acquire_for_replay.
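The non-forced claim path can be sketched on SQLite as follows. This is an illustration under assumptions: the try_claim name, the reduced column set, and the stored status strings are hypothetical, and the force=True branch is not covered.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE baldur_dlq ("
    "id TEXT PRIMARY KEY, status TEXT NOT NULL, retry_count INTEGER NOT NULL)"
)
conn.execute("INSERT INTO baldur_dlq VALUES ('op-1', 'PENDING', 0)")
conn.commit()


def try_claim(conn, op_id, max_retries):
    # The WHERE clause is the guard: only a PENDING row with retry
    # budget left can be flipped, so at most one caller succeeds.
    cur = conn.execute(
        "UPDATE baldur_dlq SET status = 'REPLAYING' "
        "WHERE id = ? AND status = 'PENDING' AND retry_count < ?",
        (op_id, max_retries),
    )
    if cur.rowcount == 0:
        conn.rollback()
        return None  # lost the race, out of budget, or unknown id
    # Re-read before committing so the returned row is the claimed one.
    row = conn.execute(
        "SELECT id, status, retry_count FROM baldur_dlq WHERE id = ?", (op_id,)
    ).fetchone()
    conn.commit()
    return row


first = try_claim(conn, "op-1", 3)
second = try_claim(conn, "op-1", 3)
```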
SQLPostmortemRepository
SQLPostmortemRepository(
get_connection: Callable[[], Any],
*,
dialect: SQLDialect | None = None,
autocommit_delegated: bool | None = None
)
SQLRecoverySessionArchiveRepository
SQLRecoverySessionArchiveRepository(
get_connection: Callable[[], Any],
*,
dialect: SQLDialect | None = None,
autocommit_delegated: bool | None = None
)
Bases: GenericSQLRepository, RecoverySessionArchiveRepository
DB-API 2.0 backed recovery session archive repository.
SQLSecurityIncidentRepository
SQLSecurityIncidentRepository(
get_connection: Callable[[], Any],
*,
dialect: SQLDialect | None = None,
autocommit_delegated: bool | None = None
)
Bases: GenericSQLRepository, SecurityIncidentRepository
DB-API 2.0 backed security incident repository.
SQLStatisticsRepository
SQLStatisticsRepository(
get_connection: Callable[[], Any],
*,
dialect: SQLDialect | None = None,
autocommit_delegated: bool | None = None
)
Bases: GenericSQLRepository, StatisticsRepositoryInterface
DB-API 2.0 backed statistics repository.
Reads from baldur_dlq and baldur_cb_state tables directly.
Does not own a table — schema bootstrap is skipped.
sql_transaction
sql_transaction(conn: Any) -> Any
Suspend repo-scoped auto-commit for the duration of the block.
Usage::
    with sql_transaction(conn):
        dlq_repo.save(...)
        cb_repo.update(...)
    # single commit (or rollback on exception) applies to both.
All repositories whose get_connection returns conn during
the block skip their per-call commit. The context manager itself
issues the final commit, or rollback on exception.