baldur.services — Idempotency
The idempotency service and its key/domain types, used to deduplicate at-least-once operations.
IdempotencyService
IdempotencyService(
cache_ttl: int | None = None,
time_provider: TimeProvider | None = None,
clock_skew_tolerance_seconds: float | None = None,
)
Service for checking and managing idempotency of operations.
Provides both cache-based (fast) and database-based (reliable) idempotency checking.
For framework-agnostic usage, provide lookup callbacks when calling check().
Example
Framework-agnostic usage with TimeProvider
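```python
from baldur.core.time_provider import MockTimeProvider
from baldur.services import IdempotencyKey, IdempotencyService

service = IdempotencyService(time_provider=MockTimeProvider())
key = IdempotencyKey.for_operation("order", 123, "process")
# my_lookup is your own callback that checks the database for an existing record.
result = service.check(key, lookup_fn=my_lookup)
```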
Initialize the idempotency service.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| cache_ttl | int \| None | Custom cache TTL in seconds | None |
| time_provider | TimeProvider \| None | TimeProvider for testable time operations | None |
| clock_skew_tolerance_seconds | float \| None | Clock skew tolerance for distributed checks, in seconds | None |
DEFAULT_CACHE_TTL
property
DEFAULT_CACHE_TTL: int
Default TTL for cache-based idempotency.
EXTENDED_CACHE_TTL
property
EXTENDED_CACHE_TTL: int
Extended TTL for operations whose idempotency records must be kept longer than DEFAULT_CACHE_TTL.
clock_skew_tolerance
property
clock_skew_tolerance: float
Clock skew tolerance in seconds for distributed checks.
time_provider
property
time_provider: TimeProvider
Get the time provider for this service.
now
now() -> datetime
Get current time using the configured TimeProvider.
Returns:
| Type | Description |
|---|---|
| datetime | Current datetime from the time provider |
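Routing every "now" through an injected TimeProvider lets tests drive TTL expiry without sleeping. A minimal sketch of that idea, assuming nothing about MockTimeProvider's real API: FakeClock and is_expired are hypothetical names, and only a now() method is modeled.

```python
from datetime import datetime, timedelta


class FakeClock:
    """Hypothetical stand-in for a mock TimeProvider: time moves only when told to."""

    def __init__(self, start: datetime) -> None:
        self._now = start

    def now(self) -> datetime:
        return self._now

    def advance(self, seconds: float) -> None:
        self._now += timedelta(seconds=seconds)


def is_expired(marked_at: datetime, ttl_seconds: int, clock: FakeClock) -> bool:
    # The TTL decision depends only on the injected clock, so tests need no sleep().
    return clock.now() - marked_at >= timedelta(seconds=ttl_seconds)
```

A test can then advance the clock by exactly the TTL and assert the transition from "fresh" to "expired" deterministically.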
check
check(
key: IdempotencyKey,
lookup_fn: Callable[..., Any] | None = None,
cache_ttl: int | None = None,
) -> IdempotencyResult
Check if an operation has already been processed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | IdempotencyKey | The idempotency key to check | required |
| lookup_fn | Callable[..., Any] \| None | Optional callback that checks the database | None |
| cache_ttl | int \| None | Optional custom cache TTL | None |
Returns:
| Type | Description |
|---|---|
| IdempotencyResult | IdempotencyResult with the duplicate status |
Note
Gracefully degrades to a DB-only check if Redis is unavailable.
When BALDUR_IDEMPOTENCY_ENABLED=false, returns IdempotencyResult(is_duplicate=False, message="idempotency disabled") without consulting the cache or the database.
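The behavior in this note amounts to a cache-first, database-fallback decision. A minimal sketch of that flow, assuming a cache getter that raises on outage and a lookup callback that returns None when no record exists; CheckResult, CacheUnavailable and check are hypothetical names, not baldur's internals.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class CheckResult:
    # Hypothetical stand-in for IdempotencyResult, with only the fields used here.
    is_duplicate: bool
    message: str = ""


class CacheUnavailable(Exception):
    """Hypothetical error raised when the cache backend cannot be reached."""


def check(
    cache_key: str,
    cache_get: Callable[[str], Any],
    lookup_fn: Optional[Callable[[], Any]] = None,
    enabled: bool = True,
) -> CheckResult:
    # Global kill switch: consult neither the cache nor the database.
    if not enabled:
        return CheckResult(is_duplicate=False, message="idempotency disabled")
    # Fast path: a cache hit means the operation was already marked as processed.
    try:
        if cache_get(cache_key) is not None:
            return CheckResult(is_duplicate=True, message="cache hit")
    except CacheUnavailable:
        pass  # Degrade gracefully to the database-only check below.
    # Reliable path: the caller's lookup callback queries the source of truth.
    if lookup_fn is not None and lookup_fn() is not None:
        return CheckResult(is_duplicate=True, message="database hit")
    return CheckResult(is_duplicate=False)
```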
mark_as_processed
mark_as_processed(
key: IdempotencyKey,
record_id: int | None = None,
ttl: int | None = None,
) -> bool
Mark an operation as processed in the cache.
Call this after successfully completing an operation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | IdempotencyKey | The idempotency key | required |
| record_id | int \| None | Optional record ID to cache | None |
| ttl | int \| None | Optional custom TTL | None |
Returns:
| Type | Description |
|---|---|
| bool | True if the cache was updated, False if the cache was unavailable. The operation is still considered successful if the cache write fails, because the DB is the source of truth. When idempotency is globally disabled, returns True without writing to the cache. |
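The intended ordering is check, then do the work, then mark. A minimal single-process sketch of that sequence, assuming an in-memory TTLCache in place of Redis; TTLCache and process_once are hypothetical names.

```python
import time
from typing import Any, Callable, Optional


class TTLCache:
    """Hypothetical in-memory cache with per-key expiry, standing in for Redis."""

    def __init__(self) -> None:
        self._data: dict[str, tuple[Any, float]] = {}

    def get(self, key: str) -> Optional[Any]:
        entry = self._data.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if expires_at <= time.monotonic():
            del self._data[key]  # Drop expired entries lazily on read.
            return None
        return value

    def set(self, key: str, value: Any, ttl: int) -> None:
        self._data[key] = (value, time.monotonic() + ttl)


def process_once(cache: TTLCache, key: str, work: Callable[[], Any], ttl: int = 3600) -> bool:
    """Run `work` unless `key` is already marked; return True if it ran."""
    if cache.get(key) is not None:  # Like check(): already processed, skip.
        return False
    record_id = work()  # The side effect itself, e.g. inserting a DB row.
    # Like mark_as_processed(): only after the work succeeded, so a failure
    # leaves the key unmarked and the operation can be retried.
    cache.set(key, record_id if record_id is not None else True, ttl)
    return True
```

Check-then-mark is not atomic: two workers can both pass the check before either marks. Where that matters, acquire_lock or a database unique constraint closes the gap.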
batch_check
batch_check(
keys: list[IdempotencyKey],
) -> list[IdempotencyResult]
Batch idempotency check using cache get_many (Redis MGET).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| keys | list[IdempotencyKey] | List of idempotency keys to check | required |
Returns:
| Type | Description |
|---|---|
| list[IdempotencyResult] | List of IdempotencyResult in the same order as the input keys. When idempotency is globally disabled, returns all |
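Because a bulk get only reports the keys it found, results have to be re-mapped onto the input order. A minimal sketch, assuming a get_many that, like Django's cache.get_many(), returns a dict containing only the keys that exist; batch_duplicate_flags is a hypothetical name.

```python
from typing import Any, Callable, Iterable, Mapping


def batch_duplicate_flags(
    cache_keys: list[str],
    get_many: Callable[[Iterable[str]], Mapping[str, Any]],
) -> list[bool]:
    """Return one duplicate flag per key, in the same order as `cache_keys`."""
    found = get_many(cache_keys)  # A single round trip (MGET on Redis).
    # Index by key rather than iterating `found`, so output order and length
    # always follow the input, including repeated keys.
    return [key in found for key in cache_keys]
```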
batch_mark_as_processed
batch_mark_as_processed(
keys: list[IdempotencyKey], ttl: int | None = None
) -> bool
Batch mark operations as processed using cache set_many (Redis Pipeline SET).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| keys | list[IdempotencyKey] | List of idempotency keys to mark | required |
| ttl | int \| None | Optional custom TTL | None |
Returns:
| Type | Description |
|---|---|
| bool | True if the cache was updated, False if the cache was unavailable. When idempotency is globally disabled, returns True without writing to the cache. |
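The return contract above (False only on cache outage, True when disabled) can be sketched as follows. This assumes a set_many(mapping, ttl) callable and treats ConnectionError as the outage signal; both are assumptions, and batch_mark is a hypothetical name.

```python
from typing import Any, Callable, Mapping, Optional


def batch_mark(
    cache_keys: list[str],
    set_many: Callable[[Mapping[str, Any], Optional[int]], Any],
    ttl: Optional[int] = None,
    enabled: bool = True,
) -> bool:
    """Mark every key in one bulk write; return False only if the cache is down."""
    if not enabled:
        return True  # Globally disabled: report success without writing.
    try:
        # One bulk write (pipelined on Redis) with the same TTL for every key.
        set_many({key: True for key in cache_keys}, ttl)
    except ConnectionError:
        return False  # Cache unavailable; the database remains the source of truth.
    return True
```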
acquire_lock
acquire_lock(key: IdempotencyKey, ttl_seconds: int) -> bool
Acquire a distributed lock for the given idempotency key.
Delegates to CacheProviderInterface.get_lock(), which returns a DistributedLock.
When no cache adapter is registered, the resolver returns an in-process InMemoryCacheAdapter whose get_lock() only coordinates within a single process. That is sufficient for single-worker installs; multi-worker deployments must register Redis (enforced at init time).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | IdempotencyKey | Idempotency key identifying the lock scope | required |
| ttl_seconds | int | Lock auto-expiry in seconds (prevents deadlock if the holder crashes) | required |
Returns:
| Type | Description |
|---|---|
| bool | True if the lock was acquired, False if it is already held by another process. |
release_lock
release_lock(key: IdempotencyKey) -> bool
Release a previously acquired distributed lock.
Best-effort: if release fails (network issue), TTL auto-expires the lock.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | IdempotencyKey | Idempotency key identifying the lock scope | required |
Returns:
| Type | Description |
|---|---|
| bool | True if the lock was found and release was attempted, False if it was not held. |
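acquire_lock and release_lock describe TTL-based mutual exclusion. The sketch below models those semantics in one process, roughly what Redis SET with the NX and EX options provides, and shares the single-process limitation noted for the in-memory adapter. InMemoryLocks is a hypothetical name; it omits the owner token that production Redis locks (for example redis-py's Lock) check so that a process cannot release a lock that expired and was re-acquired by someone else.

```python
import threading
import time


class InMemoryLocks:
    """Single-process TTL locks; not a distributed lock."""

    def __init__(self) -> None:
        self._guard = threading.Lock()
        self._expiry: dict[str, float] = {}

    def acquire(self, key: str, ttl_seconds: int) -> bool:
        now = time.monotonic()
        with self._guard:
            expires_at = self._expiry.get(key)
            if expires_at is not None and expires_at > now:
                return False  # Still held and not yet expired.
            # Free or expired: take it, with auto-expiry in case the holder crashes.
            self._expiry[key] = now + ttl_seconds
            return True

    def release(self, key: str) -> bool:
        with self._guard:
            # True if an entry existed and was removed, False if nothing was held.
            return self._expiry.pop(key, None) is not None
```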
clear
clear(key: IdempotencyKey) -> bool
Clear an idempotency key from cache.
Use with caution: intended only for cleanup or testing.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | IdempotencyKey | The idempotency key to clear | required |
Returns:
| Type | Description |
|---|---|
| bool | True if the cache was cleared, False if the cache was unavailable. |
IdempotencyKey
dataclass
IdempotencyKey(
domain: IdempotencyDomain,
key: str,
components: dict[str, Any],
)
Represents an idempotency key with domain context.
The key is a combination of domain-specific identifiers that uniquely identify an operation.
cache_key
property
cache_key: str
Get the cache key for Redis/memcached storage.
hash
property
hash: str
Get a hash of the key for indexing.
for_operation
classmethod
for_operation(
entity_type: str,
entity_id: int | str,
operation: str,
domain: IdempotencyDomain = IdempotencyDomain.EXTERNAL_SERVICE,
) -> IdempotencyKey
Create an idempotency key for a generic operation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| entity_type | str | Type of entity (e.g., "order", "user", "product") | required |
| entity_id | int \| str | The entity ID (integer PKs or string UUIDs/correlation IDs) | required |
| operation | str | The operation being performed (e.g., "process", "update") | required |
| domain | IdempotencyDomain | The domain category | IdempotencyDomain.EXTERNAL_SERVICE |
Returns:
| Type | Description |
|---|---|
| IdempotencyKey | IdempotencyKey for the operation |
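The exact formats behind cache_key and hash are not documented here. A minimal sketch, assuming an "idempotency:{domain}:{key}" layout and a SHA-256 digest (both hypothetical, as are SketchKey and this for_operation), shows how a domain-prefixed key and a stable digest fit together.

```python
import hashlib
from dataclasses import dataclass, field
from typing import Any, Union


@dataclass(frozen=True)
class SketchKey:
    # Hypothetical mirror of IdempotencyKey's three fields.
    domain: str
    key: str
    components: dict[str, Any] = field(default_factory=dict, compare=False)

    @property
    def cache_key(self) -> str:
        # Assumed layout: prefix + domain, so equal keys in different domains never collide.
        return f"idempotency:{self.domain}:{self.key}"

    @property
    def hash(self) -> str:
        # A stable digest for an indexed column. Python's built-in hash() is
        # randomized per process for str, so it cannot be stored and compared later.
        return hashlib.sha256(self.cache_key.encode("utf-8")).hexdigest()


def for_operation(
    entity_type: str,
    entity_id: Union[int, str],
    operation: str,
    domain: str = "external_service",
) -> SketchKey:
    components = {"entity_type": entity_type, "entity_id": entity_id, "operation": operation}
    return SketchKey(domain, f"{entity_type}:{entity_id}:{operation}", components)
```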
for_event
classmethod
for_event(event_id: str) -> IdempotencyKey
Create an idempotency key for event processing.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event_id | str | The unique event ID | required |
Returns:
| Type | Description |
|---|---|
| IdempotencyKey | IdempotencyKey for the event |
for_resource_action
classmethod
for_resource_action(
resource_type: str,
resource_id: int,
action: str,
amount: int | None = None,
) -> IdempotencyKey
Create an idempotency key for resource actions.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| resource_type | str | Type of resource | required |
| resource_id | int | The resource ID | required |
| action | str | The action being performed | required |
| amount | int \| None | Optional amount for the action | None |
Returns:
| Type | Description |
|---|---|
| IdempotencyKey | IdempotencyKey for the resource action |
custom
classmethod
custom(key: str, **components: Any) -> IdempotencyKey
Create a custom idempotency key.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | The raw key string | required |
| **components | Any | Key components for debugging | {} |
Returns:
| Type | Description |
|---|---|
| IdempotencyKey | IdempotencyKey with the custom domain |
for_chaos_experiment
classmethod
for_chaos_experiment(
schedule_id: str,
experiment_type: str,
target_service: str,
) -> IdempotencyKey
Create an idempotency key for a Chaos experiment.
Prevents experiments of the same schedule from running concurrently.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| schedule_id | str | Schedule ID | required |
| experiment_type | str | Experiment type (e.g., latency_injection, fault_injection) | required |
| target_service | str | Target service | required |
Returns:
| Type | Description |
|---|---|
| IdempotencyKey | IdempotencyKey for the chaos experiment |
for_chaos_service_lock
classmethod
for_chaos_service_lock(
target_service: str,
) -> IdempotencyKey
Service-level Chaos experiment lock.
Prevents more than one experiment from running on the same service at a time, which keeps root-cause analysis unambiguous.
Used together with the schedule lock in a dual-lock pattern:
- Service lock: concurrency control (released when the experiment ends)
- Schedule lock: re-execution prevention (kept until its TTL expires)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| target_service | str | Target service name | required |
Returns:
| Type | Description |
|---|---|
| IdempotencyKey | IdempotencyKey for the service-level chaos lock |
Usage
Example of the dual-lock pattern
```python
from baldur.services import IdempotencyKey, get_idempotency_service

idempotency = get_idempotency_service()


def run_scheduled_experiment(schedule_id: str, exp_type: str, target_service: str) -> str:
    service_lock = IdempotencyKey.for_chaos_service_lock(target_service)
    schedule_lock = IdempotencyKey.for_chaos_experiment(schedule_id, exp_type, target_service)

    # 1. Acquire the service lock (concurrency control).
    if not idempotency.acquire_lock(service_lock, ttl_seconds=7200):
        return "another experiment running"

    # 2. Acquire the schedule lock (re-execution prevention).
    if not idempotency.acquire_lock(schedule_lock, ttl_seconds=86400):
        idempotency.release_lock(service_lock)  # Roll back step 1.
        return "schedule already executed"

    try:
        # 3. Run the experiment (execute_experiment is your experiment runner).
        execute_experiment()
    finally:
        # 4. Release only the service lock; the schedule lock keeps its TTL.
        idempotency.release_lock(service_lock)
    return "executed"
```
for_config_change
classmethod
for_config_change(
config_key: str,
new_value_hash: str,
changed_by: str,
request_id: str | None = None,
window_id: str | None = None,
) -> IdempotencyKey
Create an idempotency key for a configuration change.
Prevents the same configuration change from being applied twice.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config_key | str | Configuration key | required |
| new_value_hash | str | Hash of the new value | required |
| changed_by | str | Who made the change | required |
| request_id | str \| None | Request ID (only retries of the same request count as duplicates) | None |
| window_id | str \| None | Sliding-window ID (time-window based; recommended) | None |
Idempotency-scope policy:
- request_id provided: only retries of the same request are duplicates.
- window_id provided: only the same change within the same window is a duplicate.
- Neither provided: duplicates are determined by new_value_hash (legacy behavior).
Returns:
| Type | Description |
|---|---|
| IdempotencyKey | IdempotencyKey for the config change |
Reference: Architect Review - "distinguish intentional reconfiguration vs duplicates"
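The scope policy decides which part of the change becomes the key. A minimal sketch, assuming request_id takes precedence over window_id when both are given and using a hypothetical key layout; window_id_for derives a fixed (tumbling) bucket, which is only one simple way to produce a window ID and may differ from baldur's sliding-window notion.

```python
from typing import Optional


def window_id_for(epoch_seconds: float, window_seconds: int = 300) -> str:
    # Hypothetical helper: fixed time buckets as a simple window ID.
    return str(int(epoch_seconds // window_seconds))


def config_change_key(
    config_key: str,
    new_value_hash: str,
    request_id: Optional[str] = None,
    window_id: Optional[str] = None,
) -> str:
    # Scope selection follows the policy above; the precedence when both
    # request_id and window_id are given, and the key layout, are assumptions.
    if request_id is not None:
        scope = f"req:{request_id}"  # Only retries of this request collide.
    elif window_id is not None:
        scope = f"win:{window_id}:{new_value_hash}"  # Same value, same window.
    else:
        scope = f"hash:{new_value_hash}"  # Legacy: the same value always collides.
    return f"config_change:{config_key}:{scope}"
```

With windows, reapplying the same value later (an intentional reconfiguration) lands in a new window and is not blocked, while a double submit seconds apart is.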
for_l2_sync
classmethod
for_l2_sync(
service_name: str, record_id: str, intended_state: str
) -> IdempotencyKey
Create an idempotency key for L2 synchronization.
Prevents the same record from being synchronized twice after recovery.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
service_name
|
str
|
Service name |
required |
record_id
|
str
|
Record ID |
required |
intended_state
|
str
|
Target state |
required |
Returns:
| Type | Description |
|---|---|
| IdempotencyKey | IdempotencyKey for L2 sync |
for_wal_recovery
classmethod
for_wal_recovery(
wal_entry_id: str, operation: str
) -> IdempotencyKey
Create an idempotency key for WAL recovery.
Prevents the same WAL entry from being processed twice.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| wal_entry_id | str | WAL entry ID | required |
| operation | str | Recovery-operation type | required |
Returns:
| Type | Description |
|---|---|
| IdempotencyKey | IdempotencyKey for WAL recovery |
for_auto_adjustment
classmethod
for_auto_adjustment(
module: str, parameter: str, target_value: str
) -> IdempotencyKey
Create an idempotency key for an autonomous adjustment.
Prevents the same adjustment from being applied twice.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| module | str | Module name (circuit_breaker, retry, etc.) | required |
| parameter | str | Parameter name | required |
| target_value | str | Target value | required |
Returns:
| Type | Description |
|---|---|
| IdempotencyKey | IdempotencyKey for the auto adjustment |
Note
This key does not cover flapping checks; use AntiFlappingWindow separately, e.g. get_anti_flapping_window().check_and_record(...).
for_recovery_action
classmethod
for_recovery_action(
action_type: str,
target: str,
region_id: str,
session_id: str,
) -> IdempotencyKey
Create an idempotency key for a recovery action.
Prevents duplicate execution of the same recovery action across regions in a Multi-Region Active-Active environment.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| action_type | str | Action type ("cb_reset", "pod_restart", "dlq_retry", etc.) | required |
| target | str | Target (service name, Pod name, etc.) | required |
| region_id | str | Executing region (e.g., "ap-northeast-2") | required |
| session_id | str | Recovery session ID (prevents duplicates within the same recovery session) | required |
Returns:
| Type | Description |
|---|---|
| IdempotencyKey | IdempotencyKey for the recovery action |
Example
Check idempotency before a CB reset
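```python
import logging

from baldur.services import IdempotencyKey, get_idempotency_service

logger = logging.getLogger(__name__)
idempotency_service = get_idempotency_service()


def reset_payment_api_breaker() -> None:
    key = IdempotencyKey.for_recovery_action(
        action_type="cb_reset",
        target="payment_api",
        region_id="ap-northeast-2",
        session_id="sess-12345",
    )

    result = idempotency_service.check(key)
    if result.is_duplicate:
        logger.info("idempotency.already_executed_another_region")
        return

    # Execute (circuit_breaker is your application's breaker instance).
    circuit_breaker.reset("payment_api")
    # Record completion so later checks treat this action as processed.
    idempotency_service.mark_as_processed(key)
```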
for_cb_reset
classmethod
for_cb_reset(
service_name: str, region_id: str, trigger_id: str
) -> IdempotencyKey
Idempotency key dedicated to a Circuit Breaker reset.
A convenience wrapper around for_recovery_action that provides a CB-reset-specific interface.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service_name | str | Service name (e.g., "payment_api") | required |
| region_id | str | Executing region (e.g., "ap-northeast-2") | required |
| trigger_id | str | Trigger ID (e.g., recovery session ID) | required |
Returns:
| Type | Description |
|---|---|
| IdempotencyKey | IdempotencyKey for the CB reset action |
Example
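```python
from baldur.services import IdempotencyKey, get_idempotency_service

idempotency_service = get_idempotency_service()

key = IdempotencyKey.for_cb_reset(
    service_name="payment_api",
    region_id="ap-northeast-2",
    trigger_id="recovery-sess-abc123",
)

if not idempotency_service.check(key).is_duplicate:
    # circuit_breaker is your application's breaker instance.
    circuit_breaker.reset("payment_api")
    idempotency_service.mark_as_processed(key)
```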
for_pod_restart
classmethod
for_pod_restart(
pod_name: str,
namespace: str,
region_id: str,
session_id: str,
) -> IdempotencyKey
Idempotency key dedicated to a Pod restart.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| pod_name | str | Pod name | required |
| namespace | str | Kubernetes namespace | required |
| region_id | str | Executing region | required |
| session_id | str | Recovery session ID | required |
Returns:
| Type | Description |
|---|---|
| IdempotencyKey | IdempotencyKey for the Pod restart action |
for_dlq_retry
classmethod
for_dlq_retry(
queue_name: str,
message_id: str,
region_id: str,
session_id: str,
) -> IdempotencyKey
Idempotency key dedicated to a DLQ retry.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| queue_name | str | DLQ queue name | required |
| message_id | str | Message ID | required |
| region_id | str | Executing region | required |
| session_id | str | Recovery session ID | required |
Returns:
| Type | Description |
|---|---|
| IdempotencyKey | IdempotencyKey for the DLQ retry action |
for_dlq_replay
classmethod
for_dlq_replay(
dlq_id: int, domain: str, retry_count: int
) -> IdempotencyKey
DLQ replay deduplication: prevents concurrent replays of the same entry at the same attempt.
Uses the DLQ entry's primary key (guaranteed unique within the database) plus retry_count to scope idempotency to a single attempt. Different attempts (different retry_count values) get different keys, which allows intentional retries while blocking concurrent duplicates.
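Including retry_count in the key is what separates "same attempt twice" from "next attempt". A minimal sketch, assuming a hypothetical key layout and a single-process claim set in place of the real cache or lock; dlq_replay_key and try_claim are hypothetical names.

```python
def dlq_replay_key(dlq_id: int, domain: str, retry_count: int) -> str:
    # Assumed layout; what matters is that retry_count is part of the key.
    return f"dlq_replay:{domain}:{dlq_id}:{retry_count}"


def try_claim(claimed: set[str], key: str) -> bool:
    """Hypothetical single-process claim: the first caller wins, later callers see a duplicate."""
    if key in claimed:
        return False
    claimed.add(key)
    return True
```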
IdempotencyDomain
Bases: str, Enum
Domains that support idempotency checking (domain-neutral).
EXTERNAL_SERVICE
class-attribute
instance-attribute
EXTERNAL_SERVICE = 'external_service'
External service calls (payments, notifications, etc.).
INTERNAL_PROCESS
class-attribute
instance-attribute
INTERNAL_PROCESS = 'internal_process'
Internal processes (inventory decrement, point accrual, etc.).
ASYNC_TASK
class-attribute
instance-attribute
ASYNC_TASK = 'async_task'
Asynchronous tasks (Celery Task, etc.).
EVENT
class-attribute
instance-attribute
EVENT = 'event'
Event handling (Webhook, message, etc.).
CUSTOM
class-attribute
instance-attribute
CUSTOM = 'custom'
Custom domain.
CHAOS_EXPERIMENT
class-attribute
instance-attribute
CHAOS_EXPERIMENT = 'chaos_experiment'
Chaos experiment execution (prevents duplicate execution of the same experiment).
CHAOS_ZOMBIE_HUNTER
class-attribute
instance-attribute
CHAOS_ZOMBIE_HUNTER = 'chaos_zombie_hunter'
Zombie Hunter distributed lock (prevents duplicate rollback of orphan experiments).
Detects orphaned experiments and cleans them up safely.
CONFIG_CHANGE
class-attribute
instance-attribute
CONFIG_CHANGE = 'config_change'
Configuration change (prevents duplicate application of the same change).
L2_SYNC
class-attribute
instance-attribute
L2_SYNC = 'l2_sync'
L2 storage synchronization (prevents duplicate re-sync after recovery).
WAL_RECOVERY
class-attribute
instance-attribute
WAL_RECOVERY = 'wal_recovery'
WAL recovery (prevents duplicate processing of the same entry).
AUTO_ADJUSTMENT
class-attribute
instance-attribute
AUTO_ADJUSTMENT = 'auto_adjustment'
Autonomous adjustment (prevents duplicate application of the same adjustment).
RECOVERY_ACTION
class-attribute
instance-attribute
RECOVERY_ACTION = 'recovery_action'
Recovery actions (CB reset, Pod restart, DLQ retry, etc.); prevents duplicate execution of the same action.
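Because IdempotencyDomain subclasses both str and Enum, its members compare equal to their string values and can be looked up from stored strings. A small sketch with a hypothetical, abbreviated Domain enum illustrates this, plus one pitfall: from Python 3.11, formatting a (str, Enum) member in an f-string yields the member name ("Domain.EVENT") rather than the value, so building keys from .value is the safer habit.

```python
from enum import Enum


class Domain(str, Enum):
    # Abbreviated, hypothetical mirror of IdempotencyDomain.
    EXTERNAL_SERVICE = "external_service"
    EVENT = "event"
    RECOVERY_ACTION = "recovery_action"


def domain_prefix(domain: Domain) -> str:
    # Use .value explicitly so the result does not depend on the Python version's
    # Enum.__format__ behavior.
    return f"idempotency:{domain.value}"
```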
get_idempotency_service
get_idempotency_service() -> IdempotencyService
Get the singleton IdempotencyService instance.
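A common way to build such a process-wide accessor in Python is lazy, double-checked initialization under a lock. The sketch below uses hypothetical names and is not baldur's implementation. In tests, constructing IdempotencyService directly (for example with a MockTimeProvider, as in the class example) avoids sharing state through the singleton.

```python
import threading
from typing import Optional


class Service:
    """Hypothetical stand-in for IdempotencyService."""


_instance: Optional[Service] = None
_instance_lock = threading.Lock()


def get_service() -> Service:
    """Return the process-wide Service, creating it on first use."""
    global _instance
    if _instance is None:  # Fast path once initialized: no locking.
        with _instance_lock:
            if _instance is None:  # Re-check: another thread may have created it first.
                _instance = Service()
    return _instance
```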