
baldur_pro.services.throttle — Adaptive Throttle

Adaptive request throttling: ThrottleConfig, AdaptiveThrottle, and the ThrottlePolicy surface.

🔒 PRO Feature — requires a baldur-pro license

These symbols ship in the baldur-pro distribution. PRO modules import normally — there is no ImportError. PRO features activate only when baldur.init() runs with a valid BALDUR_LICENSE_KEY; without it the system runs with OSS defaults and register_pro_services() logs entitlement.pro_registration_skipped.

throttle

Throttle services for the Baldur system.

Provides framework-agnostic throttling with:
- Base throttle logic (sliding window, token bucket)
- Netflix Gradient Adaptive Throttling
- DRF adapter for Django integration

Usage

from baldur_pro.services.throttle import AdaptiveThrottle, ThrottleConfig

throttle = AdaptiveThrottle(
    config=ThrottleConfig(
        initial_limit=100,
        min_limit=10,
        max_limit=500,
    )
)

# Check whether the request is allowed (check() returns a ThrottleResult)
result = throttle.check("user_123")

# Record the response time for the gradient calculation
throttle.record_response(rtt_ms=45.2)

Status: Public

AdaptiveThrottle

AdaptiveThrottle(config: ThrottleConfig | None = None)

Bases: CircuitBreakerHandlerMixin, RateLimitHandlerMixin, GovernanceEventMixin, ErrorBudgetHandlerMixin, EmergencyModeMixin, FullStopMixin, RecoveryDampeningMixin, ThrottleDLQReplayMixin, SlidingWindowThrottle

Netflix Gradient-based Adaptive Throttle.

Dynamically adjusts rate limits based on response time trends. Extends SlidingWindowThrottle with gradient-based limit adjustment. Includes DLQ integration for throttle rejection storage and recovery replay.

Emergency Mode integration:
- Automatically adjusts the limit based on the Emergency Level
- At LEVEL_3, gradient calculation continues but its result is not applied (frozen)
- The emergency multiplier is applied as a hard cap

Governance integration:
- check_all_governance is imported lazily for Kill Switch / Emergency / Error Budget / Break Glass checks (fail-open when ENT is absent)
- Subscribes to Kill Switch events on the EventBus to freeze the gradient immediately
- Break Glass activation releases Full Stop and starts Recovery Dampening
- check_all_governance() runs as a safety-net check in the _maybe_adjust_limit() control plane

conservative_limit property

conservative_limit: int

Conservative limit with Min-Winner policy.

Returns the lowest among RTT-based limit, 429-based limit, and Error Budget-based limit.
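A minimal sketch of the Min-Winner rule, assuming the three candidate limits are plain integers and that a signal with no current opinion is passed as None and skipped. The function and parameter names are hypothetical, not part of the baldur_pro API.

```python
from __future__ import annotations


def conservative_limit(
    rtt_limit: int,
    rate_limit_429_limit: int | None = None,
    error_budget_limit: int | None = None,
) -> int:
    """Min-Winner: the most restrictive available limit wins (hypothetical helper)."""
    candidates = [rtt_limit]
    # Signals that are currently inactive (None) do not participate.
    for limit in (rate_limit_429_limit, error_budget_limit):
        if limit is not None:
            candidates.append(limit)
    return min(candidates)


print(conservative_limit(300, 250, 120))  # 120: the Error Budget limit wins
print(conservative_limit(300))            # 300: only the RTT limit is active
```

Taking the minimum means any single signal can tighten the limit, while loosening it requires every active signal to agree.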

record_response

record_response(rtt_ms: float) -> None

Record response time and potentially adjust limit.

Call this after each successful request with the response time.

Also emits a THROTTLE_LIMIT_RECOVERED event when the limit recovers from min_limit.

Parameters:

Name    Type   Default   Description
rtt_ms  float  required  Response time in milliseconds

check

check(
    key: str,
    tier_id: str = "standard",
    context: dict | None = None,
    store_rejection: bool = True,
) -> ThrottleResult

Check if request is allowed with adaptive info and priority protection.

On rejection, with DLQ auto-storage:
- If context is provided and store_rejection=True, the rejected request is stored in the DLQ
- store_rejection=False skips DLQ storage (prevents cycles when a replayed request is rejected again)

Error Budget Gate integration:
- In the ERROR_BUDGET_CRITICAL state, requests in the non_essential tier are additionally rejected
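The rejection rules above can be sketched as follows. This is a hypothetical model, not the library's code: Decision stands in for ThrottleResult, within_limit stands in for the sliding-window result, a plain list stands in for the DLQ, and the "error_budget_critical" reason string is invented for illustration.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Decision:  # hypothetical stand-in for ThrottleResult
    allowed: bool
    reason: str | None = None


def decide(tier_id: str, within_limit: bool, error_budget_critical: bool) -> Decision:
    # Error Budget Gate: in ERROR_BUDGET_CRITICAL, non_essential traffic is also rejected.
    if error_budget_critical and tier_id == "non_essential":
        return Decision(False, "error_budget_critical")
    if not within_limit:
        return Decision(False, "rate_limit_exceeded")
    return Decision(True)


def store_if_rejected(
    decision: Decision, context: dict | None, store_rejection: bool, dlq: list
) -> None:
    # Only rejected requests that carry a context are stored. Replays pass
    # store_rejection=False so a re-rejected replay is not stored again.
    if not decision.allowed and context is not None and store_rejection:
        dlq.append({**context, "reason": decision.reason})


dlq: list = []
d = decide("non_essential", within_limit=True, error_budget_critical=True)
store_if_rejected(d, {"domain": "orders"}, store_rejection=True, dlq=dlq)
print(d.allowed, len(dlq))  # False 1
```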

Parameters:

Name             Type         Default     Description
key              str          required    Request identifier
tier_id          str          'standard'  Request tier (critical/standard/non_essential)
context          dict | None  None        Request context for DLQ storage (domain, tier_id, trace_id, etc.)
store_rejection  bool         True        Whether to store rejections in the DLQ

Returns:

Type            Description
ThrottleResult  ThrottleResult with adaptive info

allow_request

allow_request(ident: str) -> ThrottleResult

OSS-facing wrapper used by the DRF AdaptiveDRFThrottle bridge.

Delegates to check() with the default tier and context. The identifier maps directly to the rate-limit key. PRO callers should prefer check() for tiered, context-aware rejection-storage flows.

get_status

get_status(key: str) -> ThrottleResult

Return current throttle state for key without consuming a slot.

Used by AdaptiveDRFThrottle.wait() in the DRF adapter to compute a reset_at-based backoff after a 429. Reads the same sliding-window counters as check() but does not mutate them.
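A sketch of how a reset_at-based backoff could be derived from get_status(). It assumes, hypothetically, that ThrottleResult exposes reset_at as a Unix timestamp in seconds; a DRF throttle's wait() conventionally returns the recommended number of seconds to wait before the next request (or None).

```python
from __future__ import annotations

import time


def backoff_seconds(reset_at: float, now: float | None = None) -> float:
    """Seconds until the window resets, never negative (hypothetical helper)."""
    if now is None:
        now = time.time()
    return max(0.0, reset_at - now)


# e.g. inside a DRF throttle's wait():
#     status = throttle.get_status(ident)   # does not consume a slot
#     return backoff_seconds(status.reset_at)
print(backoff_seconds(105.0, now=100.0))  # 5.0
```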

close

close() -> None

Unsubscribe all EventBus handlers.

get_stats

get_stats() -> dict

Get adaptive throttle statistics.

reset_all

reset_all() -> None

Reset all state including gradient calculator and emergency state.

rollback_to_base_limit

rollback_to_base_limit() -> int

Immediately roll back to the pre-emergency base limit.

Returns:

Type  Description
int   The rolled-back limit

get_config_snapshot

get_config_snapshot() -> dict

Return a snapshot of the current throttle config state.

Used for audit logging and config change tracking.

Returns:

Type  Description
dict  Current config state dictionary

swap_config

swap_config(new_config: ThrottleConfig) -> ThrottleConfig

Atomically swap the config object.

Reference assignment is atomic under the GIL, so this is safe even while _maybe_adjust_limit() is running. Derived state (_current_limit, _base_limit_before_emergency, etc.) is left unchanged; only the SLA values are swapped.
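A minimal sketch of the swap-and-return-previous pattern. A frozen dataclass stands in for ThrottleConfig, and dataclasses.replace() plays the role of pydantic's model_copy(update=...); the holder class, its method names, and the field defaults are hypothetical.

```python
from __future__ import annotations

from dataclasses import dataclass, replace


@dataclass(frozen=True)
class SlaConfig:  # hypothetical stand-in for ThrottleConfig
    sla_warning_ms: int = 200
    sla_critical_ms: int = 500


class ConfigHolder:
    def __init__(self, config: SlaConfig) -> None:
        self._config = config
        self._current_limit = 100  # derived state: never touched by a swap

    def swap_config(self, new_config: SlaConfig) -> SlaConfig:
        previous = self._config
        self._config = new_config  # a single reference assignment
        return previous  # kept by the caller for rollback

    def critical_threshold(self) -> int:
        # Read the reference once so a single pass never mixes two configs.
        config = self._config
        return config.sla_critical_ms


holder = ConfigHolder(SlaConfig())
old = holder.swap_config(replace(holder._config, sla_critical_ms=800))
print(old.sla_critical_ms, holder.critical_threshold())  # 500 800
holder.swap_config(old)  # rollback
```

Readers stay safe because each one sees either the old or the new object, never a half-updated one. The read-then-assign pair inside swap_config is not itself atomic, so concurrent swappers would still need their own lock.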

Parameters:

Name        Type            Default   Description
new_config  ThrottleConfig  required  New ThrottleConfig (created via model_copy(), etc.)

Returns:

Type            Description
ThrottleConfig  Previous config object (kept for rollback)

GradientCalculator

GradientCalculator(
    smoothing_factor: float = 0.5,
    sample_window_seconds: float = 10.0,
    min_samples: int = 3,
)

RTT gradient calculator based on an exponential moving average (EMA).

Use cases:
1. AdaptiveThrottle: the original use case (dynamic limit adjustment)
2. TrafficGate/AdmissionControl: providing the expected processing time for Dynamic Fast-Fail

- Positive gradient = RTT increasing (overload)
- Negative gradient = RTT decreasing (recovery)
- Zero gradient = stable
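A minimal sketch of an EMA-based gradient, assuming the gradient is the relative change of the smoothed RTT across the sample window; the real calculator's exact formula is not documented here. The class name and the injectable clock are hypothetical; the constructor parameters mirror GradientCalculator's.

```python
from __future__ import annotations

import time
from collections import deque
from typing import Callable


class EmaGradientSketch:
    def __init__(
        self,
        smoothing_factor: float = 0.5,
        sample_window_seconds: float = 10.0,
        min_samples: int = 3,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.alpha = smoothing_factor
        self.window = sample_window_seconds
        self.min_samples = min_samples
        self.clock = clock
        self._ema: float | None = None
        # (timestamp, EMA value right after that sample)
        self._history: deque[tuple[float, float]] = deque()

    def _prune(self, now: float) -> None:
        while self._history and now - self._history[0][0] > self.window:
            self._history.popleft()

    def add_sample(self, rtt_ms: float) -> None:
        now = self.clock()
        if self._ema is None:
            self._ema = rtt_ms
        else:
            # EMA: the new sample weighs alpha, history weighs (1 - alpha).
            self._ema = self.alpha * rtt_ms + (1 - self.alpha) * self._ema
        self._history.append((now, self._ema))
        self._prune(now)

    def get_current_rtt(self) -> float | None:
        return self._ema

    def get_gradient(self) -> float:
        self._prune(self.clock())
        if len(self._history) < self.min_samples:
            return 0.0  # insufficient data
        oldest = self._history[0][1]
        newest = self._history[-1][1]
        if oldest <= 0:
            return 0.0
        # > 0: RTT rising (overload); < 0: RTT falling (recovery).
        return (newest - oldest) / oldest


t = [0.0]
calc = EmaGradientSketch(clock=lambda: t[0])
for rtt in (100.0, 100.0, 100.0, 200.0):
    calc.add_sample(rtt)
    t[0] += 1.0
print(calc.get_current_rtt(), calc.get_gradient())  # 150.0 0.5
```

A higher smoothing_factor makes the EMA, and therefore the gradient, react faster to a single slow response.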

Parameters:

Name                   Type   Default  Description
smoothing_factor       float  0.5      Weight for new samples (0-1, higher = more reactive)
sample_window_seconds  float  10.0     How far back to look for samples (seconds)
min_samples            int    3        Minimum number of samples needed to calculate a gradient

add_sample

add_sample(rtt_ms: float) -> None

Add a new RTT sample.

get_gradient

get_gradient() -> float

Calculate current RTT gradient.

Returns:

Type   Description
float  Gradient value:
       • > 0: RTT increasing (should decrease limit)
       • < 0: RTT decreasing (can increase limit)
       • 0: stable or insufficient data

get_current_rtt

get_current_rtt() -> float | None

Get current smoothed RTT.

get_snapshot

get_snapshot() -> tuple[float | None, float]

Return the current RTT and gradient under a single lock acquisition.

Reduces lock acquisitions from two to one to improve hot-path performance.

Returns:

Type                        Description
tuple[float | None, float]  (current_rtt_ms, gradient) tuple

get_stats

get_stats() -> dict

Get calculator statistics.

reset

reset() -> None

Reset calculator state.

BaseThrottle

BaseThrottle(config: ThrottleConfig | None = None)

Bases: ABC

Abstract base class for throttle implementations.

current_limit property writable

current_limit: int

Get current rate limit.

check abstractmethod

check(key: str) -> ThrottleResult

Check if request is allowed.

Parameters:

Name  Type  Default   Description
key   str   required  Unique identifier (user_id, IP address, etc.)

Returns:

Type            Description
ThrottleResult  ThrottleResult with the allowed status and metadata

reset abstractmethod

reset(key: str) -> None

Reset throttle state for a key.

get_stats

get_stats() -> dict

Get throttle statistics.

SlidingWindowThrottle

SlidingWindowThrottle(config: ThrottleConfig | None = None)

Bases: BaseThrottle

In-memory sliding window throttle.

Thread-safe implementation for single-process use. For distributed systems, use a Redis-based implementation.
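A minimal sketch of a thread-safe, in-memory sliding-window log, assuming one deque of request timestamps per key guarded by a single lock. The class name, the window_seconds parameter, the injectable clock, and the bool return value (instead of ThrottleResult) are simplifications for illustration.

```python
from __future__ import annotations

import threading
import time
from collections import defaultdict, deque
from typing import Callable


class SlidingWindowSketch:
    def __init__(
        self,
        limit: int,
        window_seconds: float = 1.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.limit = limit
        self.window = window_seconds
        self.clock = clock
        self._hits: defaultdict[str, deque[float]] = defaultdict(deque)
        self._lock = threading.Lock()

    def check(self, key: str) -> bool:
        now = self.clock()
        with self._lock:
            hits = self._hits[key]
            # Evict timestamps that have slid out of the window.
            while hits and now - hits[0] >= self.window:
                hits.popleft()
            if len(hits) >= self.limit:
                return False  # rejected; the rejected request is not counted
            hits.append(now)
            return True

    def reset(self, key: str) -> None:
        with self._lock:
            self._hits.pop(key, None)


t = [0.0]
throttle = SlidingWindowSketch(limit=2, window_seconds=1.0, clock=lambda: t[0])
print([throttle.check("user_123") for _ in range(3)])  # [True, True, False]
t[0] = 1.0
print(throttle.check("user_123"))  # True: the first two hits have expired
```

Because the counters live in process memory, each worker process would enforce its own limit, which is why a shared store such as Redis is needed for distributed deployments.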

check

check(key: str) -> ThrottleResult

Check if request is allowed using sliding window.

reset

reset(key: str) -> None

Reset throttle state for a key.

reset_all

reset_all() -> None

Reset all throttle state (for testing).

get_stats

get_stats() -> dict

Get throttle statistics.

RTTMetrics dataclass

RTTMetrics(
    current_rtt_ms: float | None = None,
    smoothed_rtt_ms: float | None = None,
    gradient: float = 0.0,
    sample_count: int = 0,
    last_updated: datetime | None = None,
)

RTT-related metrics.

RTTSeverity

Bases: str, Enum

RTT severity level.

ServiceHealthMetrics dataclass

ServiceHealthMetrics(
    service_name: str,
    rtt: RTTMetrics = RTTMetrics(),
    cb_state: str = "closed",
    cb_failure_count: int = 0,
    cb_success_count: int = 0,
    throttle_current_limit: int | None = None,
    sla_warning_triggered: bool = False,
    sla_critical_triggered: bool = False,
)

Aggregated service health metrics.

ThrottleCircuitBreakerBridge

ThrottleCircuitBreakerBridge(
    sla_warning_ms: int | None = None,
    sla_critical_ms: int | None = None,
    gradient_warning_threshold: float | None = None,
)

Data sharing bridge between Throttle and Circuit Breaker.

Uses RTT data for CB failure detection and reflects the CB state in throttle limit adjustments.

Initialize.

Parameters:

Name                        Type          Default  Description
sla_warning_ms              int | None    None     SLA warning threshold (ms)
sla_critical_ms             int | None    None     SLA critical threshold (ms)
gradient_warning_threshold  float | None  None     Gradient warning threshold

record_rtt

record_rtt(
    service_name: str,
    rtt_ms: float,
    gradient: float | None = None,
) -> RTTSeverity

Record RTT data and feed it back to the CB.

If RTT exceeds the threshold, a failure signal may be sent to the CB.

Parameters:

Name          Type          Default   Description
service_name  str           required  Service name
rtt_ms        float         required  Response time (ms)
gradient      float | None  None      RTT rate of change (optional)

Returns:

Type         Description
RTTSeverity  RTT severity level

record_success

record_success(
    service_name: str, rtt_ms: float | None = None
) -> None

Record a successful request (RTT normal).

If RTT is within the normal range, it is counted as a CB success.

Parameters:

Name          Type          Default   Description
service_name  str           required  Service name
rtt_ms        float | None  None      Response time (optional)

get_cb_state

get_cb_state(service_name: str) -> str | None

Query the service's CB state.

Parameters:

Name          Type  Default   Description
service_name  str   required  Service name

Returns:

Type        Description
str | None  CB state ("closed", "open", "half_open") or None

get_cb_info

get_cb_info(service_name: str) -> dict[str, Any] | None

Query the service's CB info (detailed).

Parameters:

Name          Type  Default   Description
service_name  str   required  Service name

Returns:

Type                   Description
dict[str, Any] | None  CB state info dictionary or None

should_record_as_cb_failure

should_record_as_cb_failure(
    service_name: str,
    rtt_ms: float,
    consecutive_critical_count: int = 3,
) -> bool

Determine whether to record as a CB failure based on RTT.

If the consecutive Critical count exceeds the threshold, record as a CB failure.
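A minimal sketch of the consecutive-Critical rule, assuming RTT severity is classified against the SLA thresholds (the 200 ms / 500 ms defaults are borrowed from ServiceThrottleConfig) and that any non-critical response resets the streak. The Severity member names and the tracker class are hypothetical. Whether the real bridge trips at or only above the threshold is not specified, so this sketch trips once the streak reaches it.

```python
from __future__ import annotations

from enum import Enum


class Severity(str, Enum):  # hypothetical members; the real RTTSeverity may differ
    NORMAL = "normal"
    WARNING = "warning"
    CRITICAL = "critical"


def classify(rtt_ms: float, sla_warning_ms: int = 200, sla_critical_ms: int = 500) -> Severity:
    if rtt_ms >= sla_critical_ms:
        return Severity.CRITICAL
    if rtt_ms >= sla_warning_ms:
        return Severity.WARNING
    return Severity.NORMAL


class CriticalStreakTracker:
    def __init__(self, consecutive_critical_count: int = 3) -> None:
        self.threshold = consecutive_critical_count
        self._streaks: dict[str, int] = {}

    def should_record_as_cb_failure(self, service_name: str, rtt_ms: float) -> bool:
        if classify(rtt_ms) is Severity.CRITICAL:
            self._streaks[service_name] = self._streaks.get(service_name, 0) + 1
        else:
            self._streaks[service_name] = 0  # a non-critical response breaks the streak
        return self._streaks[service_name] >= self.threshold


tracker = CriticalStreakTracker()
print([tracker.should_record_as_cb_failure("payments", 650.0) for _ in range(3)])
# [False, False, True]
```

Requiring a streak keeps a single slow outlier from counting against the circuit breaker.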

Parameters:

Name                        Type   Default   Description
service_name                str    required  Service name
rtt_ms                      float  required  Response time (ms)
consecutive_critical_count  int    3         Threshold for consecutive critical responses

Returns:

Type  Description
bool  Whether the response should be recorded as a CB failure

get_service_health

get_service_health(
    service_name: str,
) -> ServiceHealthMetrics

Query aggregated service health metrics.

Parameters:

Name          Type  Default   Description
service_name  str   required  Service name

Returns:

Type                  Description
ServiceHealthMetrics  Aggregated metrics

get_all_service_health

get_all_service_health() -> list[ServiceHealthMetrics]

Query health status of all services.

Returns:

Type                        Description
list[ServiceHealthMetrics]  List of per-service metrics

reset

reset() -> None

Reset the bridge (for testing).

ThrottleConfig

Bases: BaseSettings

Netflix Gradient-based adaptive throttle settings.

Dynamically adjusts request limits based on RTT gradient.

validate_max_limit classmethod

validate_max_limit(v: int, info) -> int

Ensure max_limit is greater than min_limit.

validate_sla_critical classmethod

validate_sla_critical(v: int, info) -> int

Ensure sla_critical_ms is greater than sla_warning_ms.
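The two cross-field checks can be sketched with a stdlib dataclass. The real ThrottleConfig is a pydantic BaseSettings model whose (v, info) validators would typically read the earlier field from info.data; the class below and its default values are hypothetical.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class ThrottleConfigSketch:  # hypothetical defaults, for illustration only
    initial_limit: int = 100
    min_limit: int = 10
    max_limit: int = 500
    sla_warning_ms: int = 200
    sla_critical_ms: int = 500

    def __post_init__(self) -> None:
        # Mirrors validate_max_limit: max_limit must be greater than min_limit.
        if self.max_limit <= self.min_limit:
            raise ValueError("max_limit must be greater than min_limit")
        # Mirrors validate_sla_critical: critical must be greater than warning.
        if self.sla_critical_ms <= self.sla_warning_ms:
            raise ValueError("sla_critical_ms must be greater than sla_warning_ms")


try:
    ThrottleConfigSketch(min_limit=50, max_limit=50)
except ValueError as exc:
    print(exc)  # max_limit must be greater than min_limit
```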

from_dict classmethod

from_dict(data: dict) -> ThrottleSettings

Create settings from dictionary (backward compat with ThrottleConfig).

from_settings classmethod

from_settings() -> ThrottleSettings

Create from current settings (backward compat with ThrottleConfig).

ThrottleDeniedRequest dataclass

ThrottleDeniedRequest(
    service_name: str,
    request_key: str,
    request_data: dict[str, Any],
    denied_at: datetime = (lambda: utc_now())(),
    reason: str = "rate_limit_exceeded",
    dlq_entry_id: str | None = None,
)

Information about a request denied by Throttle.

ThrottleDLQConfig dataclass

ThrottleDLQConfig(
    enabled: bool = True,
    dlq_domain: str = "throttle",
    auto_replay_on_cb_close: bool = True,
    replay_batch_size: int = 50,
    replay_interval_seconds: float = 5.0,
)

Throttle DLQ integration settings.

ThrottleDLQIntegration

ThrottleDLQIntegration(
    config: ThrottleDLQConfig | None = None,
)

Bases: EventEmitterMixin

Manages integration between Throttle and DLQ.

Stores throttle-denied requests in the DLQ and automatically replays them when a CB CLOSE event is received.

store_denied_request

store_denied_request(
    service_name: str,
    request_key: str,
    request_data: dict[str, Any],
    throttle_limit: int,
    current_count: int,
) -> ThrottleDeniedRequest | None

Store a Throttle-denied request in the DLQ.

Parameters:

Name            Type            Default   Description
service_name    str             required  Service name
request_key     str             required  Rate limit key (IP + user combination)
request_data    dict[str, Any]  required  Request data
throttle_limit  int             required  Current throttle limit
current_count   int             required  Current request count

Returns:

Type                          Description
ThrottleDeniedRequest | None  Stored request info, or None when disabled or on failure

replay_denied_requests

replay_denied_requests(
    service_name: str | None = None,
    batch_size: int | None = None,
) -> dict[str, Any]

Replay denied requests stored in the DLQ.

Parameters:

Name          Type        Default  Description
service_name  str | None  None     Replay only this service (None replays all services)
batch_size    int | None  None     Batch size

Returns:

Type            Description
dict[str, Any]  Replay result statistics

on_circuit_breaker_closed

on_circuit_breaker_closed(
    service_name: str,
) -> dict[str, Any]

CB CLOSE event handler.

When the CB closes, automatically replays the denied requests of that service.

Parameters:

Name          Type  Default   Description
service_name  str   required  Service name

Returns:

Type            Description
dict[str, Any]  Replay result

get_pending_count

get_pending_count(service_name: str) -> int

Query the per-service pending request count.

get_all_pending_counts

get_all_pending_counts() -> dict[str, int]

Query the pending request count for all services.

ThrottleDLQSink

ThrottleDLQSink(dlq_integration: Any | None = None)

Sink that stores Throttle-rejected requests in the DLQ.

Extracts the logic of AdaptiveThrottle's ThrottleDLQReplayMixin and _auto_store_rejection_to_dlq() into an independent component.

Fail-open: if the DLQ module is not installed or storage fails, the rejection is ignored.

Initialize.

Parameters:

Name             Type        Default  Description
dlq_integration  Any | None  None     ThrottleDLQIntegration instance. If None, the global instance is obtained via lazy import.

handle_rejection

handle_rejection(
    context: dict[str, Any], reason: str
) -> None

Store a rejected request in the DLQ.

Same behavior as AdaptiveThrottle._auto_store_rejection_to_dlq(). Extracts domain, order_id, user_id, etc. from context and records them in the DLQ.

Parameters:

Name     Type            Default   Description
context  dict[str, Any]  required  Request context (domain, order_id, tier_id, etc.)
reason   str             required  Rejection reason ("rate_limit_exceeded", etc.)

AdaptiveThrottleFacade

AdaptiveThrottleFacade(
    policy: Any,
    guards: list[Any] | None = None,
    limit_adjuster: Any | None = None,
    sinks: list[Any] | None = None,
)

Facade that preserves the legacy check()/record_response() API.

Internally delegates to ThrottlePolicy.

Usage example::

facade = AdaptiveThrottleFacade(
    policy=throttle_policy,
    guards=[governance_guard, full_stop_guard],
    sinks=[dlq_sink],
)
result = facade.check("user_123")
facade.record_response(rtt_ms=45.2)

Initialize.

Parameters:

Name            Type              Default   Description
policy          Any               required  ThrottlePolicy instance
guards          list[Any] | None  None      List of guards (ThrottleGovernanceGuard, FullStopGuard, etc.)
limit_adjuster  Any | None        None      ThrottleLimitAdjuster (EventBus-driven limit adjustment component)
sinks           list[Any] | None  None      List of components that handle rejections, such as ThrottleDLQSink

current_limit property writable

current_limit: int

Current rate limit (delegates to ThrottlePolicy).

check

check(
    key: str,
    tier_id: str = "standard",
    context: dict[str, Any] | None = None,
    store_rejection: bool = True,
) -> ThrottleResult

Identical signature to the original AdaptiveThrottle.check() API.

Execution order:
1. Check guards in order (return immediately if any guard rejects)
2. Run the rate limit check of the ThrottlePolicy internal engine
3. Hand rejections to the DLQ sinks
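The three-step order can be sketched with plain callables. This is an assumption-laden model, not the facade's code: guards and the policy return a Result stand-in for ThrottleResult, sinks are (context, reason) callables, and a guard rejection returns before the sinks run, following the "return immediately" wording of step 1.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class Result:  # hypothetical stand-in for ThrottleResult
    allowed: bool
    reason: str | None = None


Guard = Callable[[str], Result]
Sink = Callable[[dict, str], None]


def facade_check(
    key: str,
    guards: list[Guard],
    policy_check: Callable[[str], Result],
    sinks: list[Sink],
    context: dict[str, Any] | None = None,
    store_rejection: bool = True,
) -> Result:
    # 1. Guards run in order; the first rejection is returned as-is.
    for guard in guards:
        verdict = guard(key)
        if not verdict.allowed:
            return verdict
    # 2. The policy's rate-limit engine runs only if every guard passed.
    result = policy_check(key)
    # 3. Rejections are handed to the sinks (e.g. a DLQ sink).
    if not result.allowed and context is not None and store_rejection:
        for sink in sinks:
            sink(context, result.reason or "rate_limit_exceeded")
    return result


stored: list[tuple[str, str]] = []
outcome = facade_check(
    "user_123",
    guards=[lambda k: Result(True)],
    policy_check=lambda k: Result(False, "rate_limit_exceeded"),
    sinks=[lambda ctx, reason: stored.append((ctx["domain"], reason))],
    context={"domain": "orders"},
)
print(outcome.allowed, stored)  # False [('orders', 'rate_limit_exceeded')]
```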

Parameters:

Name             Type                   Default     Description
key              str                    required    Request identifier (user_id, IP address, etc.)
tier_id          str                    'standard'  Request tier ("critical"/"standard"/"non_essential")
context          dict[str, Any] | None  None        Request context for DLQ storage (domain, order_id, etc.)
store_rejection  bool                   True        Whether to store the request in the DLQ on rejection

Returns:

Type            Description
ThrottleResult  ThrottleResult: allow/reject decision plus adaptive info

record_response

record_response(rtt_ms: float) -> None

Record RTT + dynamically adjust limit based on Gradient.

Delegates to ThrottlePolicy, preserving the behavior of the original AdaptiveThrottle.record_response().

Parameters:

Name    Type   Default   Description
rtt_ms  float  required  Response time (milliseconds)

get_stats

get_stats() -> dict[str, Any]

Return Throttle statistics.

RTTSample dataclass

RTTSample(timestamp: float, rtt_ms: float)

Single RTT sample.

ThrottleLimitAdjuster

ThrottleLimitAdjuster()

Component that receives EventBus system-wide events and adjusts the ThrottlePolicy limit.

Consolidates the four groups of EventBus subscription logic separated from AdaptiveThrottle (rate limit, error budget, load shedding, kill switch):
- RATE_LIMIT_429 → decrease limit
- RATE_LIMIT_COOLDOWN_END → start Recovery Dampening
- ERROR_BUDGET_WARNING/CRITICAL → decrease limit
- ERROR_BUDGET_RECOVERED → start Recovery Dampening
- LOAD_SHEDDING_LEVEL_CHANGED → adjust limit
- KILL_SWITCH_ACTIVATED → Gradient Freeze
- KILL_SWITCH_DEACTIVATED → start Recovery Dampening
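The event-to-action mapping above can be sketched as a dispatch table. The event type strings come from the list; everything else is hypothetical: events are modeled as plain dicts, the 0.8 / 0.5 multipliers reuse the examples from adjust_limit_for_error_budget() but their pairing with WARNING / CRITICAL is assumed, and clearing gradient_frozen on KILL_SWITCH_DEACTIVATED is an assumption.

```python
from __future__ import annotations

from typing import Any, Callable


def build_handlers(policy: Any) -> dict[str, Callable[[dict], None]]:
    """Map EventBus event types to ThrottlePolicy adjustments (hypothetical wiring)."""

    def freeze(_event: dict) -> None:
        policy.gradient_frozen = True

    def unfreeze_and_recover(_event: dict) -> None:
        policy.gradient_frozen = False  # assumption: the freeze ends with the kill switch
        policy.start_recovery_dampening()

    return {
        "RATE_LIMIT_429": policy.reduce_limit_for_429,
        "RATE_LIMIT_COOLDOWN_END": lambda e: policy.start_recovery_dampening(),
        "ERROR_BUDGET_WARNING": lambda e: policy.adjust_limit_for_error_budget(0.8),
        "ERROR_BUDGET_CRITICAL": lambda e: policy.adjust_limit_for_error_budget(0.5),
        "ERROR_BUDGET_RECOVERED": lambda e: policy.start_recovery_dampening(),
        "LOAD_SHEDDING_LEVEL_CHANGED": lambda e: policy.adjust_limit_for_shedding(
            e["suggested_limit"]
        ),
        "KILL_SWITCH_ACTIVATED": freeze,
        "KILL_SWITCH_DEACTIVATED": unfreeze_and_recover,
    }


class RecordingPolicy:
    """Stub exposing the methods ThrottleLimitAdjuster.register() expects."""

    def __init__(self) -> None:
        self.calls: list[Any] = []
        self.gradient_frozen = False

    def reduce_limit_for_429(self, event: Any) -> None:
        self.calls.append(("429", event))

    def adjust_limit_for_error_budget(self, multiplier: float) -> None:
        self.calls.append(("error_budget", multiplier))

    def adjust_limit_for_shedding(self, suggested_limit: int) -> None:
        self.calls.append(("shedding", suggested_limit))

    def start_recovery_dampening(self, target_limit: int | None = None) -> None:
        self.calls.append(("recovery", target_limit))


policy = RecordingPolicy()
handlers = build_handlers(policy)
handlers["KILL_SWITCH_ACTIVATED"]({})
handlers["ERROR_BUDGET_CRITICAL"]({})
print(policy.gradient_frozen, policy.calls)  # True [('error_budget', 0.5)]
```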

Same lifecycle pattern as HedgingConfigUpdateHook (hedging.py): register(policy) → start() → begin EventBus subscription

register

register(policy: Any) -> None

Register a ThrottlePolicy as a limit adjustment target.

Parameters:

Name    Type  Default   Description
policy  Any   required  ThrottlePolicy instance. Must expose reduce_limit_for_429(), adjust_limit_for_error_budget(), adjust_limit_for_shedding(), start_recovery_dampening(), and the gradient_frozen setter.

start

start() -> None

Begin EventBus subscription.

Does nothing in an environment without EventBus (Fail-Open). Ignores duplicate calls.

close

close() -> None

Unsubscribe all EventBus handlers and release resources.

Idempotent: safe to call multiple times.

ThrottlePolicy

ThrottlePolicy(
    config: ThrottleConfig | None = None,
    gradient_calculator: Any | None = None,
    dampening_manager: Any | None = None,
)

Bases: ResiliencePolicy[T]

Pure Throttle Policy — wraps SlidingWindowThrottle + GradientCalculator.

Removes all 13 hardcoded external dependencies (Emergency, CB, ErrorBudget, KillSwitch, LoadShedding, DLQ, Prometheus, Audit, etc.) and performs only rate limit checks and gradient-based limit adjustment.

Consumers optionally register Guards/Hooks/Sinks via PolicyComposer.add_guard()/add_hook()/add_sink().

Usage example::

from baldur_pro.services.throttle import ThrottleConfig
from baldur_pro.services.throttle.policy import ThrottlePolicy

policy = ThrottlePolicy(config=ThrottleConfig(initial_limit=100))
result = policy.execute(call_api, order_id=123)  # call_api: the function being throttled

Initialize.

Parameters:

Name                 Type                   Default  Description
config               ThrottleConfig | None  None     Throttle settings (ThrottleSettings-compatible)
gradient_calculator  Any | None             None     GradientCalculator instance. If None, a default one is created with config.smoothing_factor.
dampening_manager    Any | None             None     RecoveryDampeningManager instance. If None, no dampening cap is applied (the gradient adjusts freely up to max_limit).

name property

name: str

Policy identifier.

current_limit property writable

current_limit: int

Current rate limit.

gradient_frozen property writable

gradient_frozen: bool

Whether Gradient adjustment is frozen.

execute

execute(
    func: Callable[..., T],
    *args: Any,
    context: PolicyContext | None = None,
    **kwargs: Any
) -> PolicyResult[T]

Run the function after a Throttle check.

When the rate limit is passed, executes the function, measures RTT, and automatically adjusts the limit based on the gradient.

Throttle key resolution order:
1. context.extra["throttle_key"] (per-request override)
2. config.service_name (per-policy-instance default)
3. "default" (final fallback)
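The resolution order can be sketched as a small function. PolicyContext is reduced here to a hypothetical dataclass with only an extra dict, and treating an empty string as "not set" is an assumption.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class ContextSketch:  # hypothetical stand-in for PolicyContext (only .extra modeled)
    extra: dict[str, Any] = field(default_factory=dict)


def resolve_throttle_key(context: ContextSketch | None, service_name: str | None) -> str:
    # 1. Per-request override
    if context is not None and context.extra.get("throttle_key"):
        return context.extra["throttle_key"]
    # 2. Per-policy-instance default
    if service_name:
        return service_name
    # 3. Final fallback
    return "default"


print(resolve_throttle_key(ContextSketch({"throttle_key": "user_42"}), "payments"))  # user_42
print(resolve_throttle_key(ContextSketch(), "payments"))  # payments
print(resolve_throttle_key(None, None))  # default
```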

Parameters:

Name      Type                  Default   Description
func      Callable[..., T]      required  Function to execute
*args     Any                   ()        Positional arguments for the function
context   PolicyContext | None  None      PolicyContext (throttle_key, tier_id, etc.)
**kwargs  Any                   {}        Keyword arguments for the function

Returns:

Type             Description
PolicyResult[T]  PolicyResult: the function's result when allowed, REJECTED when rejected

check

check(key: str) -> ThrottleResult

Pure rate limit check (used without execute()).

Used by AdaptiveThrottleFacade for legacy API compatibility.

Parameters:

Name  Type  Default   Description
key   str   required  Request identifier

Returns:

Type            Description
ThrottleResult  ThrottleResult: allow/reject decision

record_response

record_response(rtt_ms: float) -> None

Record RTT + dynamic gradient-based limit adjustment.

Used when executing the function directly outside execute(). Supports AdaptiveThrottleFacade's legacy record_response() API.

Parameters:

Name    Type   Default   Description
rtt_ms  float  required  Response time (milliseconds)

reduce_limit_for_429

reduce_limit_for_429(event: Any) -> None

Limit decrease triggered by a 429 Rate Limit event.

Called by ThrottleLimitAdjuster on receiving an EventBus RATE_LIMIT_429 event. Same decrease logic as AdaptiveThrottle._handle_rate_limit_429().

Parameters:

Name   Type  Default   Description
event  Any   required  EventBus event (reads data.reduction_percent)

adjust_limit_for_error_budget

adjust_limit_for_error_budget(multiplier: float) -> None

Limit adjustment triggered by an Error Budget event.

Called by ThrottleLimitAdjuster on receiving an ERROR_BUDGET_WARNING/CRITICAL event.

Parameters:

Name        Type   Default   Description
multiplier  float  required  Multiplier to apply (0.8 = 20% decrease, 0.5 = 50% decrease)

adjust_limit_for_shedding

adjust_limit_for_shedding(suggested_limit: int) -> None

Limit adjustment triggered by a Load Shedding event.

Called by ThrottleLimitAdjuster on receiving a LOAD_SHEDDING_LEVEL_CHANGED event.

Parameters:

Name             Type  Default   Description
suggested_limit  int   required  Suggested limit (applied only if lower than the current limit)
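The three adjustment methods reduce to simple arithmetic. The sketch below is hypothetical: it assumes reduction_percent is a whole-number percentage, that decreases are floored at min_limit, and that results are truncated to integers; none of these details is stated above.

```python
def reduce_for_429(current: int, reduction_percent: float, min_limit: int) -> int:
    # e.g. reduction_percent=25 cuts the limit by a quarter
    return max(min_limit, int(current * (1 - reduction_percent / 100)))


def adjust_for_error_budget(current: int, multiplier: float, min_limit: int) -> int:
    # multiplier 0.8 = 20% decrease, 0.5 = 50% decrease
    return max(min_limit, int(current * multiplier))


def adjust_for_shedding(current: int, suggested_limit: int) -> int:
    # Applied only if lower than the current limit, so shedding never raises it.
    return min(current, suggested_limit)


print(reduce_for_429(200, 25, min_limit=10))            # 150
print(adjust_for_error_budget(200, 0.8, min_limit=10))  # 160
print(adjust_for_shedding(200, 250))                    # 200
```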

start_recovery_dampening

start_recovery_dampening(
    target_limit: int | None = None,
) -> None

Start Recovery Dampening.

Called by ThrottleLimitAdjuster on receiving a KILL_SWITCH_DEACTIVATED or ERROR_BUDGET_RECOVERED event.

Parameters:

Name          Type        Default  Description
target_limit  int | None  None     Recovery target limit. If None, config.initial_limit is used.

RecoveryDampeningConfig dataclass

RecoveryDampeningConfig(
    enabled: bool = True,
    phase_1_ratio: float = 0.8,
    phase_2_ratio: float = 0.9,
    phase_1_duration_seconds: float = 30.0,
    phase_2_duration_seconds: float = 30.0,
)

Recovery Dampening settings.

RecoveryDampeningManager

RecoveryDampeningManager(
    config: RecoveryDampeningConfig | None = None,
    on_limit_change: (
        Callable[[str, int], None] | None
    ) = None,
)

Recovery Dampening manager.

Manages gradual recovery on CB CLOSE or emergency mode release. The Gradient algorithm keeps computing in the background; only application is deferred.
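A minimal sketch of the phased cap, assuming phase 2 starts when phase 1's duration elapses and that recovery completes after both durations. The defaults mirror RecoveryDampeningConfig; the function names, and the way a deferred gradient result takes over afterwards, are illustrative assumptions.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class DampeningSketchConfig:  # same defaults as RecoveryDampeningConfig
    phase_1_ratio: float = 0.8
    phase_2_ratio: float = 0.9
    phase_1_duration_seconds: float = 30.0
    phase_2_duration_seconds: float = 30.0


def phase_multiplier(elapsed: float, cfg: DampeningSketchConfig) -> float:
    if elapsed < cfg.phase_1_duration_seconds:
        return cfg.phase_1_ratio
    if elapsed < cfg.phase_1_duration_seconds + cfg.phase_2_duration_seconds:
        return cfg.phase_2_ratio
    return 1.0  # recovery complete


def dampened_limit(
    target_limit: int,
    elapsed: float,
    cfg: DampeningSketchConfig,
    pending_gradient_limit: int | None = None,
) -> int:
    multiplier = phase_multiplier(elapsed, cfg)
    if multiplier < 1.0:
        # The gradient keeps computing, but only the dampened cap is applied.
        return int(target_limit * multiplier)
    # Once recovery completes, the deferred gradient result (if any) is applied.
    return pending_gradient_limit if pending_gradient_limit is not None else target_limit


cfg = DampeningSketchConfig()
print([dampened_limit(200, t, cfg) for t in (0, 45, 61)])  # [160, 180, 200]
```

Stepping up in stages gives a just-recovered dependency time to absorb traffic before the full limit returns.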

Initialize.

Parameters:

Name             Type                               Default  Description
config           RecoveryDampeningConfig | None     None     Dampening settings
on_limit_change  Callable[[str, int], None] | None  None     Callback invoked on limit change (service_name, new_limit)

start_recovery

start_recovery(
    service_name: str, target_limit: int, current_limit: int
) -> int

Start recovery and return the Phase 1 limit.

Parameters:

Name           Type  Default   Description
service_name   str   required  Service name
target_limit   int   required  Final target limit
current_limit  int   required  Current limit (before recovery)

Returns:

Type  Description
int   Phase 1 limit (80% of the target)

cancel_recovery

cancel_recovery(service_name: str) -> None

Cancel recovery (on a new failure).

is_recovery_active

is_recovery_active(service_name: str) -> bool

Check whether recovery is in progress.

get_current_multiplier

get_current_multiplier(service_name: str) -> float

Return the multiplier of the current recovery phase.

store_pending_gradient_limit

store_pending_gradient_limit(
    service_name: str, gradient_limit: int
) -> None

Store the Gradient calculation result (deferred application).

During recovery the Gradient algorithm keeps computing, but the limit is applied only after recovery completes.

Parameters:

Name            Type  Default   Description
service_name    str   required  Service name
gradient_limit  int   required  Gradient-computed limit

get_recovery_state

get_recovery_state(
    service_name: str,
) -> dict[str, Any] | None

Query the service recovery state.

get_all_recovery_states

get_all_recovery_states() -> list[dict[str, Any]]

Query all recovery states.

reset

reset() -> None

Reset all state (for testing).

RecoveryPhase

Bases: str, Enum

Recovery phase.

ServiceRecoveryState dataclass

ServiceRecoveryState(
    service_name: str,
    target_limit: int,
    current_phase: RecoveryPhase = RecoveryPhase.PHASE_1,
    phase_started_at: float = time.time(),
    is_active: bool = True,
    pending_gradient_limit: int | None = None,
)

Per-service recovery state.

RedisThrottleLimitManager

RedisThrottleLimitManager(
    redis_client: Any,
    key_prefix: str = "baldur:",
    default_ttl_seconds: int = 3600,
)

Redis-based Throttle Limit manager.

Uses Lua scripts to perform atomic limit updates.

Initialize.

Parameters:

Name                 Type  Default    Description
redis_client         Any   required   Redis client
key_prefix           str   'baldur:'  Redis key prefix
default_ttl_seconds  int   3600       Default TTL (seconds)

update_limit_atomic

update_limit_atomic(
    service_name: str,
    new_limit: int,
    min_limit: int,
    max_limit: int,
    save_as_safe: bool = False,
) -> tuple[int, int]

Atomic limit update.

Parameters:

Name          Type  Default   Description
service_name  str   required  Service name
new_limit     int   required  New limit
min_limit     int   required  Minimum limit
max_limit     int   required  Maximum limit
save_as_safe  bool  False     Whether to save the value as the last safe limit

Returns:

Type             Description
tuple[int, int]  (previous limit, new limit)

update_limit_cas

update_limit_cas(
    service_name: str,
    expected_current: int,
    new_limit: int,
    min_limit: int,
    max_limit: int,
) -> tuple[bool, int, str]

Conditional limit update (Compare-And-Swap).

Updates only when the current value matches expected_current.
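A single-process sketch of the CAS semantics. The real manager runs this logic as a Lua script inside Redis, which executes atomically; here a threading.Lock plays that role and a dict stands in for the Redis keys. The class name, the clamping to [min_limit, max_limit], the missing-key handling, and the message strings are assumptions.

```python
from __future__ import annotations

import threading


class InMemoryLimitStore:
    """Hypothetical stand-in for RedisThrottleLimitManager's CAS path."""

    def __init__(self) -> None:
        self._limits: dict[str, int] = {}
        self._lock = threading.Lock()  # plays the role of Lua's atomic execution

    def set_limit(self, service_name: str, limit: int) -> None:
        with self._lock:
            self._limits[service_name] = limit

    def update_limit_cas(
        self,
        service_name: str,
        expected_current: int,
        new_limit: int,
        min_limit: int,
        max_limit: int,
    ) -> tuple[bool, int, str]:
        with self._lock:
            current = self._limits.get(service_name)
            if current is None:
                return False, 0, "no current limit"
            if current != expected_current:
                # Another writer won the race; report what it wrote.
                return False, current, "conflict"
            clamped = max(min_limit, min(max_limit, new_limit))
            self._limits[service_name] = clamped
            return True, clamped, "ok"


store = InMemoryLimitStore()
store.set_limit("payments", 100)
print(store.update_limit_cas("payments", 100, 150, 10, 500))  # (True, 150, 'ok')
print(store.update_limit_cas("payments", 100, 120, 10, 500))  # (False, 150, 'conflict')
```

On a conflict, the caller can take the returned actual value as the new expected_current and retry.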

Parameters:

Name              Type  Default   Description
service_name      str   required  Service name
expected_current  int   required  Expected current value
new_limit         int   required  New limit
min_limit         int   required  Minimum limit
max_limit         int   required  Maximum limit

Returns:

Type                   Description
tuple[bool, int, str]  (success, actual current value, message)

load_safe_limit

load_safe_limit(
    service_name: str,
    default_limit: int,
    max_age_seconds: int = 3600,
) -> tuple[int, str]

Load the last safe limit on cold start.

Uses the last safe limit when there is no current limit.
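The cold-start source resolution can be sketched as a pure function. The function and argument names are hypothetical, and the sketch assumes the safe limit's age is measured from the time it was saved.

```python
from __future__ import annotations

import time


def resolve_cold_start_limit(
    current: int | None,
    safe: int | None,
    safe_saved_at: float | None,
    default_limit: int,
    max_age_seconds: int = 3600,
    now: float | None = None,
) -> tuple[int, str]:
    now = time.time() if now is None else now
    if current is not None:
        return current, "CURRENT"
    # A safe limit is used only if it is fresh enough.
    if safe is not None and safe_saved_at is not None and now - safe_saved_at <= max_age_seconds:
        return safe, "SAFE"
    return default_limit, "DEFAULT"


print(resolve_cold_start_limit(None, 80, safe_saved_at=1_000.0, default_limit=50, now=2_000.0))
# (80, 'SAFE')
```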

Parameters:

Name             Type  Default   Description
service_name     str   required  Service name
default_limit    int   required  Default limit
max_age_seconds  int   3600      Maximum age (seconds) for the safe limit to remain valid

Returns:

Type             Description
tuple[int, str]  (limit value, source: "CURRENT", "SAFE", or "DEFAULT")

save_safe_limit

save_safe_limit(service_name: str, limit: int) -> bool

Save the last safe limit.

Parameters:

Name          Type  Default   Description
service_name  str   required  Service name
limit         int   required  Limit to save

Returns:

Type  Description
bool  Whether the save succeeded

get_current_limit

get_current_limit(service_name: str) -> int | None

Query the current limit.

add_rtt_sample

add_rtt_sample(
    service_name: str,
    rtt_ms: float,
    window_seconds: float = 60.0,
    max_samples: int = 100,
) -> int

Add an RTT sample.

Parameters:

Name            Type   Default   Description
service_name    str    required  Service name
rtt_ms          float  required  RTT (ms)
window_seconds  float  60.0      Sample retention time (seconds)
max_samples     int    100       Maximum number of samples

Returns:

Type  Description
int   Current number of samples

get_rtt_samples

get_rtt_samples(
    service_name: str, window_seconds: float = 60.0
) -> list[float]

Query recent RTT samples.

Parameters:

Name            Type   Default   Description
service_name    str    required  Service name
window_seconds  float  60.0      Query window (seconds)

Returns:

Type         Description
list[float]  List of RTT values

ThrottleLuaScripts

Lua scripts for Throttle Limit management.

All scripts execute atomically to prevent race conditions.

ServiceThrottleConfig dataclass

ServiceThrottleConfig(
    service_name: str,
    initial_limit: int = 100,
    min_limit: int = 10,
    max_limit: int = 1000,
    sla_warning_ms: int = 200,
    sla_critical_ms: int = 500,
)

Per-service Throttle configuration.

to_throttle_config

to_throttle_config() -> ThrottleConfig

Convert to ThrottleConfig.

ServiceThrottleState dataclass

ServiceThrottleState(
    service_name: str, throttle: AdaptiveThrottle
)

Per-service Throttle state info.

ThrottleRegistry

Per-service Throttle instance management registry.

Manages independent AdaptiveThrottle per external service. CB state changes are handled by CircuitBreakerHandlerMixin within each AdaptiveThrottle instance.

Implemented as a thread-safe singleton.
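A minimal sketch of a thread-safe get-or-create singleton using double-checked locking. The class name and the factory argument are hypothetical; the real get_throttle() builds an AdaptiveThrottle from the registered ServiceThrottleConfig instead.

```python
from __future__ import annotations

import threading
from typing import Any, Callable


class RegistrySketch:
    _instance: RegistrySketch | None = None
    _instance_lock = threading.Lock()

    def __new__(cls) -> RegistrySketch:
        if cls._instance is None:
            with cls._instance_lock:
                if cls._instance is None:  # re-check under the lock
                    instance = super().__new__(cls)
                    instance._throttles = {}
                    instance._lock = threading.Lock()
                    cls._instance = instance
        return cls._instance

    def get_throttle(self, service_name: str, factory: Callable[[str], Any]) -> Any:
        with self._lock:
            if service_name not in self._throttles:
                self._throttles[service_name] = factory(service_name)
            return self._throttles[service_name]

    @classmethod
    def reset_instance(cls) -> None:
        with cls._instance_lock:
            cls._instance = None


registry = RegistrySketch()
first = registry.get_throttle("payments", lambda name: object())
print(RegistrySketch() is registry, registry.get_throttle("payments", lambda name: object()) is first)
# True True
```

The unlocked first check keeps the common path cheap; the second check under the lock prevents two threads from both creating an instance.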

reset_instance classmethod

reset_instance() -> None

Reset singleton instance for test isolation.

register_service_config

register_service_config(
    config: ServiceThrottleConfig,
) -> None

Register per-service Throttle configuration.

Parameters:

Name    Type                   Default   Description
config  ServiceThrottleConfig  required  Per-service configuration

get_throttle

get_throttle(service_name: str) -> AdaptiveThrottle

Get per-service Throttle instance (creates if absent).

Parameters:

Name          Type  Default   Description
service_name  str   required  Service name

Returns:

Type              Description
AdaptiveThrottle  AdaptiveThrottle instance for the service

get_service_state

get_service_state(service_name: str) -> dict | None

Get current Throttle state for a service.

Parameters:

Name          Type  Default   Description
service_name  str   required  Service name

Returns:

Type         Description
dict | None  State info dict, or None

get_all_service_states

get_all_service_states() -> list[dict]

Get Throttle state for all services.

Returns:

Type        Description
list[dict]  List of state info dicts

reset

reset() -> None

Reset registry (for testing).

RedisConnectionState

Bases: str, Enum

Redis connection state.

SafeOpenConfig dataclass

SafeOpenConfig(
    enabled: bool = True,
    health_check_interval_seconds: float = 5.0,
    failure_threshold: int = 3,
    safe_limit_save_interval_seconds: float = 60.0,
    safe_limit_max_age_seconds: int = 3600,
    default_fallback_limit: int = 50,
)

Safe-Open fallback settings.

SafeOpenFallbackManager

SafeOpenFallbackManager(
    config: SafeOpenConfig | None = None,
    redis_client: Any = None,
)

Safe-Open fallback manager.

Maintains service continuity on Redis failure by using the last known safe limit.

Implemented following the LocalMemoryRateLimiter pattern.

Initialize.

Parameters:

config (SafeOpenConfig | None, default None): Safe-Open settings.
redis_client (Any, default None): Redis client (optional).

set_redis_client

set_redis_client(redis_client: Any) -> None

Set the Redis client.

check_redis_health

check_redis_health() -> bool

Check the Redis connection state.
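SafeOpenConfig.failure_threshold (default 3) suggests that Redis is marked unhealthy only after several consecutive failed checks, so one transient error does not trigger the fallback. The sketch below assumes exactly that rule. DemoRedisState, DemoHealthTracker, and the injected ping callable are hypothetical; CONNECTED appears in the docs, but the DISCONNECTED member name and the exact transition rules of the real manager are assumptions.

```python
from enum import Enum
from typing import Callable


class DemoRedisState(str, Enum):
    # CONNECTED appears in the docs; DISCONNECTED is an assumed member name.
    CONNECTED = "connected"
    DISCONNECTED = "disconnected"


class DemoHealthTracker:
    """Hypothetical sketch: flip to DISCONNECTED after N consecutive failed pings."""

    def __init__(self, ping: Callable[[], bool], failure_threshold: int = 3) -> None:
        self._ping = ping
        self._failure_threshold = failure_threshold
        self.consecutive_failures = 0
        self.state = DemoRedisState.CONNECTED

    def check_health(self) -> bool:
        try:
            ok = bool(self._ping())
        except Exception:
            ok = False  # Treat any client error as a failed health check.
        if ok:
            # A single success restores the connected state.
            self.consecutive_failures = 0
            self.state = DemoRedisState.CONNECTED
        else:
            self.consecutive_failures += 1
            if self.consecutive_failures >= self._failure_threshold:
                self.state = DemoRedisState.DISCONNECTED
        return self.state is DemoRedisState.CONNECTED
```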

get_or_create_service_state

get_or_create_service_state(
    service_name: str, initial_limit: int
) -> ServiceSafeLimitState

Get or create per-service state.

update_safe_limit

update_safe_limit(
    service_name: str, limit: int, force_save: bool = False
) -> bool

Update the safe limit.

Saves to Redis periodically or on a significant change.

Parameters:

service_name (str, required): Service name.
limit (int, required): New limit.
force_save (bool, default False): Whether to force a save.

Returns:

bool: Whether the Redis save succeeded.
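The "periodically or on a significant change" save policy can be sketched as below. This assumes a save is attempted when force_save is set, when no value has been saved yet, when safe_limit_save_interval_seconds (default 60) has elapsed, or when the limit moved by at least a fixed fraction of the last saved value. The 20% threshold, DemoSafeLimitSaver, and the injected save and clock callables are hypothetical; the docs do not say what counts as "significant".

```python
import time
from typing import Callable


class DemoSafeLimitSaver:
    """Hypothetical sketch of 'save periodically or on a significant change'."""

    def __init__(
        self,
        save: Callable[[str, int], bool],
        save_interval_seconds: float = 60.0,
        significant_change_ratio: float = 0.2,  # Assumed threshold, not from the docs.
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._save = save
        self._interval = save_interval_seconds
        self._ratio = significant_change_ratio
        self._clock = clock
        self._saved: dict[str, tuple[int, float]] = {}  # service -> (limit, saved_at)

    def update_safe_limit(self, service_name: str, limit: int, force_save: bool = False) -> bool:
        now = self._clock()
        previous = self._saved.get(service_name)
        if previous is None or force_save:
            should_save = True
        else:
            last_limit, saved_at = previous
            interval_elapsed = now - saved_at >= self._interval
            changed = abs(limit - last_limit) >= self._ratio * max(last_limit, 1)
            should_save = interval_elapsed or changed
        if not should_save:
            return False  # No save attempted, so nothing succeeded.
        ok = self._save(service_name, limit)
        if ok:
            self._saved[service_name] = (limit, now)
        return ok
```

This sketch models only the save decision; it does not keep the separate local cache that the real manager uses as a fallback.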

get_safe_limit

get_safe_limit(
    service_name: str, default_limit: int
) -> tuple[int, str]

Get the limit currently in effect.

Reads it from Redis when Redis is healthy, otherwise from the local cache.

Parameters:

service_name (str, required): Service name.
default_limit (int, required): Default limit.

Returns:

tuple[int, str]: (limit value, source), where source is "redis", "local_cache", or "default".
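The three documented sources suggest a fallback order of Redis, then the local cache, then the caller's default. A minimal sketch of that order follows. Two points are assumptions: that a healthy Redis without a stored value falls through to the cache, and that a cached limit older than safe_limit_max_age_seconds (default 3600) is treated as stale. resolve_safe_limit and its parameters are hypothetical.

```python
import time
from typing import Callable, Optional


def resolve_safe_limit(
    redis_healthy: bool,
    read_redis: Callable[[], Optional[int]],
    cached: Optional[tuple[int, float]],  # (limit, saved_at) from the local cache
    default_limit: int,
    max_age_seconds: float = 3600.0,
    now: Optional[float] = None,
) -> tuple[int, str]:
    """Hypothetical sketch of the redis -> local_cache -> default fallback order."""
    if redis_healthy:
        value = read_redis()
        if value is not None:
            return value, "redis"
    if cached is not None:
        limit, saved_at = cached
        current = time.time() if now is None else now
        # Assumption: a cached limit older than max_age_seconds is treated as stale.
        if current - saved_at <= max_age_seconds:
            return limit, "local_cache"
    return default_limit, "default"
```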

get_redis_state

get_redis_state() -> RedisConnectionState

Query the global Redis state.

get_service_state

get_service_state(
    service_name: str,
) -> dict[str, Any] | None

Query the per-service state.

get_all_service_states

get_all_service_states() -> list[dict[str, Any]]

Query all service states.

force_disconnect

force_disconnect() -> None

Simulate a forced disconnection (for testing).

force_connect

force_connect() -> None

Simulate a forced reconnection (for testing).

reset

reset() -> None

Reset all state (for testing).

ServiceSafeLimitState dataclass

ServiceSafeLimitState(
    service_name: str,
    last_known_safe_limit: int,
    last_saved_at: float = time.time(),
    redis_state: RedisConnectionState = RedisConnectionState.CONNECTED,
    consecutive_failures: int = 0,
)

Per-service Safe Limit state.

BucketData

Bases: NamedTuple

Aggregated data of a single bucket.

RTTWindowStats dataclass

RTTWindowStats(
    avg_rtt_ms: float | None,
    min_rtt_ms: float | None,
    max_rtt_ms: float | None,
    total_samples: int,
    bucket_count: int,
    window_seconds: int,
    oldest_bucket_age_seconds: float | None,
)

RTT window statistics.

TimeBucketedGradientCalculator

TimeBucketedGradientCalculator(
    window_seconds: int = 10,
    min_samples_for_gradient: int = 3,
)

Gradient calculator based on time buckets.

Computes a time-based RTT gradient using TimeBucketedRTTWindow. Uses bucket-based averages instead of the EMA approach of the existing GradientCalculator.

Gradient calculation:
- Compares the average of the recent half of the window with the average of the previous half
- gradient > 0: RTT increasing (limit decrease needed)
- gradient < 0: RTT decreasing (limit increase possible)

Parameters:

window_seconds (int, default 10): Window size (seconds).
min_samples_for_gradient (int, default 3): Minimum number of samples needed for gradient calculation.

add_sample

add_sample(rtt_ms: float) -> None

Add an RTT sample.

get_gradient

get_gradient() -> float

Compute the current RTT Gradient.

Compares the averages of the recent-half window and the previous-half window.

Returns:

float: Gradient value:
- gradient > 0: RTT increasing (limit decrease needed)
- gradient < 0: RTT decreasing (limit increase possible)
- gradient = 0: stable or insufficient data
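The half-window comparison can be sketched as a relative change between the two halves. Dividing by the older half's average is an assumption: the docs fix only the sign convention. Also, this sketch counts buckets where the real min_samples_for_gradient counts raw samples. half_window_gradient is hypothetical.

```python
from typing import Sequence


def half_window_gradient(bucket_avgs: Sequence[float], min_buckets: int = 3) -> float:
    """Relative change of the recent half of the window versus the older half.

    bucket_avgs holds per-second average RTTs in ms, oldest first.
    """
    # Need at least two buckets so that both halves are non-empty.
    if len(bucket_avgs) < max(min_buckets, 2):
        return 0.0  # Insufficient data: report "stable".
    mid = len(bucket_avgs) // 2
    older, recent = bucket_avgs[:mid], bucket_avgs[mid:]
    older_avg = sum(older) / len(older)
    recent_avg = sum(recent) / len(recent)
    if older_avg <= 0:
        return 0.0
    # > 0: RTT rising (decrease the limit); < 0: RTT falling (room to increase).
    return (recent_avg - older_avg) / older_avg
```

For example, per-second averages of 100, 100, 150, 150 ms give a gradient of 0.5, signalling that the limit should shrink.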

get_current_rtt

get_current_rtt() -> float | None

Return the current average RTT.

get_stats

get_stats() -> dict

Return statistics info.

reset

reset() -> None

Reset state.

TimeBucketedRTTWindow

TimeBucketedRTTWindow(window_seconds: int = 10)

Time-based RTT window.

Aggregates RTT into one bucket per second to provide memory-efficient time-based statistics. Uses array.array('f') to minimize memory usage.

Characteristics:
- Fixed memory: window_seconds × 16 bytes (sum, count, min, max per bucket)
- TPS-independent: summarizes once per second, so the window covers a constant time range regardless of TPS
- Circular buffer: old buckets are automatically overwritten

Parameters:

window_seconds (int, default 10): Window size (seconds).
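A per-second circular window along these lines can be sketched with four array('f') buffers, which gives the documented 16 bytes of statistics per bucket. The sketch also keeps an extra per-slot record of which second owns the slot so that stale buckets can be detected. That array is an assumption, since the docs list only sum, count, min, and max. DemoBucketedRTTWindow and its injectable clock are hypothetical.

```python
import time
from array import array
from typing import Callable, Optional


class DemoBucketedRTTWindow:
    """Hypothetical sketch: per-second RTT buckets in a circular array('f') buffer."""

    def __init__(self, window_seconds: int = 10, clock: Callable[[], float] = time.time) -> None:
        self.window_seconds = window_seconds
        self._clock = clock
        # Four float32 arrays: 4 stats x 4 bytes = 16 bytes per bucket.
        self._sum = array("f", [0.0] * window_seconds)
        self._count = array("f", [0.0] * window_seconds)
        self._min = array("f", [0.0] * window_seconds)
        self._max = array("f", [0.0] * window_seconds)
        # Assumption: remember which epoch second owns each slot (-1 = never used).
        self._second = array("q", [-1] * window_seconds)

    def add_sample(self, rtt_ms: float) -> None:
        second = int(self._clock())
        idx = second % self.window_seconds
        if self._second[idx] != second:
            # Slot is empty or belongs to an older second: overwrite it.
            self._second[idx] = second
            self._sum[idx] = 0.0
            self._count[idx] = 0.0
            self._min[idx] = rtt_ms
            self._max[idx] = rtt_ms
        # Samples from the same second accumulate in the same bucket.
        self._sum[idx] += rtt_ms
        self._count[idx] += 1.0
        self._min[idx] = min(self._min[idx], rtt_ms)
        self._max[idx] = max(self._max[idx], rtt_ms)

    def get_avg_rtt(self) -> Optional[float]:
        now = int(self._clock())
        total = 0.0
        count = 0.0
        for idx in range(self.window_seconds):
            owner = self._second[idx]
            # Only buckets written within the last window_seconds seconds count.
            if owner >= 0 and now - owner < self.window_seconds:
                total += self._sum[idx]
                count += self._count[idx]
        return total / count if count else None

    def memory_usage_bytes(self) -> int:
        # Stats arrays only, matching "window_seconds x 16 bytes".
        return 4 * self.window_seconds * self._sum.itemsize
```

With the default 10-second window, the statistics occupy 10 × 16 = 160 bytes regardless of request rate.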

add_sample

add_sample(rtt_ms: float) -> None

Add an RTT sample.

Multiple samples arriving in the same second accumulate in that bucket.

Parameters:

rtt_ms (float, required): RTT value (milliseconds).

get_stats

get_stats() -> RTTWindowStats

Compute RTT statistics within the window.

Returns:

RTTWindowStats: Average, min, and max RTT and sample info.

get_avg_rtt

get_avg_rtt() -> float | None

Return the average RTT of the current window.

get_bucket_data

get_bucket_data(bucket_idx: int) -> BucketData | None

Query the data of a specific bucket (for debugging).

Parameters:

bucket_idx (int, required): Bucket index (0 to window_seconds - 1).

Returns:

BucketData | None: Bucket data, or None if the bucket is invalid.

get_all_bucket_data

get_all_bucket_data() -> list[dict]

Query the data of all valid buckets (for debugging).

Returns:

list[dict]: List of valid bucket data.

reset

reset() -> None

Reset the window.

memory_usage_bytes

memory_usage_bytes() -> int

Compute memory usage (bytes).

Returns:

int: Approximate memory usage.

get_throttle_cb_bridge

get_throttle_cb_bridge(
    sla_warning_ms: int | None = None,
    sla_critical_ms: int | None = None,
) -> ThrottleCircuitBreakerBridge

Get the global ThrottleCircuitBreakerBridge instance.

Thread-safe singleton pattern.

Parameters:

sla_warning_ms (int | None, default None): SLA Warning threshold (applied only on initial creation).
sla_critical_ms (int | None, default None): SLA Critical threshold (applied only on initial creation).

Returns:

ThrottleCircuitBreakerBridge: ThrottleCircuitBreakerBridge instance.
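Because the SLA thresholds only take effect on initial creation, later calls with different values silently return the existing bridge. The sketch below shows that behavior, assuming a lock-guarded module-level singleton. DemoBridge, get_demo_bridge, and reset_demo_bridge are hypothetical. The fallback thresholds of 200 ms and 500 ms are borrowed from the ServiceThrottleConfig defaults, not taken from the real bridge.

```python
import threading
from typing import Optional


class DemoBridge:
    """Hypothetical stand-in for ThrottleCircuitBreakerBridge."""

    def __init__(self, sla_warning_ms: int, sla_critical_ms: int) -> None:
        self.sla_warning_ms = sla_warning_ms
        self.sla_critical_ms = sla_critical_ms


_bridge: Optional[DemoBridge] = None
_bridge_lock = threading.Lock()


def get_demo_bridge(
    sla_warning_ms: Optional[int] = None,
    sla_critical_ms: Optional[int] = None,
) -> DemoBridge:
    global _bridge
    with _bridge_lock:
        if _bridge is None:
            # Arguments matter only here, on first creation; fallbacks are assumed.
            _bridge = DemoBridge(
                sla_warning_ms if sla_warning_ms is not None else 200,
                sla_critical_ms if sla_critical_ms is not None else 500,
            )
        return _bridge


def reset_demo_bridge() -> None:
    # Test helper: the next get_demo_bridge() call builds a fresh instance.
    global _bridge
    with _bridge_lock:
        _bridge = None
```

In tests, reset the bridge before passing new thresholds; otherwise the values from the first caller remain in effect.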

reset_throttle_cb_bridge

reset_throttle_cb_bridge() -> None

Reset the bridge (for testing).

get_throttle_dlq_integration

get_throttle_dlq_integration(
    config: ThrottleDLQConfig | None = None,
) -> ThrottleDLQIntegration

Return the global ThrottleDLQIntegration instance.

reset_throttle_dlq_integration

reset_throttle_dlq_integration() -> None

Reset for testing.

get_gradient_calculator

get_gradient_calculator(
    name: str = "default",
    smoothing_factor: float = _DEFAULT_SMOOTHING_FACTOR,
    sample_window_seconds: float = 10.0,
) -> GradientCalculator

Return a named GradientCalculator singleton.

Uses the double-checked locking pattern: the lock is acquired only when an instance is first created, and subsequent reads are lock-free.
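Double-checked locking for named instances can be sketched as follows. The first, unlocked lookup serves the common case. The second lookup, under the lock, stops two threads that both missed from creating separate instances. This relies on a single dict lookup being thread-safe in CPython. DemoCalculator, get_demo_calculator, and the 0.2 default are hypothetical; the real default is _DEFAULT_SMOOTHING_FACTOR, whose value is not given here.

```python
import threading
from typing import Dict


class DemoCalculator:
    """Hypothetical stand-in for GradientCalculator."""

    def __init__(self, name: str, smoothing_factor: float) -> None:
        self.name = name
        self.smoothing_factor = smoothing_factor


_calculators: Dict[str, DemoCalculator] = {}
_calculators_lock = threading.Lock()


def get_demo_calculator(name: str = "default", smoothing_factor: float = 0.2) -> DemoCalculator:
    # First check, without the lock: the fast path once the instance exists.
    calc = _calculators.get(name)
    if calc is not None:
        return calc
    with _calculators_lock:
        # Second check: another thread may have created it while we waited.
        calc = _calculators.get(name)
        if calc is None:
            calc = DemoCalculator(name, smoothing_factor)
            _calculators[name] = calc
        return calc


def reset_demo_calculators() -> None:
    # Test helper: drop every named instance.
    with _calculators_lock:
        _calculators.clear()
```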

Parameters:

name (str, default 'default'): Calculator name (service/endpoint identifier).
smoothing_factor (float, default _DEFAULT_SMOOTHING_FACTOR): EMA weight.
sample_window_seconds (float, default 10.0): Sample window (seconds).

Returns:

GradientCalculator: GradientCalculator instance.
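smoothing_factor is documented only as the EMA weight. The sketch below assumes the textbook exponential moving average, where the weight applies to the newest sample, to show how the factor shapes the smoothed RTT. The ema function is hypothetical, and GradientCalculator may apply the weight the other way around.

```python
from typing import Iterable, Optional


def ema(samples: Iterable[float], smoothing_factor: float) -> Optional[float]:
    """Exponential moving average; smoothing_factor weights each new sample."""
    if not 0.0 < smoothing_factor <= 1.0:
        raise ValueError("smoothing_factor must be in (0, 1]")
    value: Optional[float] = None
    for x in samples:
        # Seed with the first sample, then blend: new = a * x + (1 - a) * old.
        value = x if value is None else smoothing_factor * x + (1.0 - smoothing_factor) * value
    return value
```

Under this convention, a higher factor tracks RTT changes faster, and a lower factor smooths out short spikes.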

reset_gradient_calculators

reset_gradient_calculators() -> None

Reset for testing. Removes all named GradientCalculator instances.

get_throttle_limit_adjuster

get_throttle_limit_adjuster() -> ThrottleLimitAdjuster

Return the ThrottleLimitAdjuster singleton.

reset_throttle_limit_adjuster

reset_throttle_limit_adjuster() -> None

Reset the singleton (calls close() before clearing).

get_recovery_dampening_manager

get_recovery_dampening_manager(
    config: RecoveryDampeningConfig | None = None,
    on_limit_change: (
        Callable[[str, int], None] | None
    ) = None,
) -> RecoveryDampeningManager

Return the global RecoveryDampeningManager instance.

reset_recovery_dampening_manager

reset_recovery_dampening_manager() -> None

Reset for testing.

get_throttle_registry

get_throttle_registry() -> ThrottleRegistry

Get global ThrottleRegistry instance.

Thread-safe singleton pattern.

Returns:

ThrottleRegistry: ThrottleRegistry instance.

reset_throttle_registry

reset_throttle_registry() -> None

Reset registry (for testing).

get_safe_open_fallback_manager

get_safe_open_fallback_manager(
    config: SafeOpenConfig | None = None,
    redis_client: Any = None,
) -> SafeOpenFallbackManager

Return the global SafeOpenFallbackManager instance.

reset_safe_open_fallback_manager

reset_safe_open_fallback_manager() -> None

Reset for testing.