baldur_pro.services.throttle — Adaptive Throttle
Adaptive request throttling: ThrottleConfig, AdaptiveThrottle, and the
ThrottlePolicy surface.
🔒 PRO Feature — requires a baldur-pro license
These symbols ship in the baldur-pro distribution. PRO modules import
normally — there is no ImportError. PRO features activate only when
baldur.init() runs with a valid BALDUR_LICENSE_KEY; without it the system
runs with OSS defaults and register_pro_services() logs
entitlement.pro_registration_skipped.
throttle
Throttle Services for Baldur System.
Provides framework-agnostic throttling with:

- Base throttle logic (sliding window, token bucket)
- Netflix Gradient Adaptive Throttling
- DRF adapter for Django integration
Usage
    from baldur_pro.services.throttle import AdaptiveThrottle, ThrottleConfig

    throttle = AdaptiveThrottle(
        config=ThrottleConfig(
            initial_limit=100,
            min_limit=10,
            max_limit=500,
        )
    )

    # Check if request is allowed (check() returns a ThrottleResult)
    result = throttle.check("user_123")

    # Record response time for gradient calculation
    throttle.record_response(rtt_ms=45.2)
Status: Public
AdaptiveThrottle
AdaptiveThrottle(config: ThrottleConfig | None = None)
Bases: CircuitBreakerHandlerMixin, RateLimitHandlerMixin, GovernanceEventMixin, ErrorBudgetHandlerMixin, EmergencyModeMixin, FullStopMixin, RecoveryDampeningMixin, ThrottleDLQReplayMixin, SlidingWindowThrottle
Netflix Gradient-based Adaptive Throttle.
Dynamically adjusts rate limits based on response time trends. Extends SlidingWindowThrottle with gradient-based limit adjustment. Includes DLQ integration for throttle rejection storage and recovery replay.
Emergency Mode integration:

- Automatically adjusts limit based on Emergency Level
- At LEVEL_3, gradient calculation continues but application is frozen
- Emergency multiplier applied as Hard-Cap

Governance integration:

- Lazy import of check_all_governance for Kill Switch / Emergency / Error Budget / Break Glass checks (Fail-Open when ENT absent)
- Kill Switch EventBus subscription for immediate gradient freeze
- Break Glass activation releases Full Stop and starts Recovery Dampening
- check_all_governance() Safety Net check in _maybe_adjust_limit() Control Plane
conservative_limit
property
conservative_limit: int
Conservative limit with Min-Winner policy.
Returns the lowest among RTT-based limit, 429-based limit, and Error Budget-based limit.
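The Min-Winner policy can be illustrated with a minimal sketch. The function and argument names below are hypothetical and are not part of the baldur-pro API; the sketch only shows that the most restrictive of the three candidate limits is the one returned.

```python
def min_winner_limit(rtt_limit: int, limit_429: int, error_budget_limit: int) -> int:
    """Return the most restrictive (lowest) of the three candidate limits.

    Hypothetical helper for illustration; the real property reads these
    values from internal state rather than taking them as arguments.
    """
    return min(rtt_limit, limit_429, error_budget_limit)


# The 429-based limit is the lowest candidate here, so it wins.
example_limit = min_winner_limit(rtt_limit=120, limit_429=80, error_budget_limit=100)
print(example_limit)
```

A Min-Winner rule means that relaxing one signal (for example, RTT recovering) cannot raise the effective limit while another signal still asks for a lower one.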
record_response
record_response(rtt_ms: float) -> None
Record response time and potentially adjust limit.
Call this after each successful request with the response time.
Also emits THROTTLE_LIMIT_RECOVERED event when limit recovers from min_limit.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| rtt_ms | float | Response time in milliseconds | required |
check
check(
key: str,
tier_id: str = "standard",
context: dict | None = None,
store_rejection: bool = True,
) -> ThrottleResult
Check if request is allowed with adaptive info and priority protection.
On rejection with DLQ auto-storage:

- If context is provided and store_rejection=True, rejected requests are stored to DLQ
- store_rejection=False skips DLQ storage (prevents cycles during Replay rejection)

Error Budget Gate integration:

- In ERROR_BUDGET_CRITICAL state, additionally rejects non_essential tier
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | Request identifier | required |
| tier_id | str | Request tier (critical/standard/non_essential) | 'standard' |
| context | dict \| None | Request context (for DLQ storage, domain/tier_id/trace_id etc.) | None |
| store_rejection | bool | Whether to store rejection to DLQ (default True) | True |

Returns:

| Type | Description |
|---|---|
| ThrottleResult | ThrottleResult with adaptive info |
allow_request
allow_request(ident: str) -> ThrottleResult
OSS-facing wrapper used by DRF AdaptiveDRFThrottle bridge.
Delegates to check() with default tier/context. Identifier maps
directly to the rate-limit key. PRO callers should prefer check()
for tiered / context-aware rejection-storage flows.
get_status
get_status(key: str) -> ThrottleResult
Return current throttle state for key without consuming a slot.
Used by DRF AdaptiveDRFThrottle.wait() to compute reset_at-based
backoff after a 429. Reads the same sliding-window counters as
check() but does not mutate them.
close
close() -> None
Unsubscribe all EventBus handlers.
get_stats
get_stats() -> dict
Get adaptive throttle statistics.
reset_all
reset_all() -> None
Reset all state including gradient calculator and emergency state.
rollback_to_base_limit
rollback_to_base_limit() -> int
Immediately rollback to pre-emergency base limit.
Returns:
| Type | Description |
|---|---|
| int | The rolled-back limit |
get_config_snapshot
get_config_snapshot() -> dict
Return current throttle config state snapshot.
Used for audit logging and config change tracking.
Returns:
| Type | Description |
|---|---|
| dict | Current config state dictionary |
swap_config
swap_config(new_config: ThrottleConfig) -> ThrottleConfig
Atomically swap the Config object.
Reference assignment is atomic under the GIL, so this is safe even while _maybe_adjust_limit() is running. Derived state (_current_limit, _base_limit_before_emergency, etc.) is not changed (SLA values swap only).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| new_config | ThrottleConfig | New ThrottleConfig (created via model_copy etc.) | required |

Returns:

| Type | Description |
|---|---|
| ThrottleConfig | Previous config object (for rollback retention) |
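The swap-and-return-previous pattern can be sketched as follows. `ConfigSketch` and `ThrottleSketch` are hypothetical stand-ins (a frozen dataclass plays the role of the settings object, and `dataclasses.replace` plays the role of `model_copy`); this is not the baldur-pro implementation.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class ConfigSketch:
    """Hypothetical immutable config; field names are illustrative."""
    sla_warning_ms: int = 200
    sla_critical_ms: int = 500


class ThrottleSketch:
    def __init__(self, config: ConfigSketch) -> None:
        self._config = config
        self._current_limit = 100  # derived state, deliberately untouched by a swap

    def swap_config(self, new_config: ConfigSketch) -> ConfigSketch:
        previous = self._config
        # A single reference assignment: readers observe either the old
        # object or the new one, never a half-updated config.
        self._config = new_config
        return previous


throttle = ThrottleSketch(ConfigSketch())
previous_config = throttle.swap_config(replace(throttle._config, sla_warning_ms=150))
print(previous_config.sla_warning_ms, throttle._config.sla_warning_ms)
```

Keeping the returned object lets a caller roll back by passing it to `swap_config` again.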
GradientCalculator
GradientCalculator(
smoothing_factor: float = 0.5,
sample_window_seconds: float = 10.0,
min_samples: int = 3,
)
RTT Gradient calculator — based on Exponential Moving Average.
Use cases:

1. AdaptiveThrottle: the original use case (dynamic limit adjustment)
2. TrafficGate/AdmissionControl: providing the expected processing time for Dynamic Fast-Fail

Gradient interpretation:

- Positive gradient = RTT increasing (overload)
- Negative gradient = RTT decreasing (recovery)
- Zero gradient = stable
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| smoothing_factor | float | Weight for new samples (0-1, higher = more reactive) | 0.5 |
| sample_window_seconds | float | How far back to look for samples | 10.0 |
| min_samples | int | Minimum samples needed for gradient calculation | 3 |
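A minimal sketch of an EMA-based gradient, using the three constructor parameters above. The class name, the explicit `now` argument (added so the example is deterministic), and the gradient formula (relative change between the oldest and newest smoothed value inside the window) are all assumptions for illustration; the actual GradientCalculator formula is not documented here.

```python
from collections import deque


class EmaGradientSketch:
    """Hypothetical EMA gradient calculator, not the baldur-pro class."""

    def __init__(self, smoothing_factor=0.5, sample_window_seconds=10.0, min_samples=3):
        self.alpha = smoothing_factor
        self.window = sample_window_seconds
        self.min_samples = min_samples
        self._smoothed = deque()  # (timestamp, smoothed_rtt_ms)

    def add_sample(self, rtt_ms: float, now: float) -> None:
        if self._smoothed:
            prev = self._smoothed[-1][1]
            # EMA update: higher alpha weights the new sample more heavily.
            smoothed = self.alpha * rtt_ms + (1 - self.alpha) * prev
        else:
            smoothed = rtt_ms
        self._smoothed.append((now, smoothed))
        # Drop samples that have fallen out of the look-back window.
        while now - self._smoothed[0][0] > self.window:
            self._smoothed.popleft()

    def get_gradient(self) -> float:
        if len(self._smoothed) < self.min_samples:
            return 0.0  # not enough data: treat as stable
        oldest = self._smoothed[0][1]
        newest = self._smoothed[-1][1]
        if oldest == 0:
            return 0.0
        return (newest - oldest) / oldest  # assumed formula: relative change


rising = EmaGradientSketch()
for t, rtt in enumerate([100.0, 120.0, 140.0]):
    rising.add_sample(rtt, now=float(t))
print(rising.get_gradient())  # positive: RTT is increasing
```

With `smoothing_factor=0.5` the smoothed series for 100, 120, 140 is 100, 110, 125, so the sign of the result matches the overload case described above.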
add_sample
add_sample(rtt_ms: float) -> None
Add a new RTT sample.
get_gradient
get_gradient() -> float
Calculate current RTT gradient.
Returns:
| Type | Description |
|---|---|
| float | Gradient value: positive when RTT is increasing (overload), negative when RTT is decreasing (recovery), zero when stable |
get_current_rtt
get_current_rtt() -> float | None
Get current smoothed RTT.
get_snapshot
get_snapshot() -> tuple[float | None, float]
Return the current RTT + gradient within a single Lock.
Reduces Lock acquisitions from 2 → 1 to improve hot-path performance.
Returns:
| Type | Description |
|---|---|
| tuple[float \| None, float] | (current_rtt_ms, gradient) tuple |
get_stats
get_stats() -> dict
Get calculator statistics.
reset
reset() -> None
Reset calculator state.
BaseThrottle
BaseThrottle(config: ThrottleConfig | None = None)
Bases: ABC
Abstract base class for throttle implementations.
current_limit
property
writable
current_limit: int
Get current rate limit.
check
abstractmethod
check(key: str) -> ThrottleResult
Check if request is allowed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | Unique identifier (user_id, ip, etc.) | required |

Returns:

| Type | Description |
|---|---|
| ThrottleResult | ThrottleResult with allowed status and metadata |
reset
abstractmethod
reset(key: str) -> None
Reset throttle state for a key.
get_stats
get_stats() -> dict
Get throttle statistics.
SlidingWindowThrottle
SlidingWindowThrottle(config: ThrottleConfig | None = None)
Bases: BaseThrottle
In-memory sliding window throttle.
Thread-safe implementation for single-process use. For distributed systems, use Redis-based implementation.
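For orientation, a thread-safe in-memory sliding window can be sketched as below. This is a generic illustration of the technique, not the SlidingWindowThrottle source: the class name, the boolean return value (instead of a ThrottleResult), and the explicit `now` argument are assumptions made to keep the example small and deterministic.

```python
import threading
from collections import defaultdict, deque


class SlidingWindowSketch:
    """Hypothetical per-key sliding window limiter for a single process."""

    def __init__(self, limit: int, window_seconds: float) -> None:
        self.limit = limit
        self.window_seconds = window_seconds
        self._hits = defaultdict(deque)  # key -> timestamps of accepted requests
        self._lock = threading.Lock()

    def check(self, key: str, now: float) -> bool:
        with self._lock:
            hits = self._hits[key]
            cutoff = now - self.window_seconds
            # Evict timestamps that are no longer inside the window.
            while hits and hits[0] <= cutoff:
                hits.popleft()
            if len(hits) >= self.limit:
                return False  # rejected: the window is full
            hits.append(now)
            return True


window = SlidingWindowSketch(limit=2, window_seconds=1.0)
decisions = [window.check("user_123", now=t) for t in (0.0, 0.1, 0.2, 1.05)]
print(decisions)
```

Because the state lives in a process-local dictionary, two worker processes would each enforce their own window, which is why the text above points to a Redis-based implementation for distributed deployments.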
check
check(key: str) -> ThrottleResult
Check if request is allowed using sliding window.
reset
reset(key: str) -> None
Reset throttle state for a key.
reset_all
reset_all() -> None
Reset all throttle state (for testing).
get_stats
get_stats() -> dict
Get throttle statistics.
RTTMetrics
dataclass
RTTMetrics(
current_rtt_ms: float | None = None,
smoothed_rtt_ms: float | None = None,
gradient: float = 0.0,
sample_count: int = 0,
last_updated: datetime | None = None,
)
RTT-related metrics.
RTTSeverity
Bases: str, Enum
RTT severity level.
ServiceHealthMetrics
dataclass
ServiceHealthMetrics(
service_name: str,
rtt: RTTMetrics = RTTMetrics(),
cb_state: str = "closed",
cb_failure_count: int = 0,
cb_success_count: int = 0,
throttle_current_limit: int | None = None,
sla_warning_triggered: bool = False,
sla_critical_triggered: bool = False,
)
Aggregated service health metrics.
ThrottleCircuitBreakerBridge
ThrottleCircuitBreakerBridge(
sla_warning_ms: int | None = None,
sla_critical_ms: int | None = None,
gradient_warning_threshold: float | None = None,
)
Data sharing bridge between Throttle and Circuit Breaker.
Uses RTT data for CB failure detection and reflects CB state in Throttle limit adjustment.
Initialize.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| sla_warning_ms | int \| None | SLA Warning threshold (ms) | None |
| sla_critical_ms | int \| None | SLA Critical threshold (ms) | None |
| gradient_warning_threshold | float \| None | Gradient warning threshold | None |
record_rtt
record_rtt(
service_name: str,
rtt_ms: float,
gradient: float | None = None,
) -> RTTSeverity
Record RTT data and CB feedback.
If RTT exceeds the threshold, a failure signal may be sent to the CB.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service_name | str | service name | required |
| rtt_ms | float | response time (ms) | required |
| gradient | float \| None | RTT rate of change (optional) | None |

Returns:

| Type | Description |
|---|---|
| RTTSeverity | RTT severity level |
record_success
record_success(
service_name: str, rtt_ms: float | None = None
) -> None
Record a successful request (RTT normal).
If RTT is within the normal range, it is counted as a CB success.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service_name | str | service name | required |
| rtt_ms | float \| None | response time (optional) | None |
get_cb_state
get_cb_state(service_name: str) -> str | None
Query the service's CB state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service_name | str | service name | required |

Returns:

| Type | Description |
|---|---|
| str \| None | CB state ("closed", "open", "half_open") or None |
get_cb_info
get_cb_info(service_name: str) -> dict[str, Any] | None
Query the service's CB info (detailed).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service_name | str | service name | required |

Returns:

| Type | Description |
|---|---|
| dict[str, Any] \| None | CB state info dictionary or None |
should_record_as_cb_failure
should_record_as_cb_failure(
service_name: str,
rtt_ms: float,
consecutive_critical_count: int = 3,
) -> bool
Determine whether to record as a CB failure based on RTT.
If the consecutive Critical count exceeds the threshold, record as a CB failure.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service_name | str | service name | required |
| rtt_ms | float | response time (ms) | required |
| consecutive_critical_count | int | consecutive Critical count threshold | 3 |

Returns:

| Type | Description |
|---|---|
| bool | Whether it should be recorded as a CB failure |
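The consecutive-Critical rule can be sketched as a per-service streak counter. Everything below is a hypothetical illustration: the class name, the 500 ms critical threshold, and the choice to treat reaching the threshold (rather than strictly exceeding it) as sufficient are assumptions, since the exact comparison is not specified above.

```python
from collections import defaultdict


class CriticalStreakSketch:
    """Hypothetical streak tracker; not the ThrottleCircuitBreakerBridge source."""

    def __init__(self, sla_critical_ms: int = 500) -> None:
        self.sla_critical_ms = sla_critical_ms
        self._streak = defaultdict(int)  # service_name -> consecutive critical RTTs

    def should_record_as_cb_failure(
        self, service_name: str, rtt_ms: float, consecutive_critical_count: int = 3
    ) -> bool:
        if rtt_ms >= self.sla_critical_ms:
            self._streak[service_name] += 1
        else:
            # Any non-critical RTT breaks the streak.
            self._streak[service_name] = 0
        # Assumption: reaching the threshold is enough to signal a failure.
        return self._streak[service_name] >= consecutive_critical_count


tracker = CriticalStreakSketch()
signals = [
    tracker.should_record_as_cb_failure("payments", rtt)
    for rtt in (600.0, 650.0, 700.0, 100.0, 600.0)
]
print(signals)
```

Requiring a streak rather than a single slow response keeps one outlier from tripping the circuit breaker.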
get_service_health
get_service_health(
service_name: str,
) -> ServiceHealthMetrics
Query aggregated service health metrics.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service_name | str | service name | required |

Returns:

| Type | Description |
|---|---|
| ServiceHealthMetrics | Aggregated metrics |
get_all_service_health
get_all_service_health() -> list[ServiceHealthMetrics]
Query health status of all services.
Returns:
| Type | Description |
|---|---|
| list[ServiceHealthMetrics] | List of per-service metrics |
reset
reset() -> None
Reset the bridge (for testing).
ThrottleConfig
Bases: BaseSettings
Netflix Gradient-based adaptive throttle settings.
Dynamically adjusts request limits based on RTT gradient.
validate_max_limit
classmethod
validate_max_limit(v: int, info) -> int
Ensure max_limit is greater than min_limit.
validate_sla_critical
classmethod
validate_sla_critical(v: int, info) -> int
Ensure sla_critical_ms is greater than sla_warning_ms.
from_dict
classmethod
from_dict(data: dict) -> ThrottleSettings
Create settings from dictionary (backward compat with ThrottleConfig).
from_settings
classmethod
from_settings() -> ThrottleSettings
Create from current settings (backward compat with ThrottleConfig).
ThrottleDeniedRequest
dataclass
ThrottleDeniedRequest(
service_name: str,
request_key: str,
request_data: dict[str, Any],
denied_at: datetime = (lambda: utc_now())(),
reason: str = "rate_limit_exceeded",
dlq_entry_id: str | None = None,
)
Information about a request denied by Throttle.
ThrottleDLQConfig
dataclass
ThrottleDLQConfig(
enabled: bool = True,
dlq_domain: str = "throttle",
auto_replay_on_cb_close: bool = True,
replay_batch_size: int = 50,
replay_interval_seconds: float = 5.0,
)
Throttle DLQ integration settings.
ThrottleDLQIntegration
ThrottleDLQIntegration(
config: ThrottleDLQConfig | None = None,
)
Bases: EventEmitterMixin
Manages integration between Throttle and DLQ.
Stores denied requests in the DLQ when a Throttle deny occurs, and automatically replays them on receiving a CB CLOSE event.
store_denied_request
store_denied_request(
service_name: str,
request_key: str,
request_data: dict[str, Any],
throttle_limit: int,
current_count: int,
) -> ThrottleDeniedRequest | None
Store a Throttle-denied request in the DLQ.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service_name | str | service name | required |
| request_key | str | rate limit key (IP + user combination) | required |
| request_data | dict[str, Any] | request data | required |
| throttle_limit | int | current throttle limit | required |
| current_count | int | current request count | required |

Returns:

| Type | Description |
|---|---|
| ThrottleDeniedRequest \| None | Stored request info or None (when disabled/failed) |
replay_denied_requests
replay_denied_requests(
service_name: str | None = None,
batch_size: int | None = None,
) -> dict[str, Any]
Replay denied requests stored in the DLQ.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service_name | str \| None | replay only a specific service (None for all) | None |
| batch_size | int \| None | batch size | None |

Returns:

| Type | Description |
|---|---|
| dict[str, Any] | replay result statistics |
on_circuit_breaker_closed
on_circuit_breaker_closed(
service_name: str,
) -> dict[str, Any]
CB CLOSE event handler.
When the CB closes, automatically replays the denied requests of that service.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service_name | str | service name | required |

Returns:

| Type | Description |
|---|---|
| dict[str, Any] | replay result |
get_pending_count
get_pending_count(service_name: str) -> int
Query the per-service pending request count.
get_all_pending_counts
get_all_pending_counts() -> dict[str, int]
Query the pending request count for all services.
ThrottleDLQSink
ThrottleDLQSink(dlq_integration: Any | None = None)
Sink that stores Throttle-rejected requests in the DLQ.
Separates AdaptiveThrottle's ThrottleDLQReplayMixin and _auto_store_rejection_to_dlq() into an independent component.
Fail-Open: the rejection is silently skipped when the DLQ module is not installed or storage fails.
Initialize.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| dlq_integration | Any \| None | ThrottleDLQIntegration instance. If None, uses the global instance via lazy import. | None |
handle_rejection
handle_rejection(
context: dict[str, Any], reason: str
) -> None
Store a rejected request in the DLQ.
Same behavior as AdaptiveThrottle._auto_store_rejection_to_dlq(). Extracts domain, order_id, user_id, etc. from context and records them in the DLQ.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| context | dict[str, Any] | request context (domain, order_id, tier_id, etc.) | required |
| reason | str | rejection reason ("rate_limit_exceeded", etc.) | required |
AdaptiveThrottleFacade
AdaptiveThrottleFacade(
policy: Any,
guards: list[Any] | None = None,
limit_adjuster: Any | None = None,
sinks: list[Any] | None = None,
)
Facade that preserves the legacy check()/record_response() API.
Internally delegates to ThrottlePolicy.
Usage example::
    facade = AdaptiveThrottleFacade(
        policy=throttle_policy,
        guards=[governance_guard, full_stop_guard],
        sinks=[dlq_sink],
    )
    result = facade.check("user_123")
    facade.record_response(rtt_ms=45.2)
Initialize.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| policy | Any | ThrottlePolicy instance | required |
| guards | list[Any] \| None | list of Guards (ThrottleGovernanceGuard, FullStopGuard, etc.) | None |
| limit_adjuster | Any \| None | ThrottleLimitAdjuster (EventBus limit adjustment component) | None |
| sinks | list[Any] \| None | list of components handling rejections, such as ThrottleDLQSink | None |
current_limit
property
writable
current_limit: int
Current rate limit (delegates to ThrottlePolicy).
check
check(
key: str,
tier_id: str = "standard",
context: dict[str, Any] | None = None,
store_rejection: bool = True,
) -> ThrottleResult
Identical signature to the original AdaptiveThrottle.check() API.
Execution order:

1. Check Guards in order (return immediately if any rejects)
2. Rate limit check of the ThrottlePolicy internal engine
3. DLQ Sink handling on rejection
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | request identifier (user_id, ip, etc.) | required |
| tier_id | str | request tier ("critical"/"standard"/"non_essential") | 'standard' |
| context | dict[str, Any] \| None | request context (for DLQ storage, domain/order_id, etc.) | None |
| store_rejection | bool | whether to store in the DLQ on rejection | True |

Returns:

| Type | Description |
|---|---|
| ThrottleResult | ThrottleResult: allow/reject decision + adaptive info |
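The three-step execution order of the facade's check() can be sketched with plain callables. The function name, the callable signatures, and the dictionary result are hypothetical simplifications (the real method returns a ThrottleResult and takes guard/sink objects). The condition for invoking sinks mirrors the `context` and `store_rejection` behaviour described for AdaptiveThrottle.check() and is assumed to carry over.

```python
from typing import Callable, Optional


def facade_check(
    key: str,
    guards: list,
    policy_check: Callable[[str], bool],
    sinks: list,
    context: Optional[dict] = None,
    store_rejection: bool = True,
) -> dict:
    # 1. Guards run in order; the first rejection returns immediately.
    for guard in guards:
        reason = guard(key)
        if reason is not None:
            return {"allowed": False, "reason": reason}
    # 2. Rate limit check by the policy's internal engine.
    if policy_check(key):
        return {"allowed": True, "reason": None}
    # 3. On a rate limit rejection, hand the request context to the sinks.
    if context is not None and store_rejection:
        for sink in sinks:
            sink(context, "rate_limit_exceeded")
    return {"allowed": False, "reason": "rate_limit_exceeded"}


stored = []


def dlq_sink(ctx: dict, reason: str) -> None:
    stored.append((ctx["domain"], reason))


def kill_switch_guard(key: str) -> Optional[str]:
    return "kill_switch" if key == "blocked" else None


def over_limit(key: str) -> bool:
    return False  # pretend the window is always full


ctx = {"domain": "orders"}
guard_result = facade_check("blocked", [kill_switch_guard], over_limit, [dlq_sink], ctx)
policy_result = facade_check("user_123", [kill_switch_guard], over_limit, [dlq_sink], ctx)
print(guard_result, policy_result, stored)
```

In this sketch a guard rejection never reaches the sink, which follows from step 1 returning before steps 2 and 3 run.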
record_response
record_response(rtt_ms: float) -> None
Record RTT + dynamically adjust limit based on Gradient.
Delegates the same behavior as the original AdaptiveThrottle.record_response() to ThrottlePolicy.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| rtt_ms | float | response time (milliseconds) | required |
get_stats
get_stats() -> dict[str, Any]
Return Throttle statistics.
RTTSample
dataclass
RTTSample(timestamp: float, rtt_ms: float)
Single RTT sample.
ThrottleLimitAdjuster
ThrottleLimitAdjuster()
Component that receives EventBus system-wide events and adjusts the ThrottlePolicy limit.
Consolidates the four groups of EventBus subscription logic separated from AdaptiveThrottle:

- RATE_LIMIT_429 → decrease limit
- RATE_LIMIT_COOLDOWN_END → start Recovery Dampening
- ERROR_BUDGET_WARNING/CRITICAL → decrease limit
- ERROR_BUDGET_RECOVERED → start Recovery Dampening
- LOAD_SHEDDING_LEVEL_CHANGED → adjust limit
- KILL_SWITCH_ACTIVATED → Gradient Freeze
- KILL_SWITCH_DEACTIVATED → start Recovery Dampening
Same lifecycle pattern as HedgingConfigUpdateHook (hedging.py): register(policy) → start() → begin EventBus subscription
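The event-to-action mapping can be sketched as a small dispatcher over a stub policy. The stub exposes the method names listed for ThrottlePolicy in this reference; the `dispatch` function, the event payload shape (`data["suggested_limit"]`), and the specific multipliers chosen for WARNING and CRITICAL are assumptions for illustration only.

```python
class PolicyStub:
    """Hypothetical stand-in that records which adjustment was requested."""

    def __init__(self) -> None:
        self.calls = []
        self.gradient_frozen = False

    def reduce_limit_for_429(self, event) -> None:
        self.calls.append(("reduce_limit_for_429", event))

    def adjust_limit_for_error_budget(self, multiplier: float) -> None:
        self.calls.append(("adjust_limit_for_error_budget", multiplier))

    def adjust_limit_for_shedding(self, suggested_limit: int) -> None:
        self.calls.append(("adjust_limit_for_shedding", suggested_limit))

    def start_recovery_dampening(self, target_limit=None) -> None:
        self.calls.append(("start_recovery_dampening", target_limit))


RECOVERY_EVENTS = {
    "RATE_LIMIT_COOLDOWN_END",
    "ERROR_BUDGET_RECOVERED",
    "KILL_SWITCH_DEACTIVATED",
}


def dispatch(policy: PolicyStub, event_type: str, data: dict) -> None:
    if event_type == "RATE_LIMIT_429":
        policy.reduce_limit_for_429(data)
    elif event_type in RECOVERY_EVENTS:
        policy.start_recovery_dampening()
    elif event_type == "ERROR_BUDGET_WARNING":
        policy.adjust_limit_for_error_budget(0.8)  # assumed multiplier
    elif event_type == "ERROR_BUDGET_CRITICAL":
        policy.adjust_limit_for_error_budget(0.5)  # assumed multiplier
    elif event_type == "LOAD_SHEDDING_LEVEL_CHANGED":
        policy.adjust_limit_for_shedding(data["suggested_limit"])  # assumed payload key
    elif event_type == "KILL_SWITCH_ACTIVATED":
        policy.gradient_frozen = True


stub = PolicyStub()
dispatch(stub, "ERROR_BUDGET_CRITICAL", {})
dispatch(stub, "LOAD_SHEDDING_LEVEL_CHANGED", {"suggested_limit": 40})
dispatch(stub, "KILL_SWITCH_ACTIVATED", {})
dispatch(stub, "KILL_SWITCH_DEACTIVATED", {})
print(stub.calls, stub.gradient_frozen)
```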
register
register(policy: Any) -> None
Register a ThrottlePolicy as a limit adjustment target.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| policy | Any | ThrottlePolicy instance. Must expose reduce_limit_for_429(), adjust_limit_for_error_budget(), adjust_limit_for_shedding(), start_recovery_dampening(), and the gradient_frozen setter. | required |
start
start() -> None
Begin EventBus subscription.
Does nothing in an environment without EventBus (Fail-Open). Ignores duplicate calls.
close
close() -> None
Unsubscribe all EventBus handlers and release resources.
Idempotent: safe to call multiple times.
ThrottlePolicy
ThrottlePolicy(
config: ThrottleConfig | None = None,
gradient_calculator: Any | None = None,
dampening_manager: Any | None = None,
)
Bases: ResiliencePolicy[T]
Pure Throttle Policy — wraps SlidingWindowThrottle + GradientCalculator.
Removes all 13 hardcoded external dependencies (Emergency, CB, ErrorBudget, KillSwitch, LoadShedding, DLQ, Prometheus, Audit, etc.) and performs only rate limit checks + gradient-based limit adjustment.
Consumers optionally register Guards/Hooks/Sinks via PolicyComposer.add_guard()/add_hook()/add_sink().
Usage example::
    from baldur_pro.services.throttle import ThrottleConfig
    from baldur_pro.services.throttle.policy import ThrottlePolicy

    policy = ThrottlePolicy(config=ThrottleConfig(initial_limit=100))
    # call_api is the caller's own function
    result = policy.execute(call_api, order_id=123)
Initialize.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | ThrottleConfig \| None | Throttle settings (ThrottleSettings compatible). | None |
| gradient_calculator | Any \| None | GradientCalculator instance. If None, created by default with config.smoothing_factor. | None |
| dampening_manager | Any \| None | RecoveryDampeningManager instance. If None, no Dampening Cap is applied (gradient freely adjusts up to max_limit). | None |
name
property
name: str
Policy identifier.
current_limit
property
writable
current_limit: int
Current rate limit.
gradient_frozen
property
writable
gradient_frozen: bool
Whether Gradient adjustment is frozen.
execute
execute(
func: Callable[..., T],
*args: Any,
context: PolicyContext | None = None,
**kwargs: Any
) -> PolicyResult[T]
Run the function after a Throttle check.
When the rate limit is passed, executes the function, measures RTT, and automatically adjusts the limit based on the gradient.
Throttle key resolution order:

1. context.extra["throttle_key"] (per-request specification)
2. config.service_name (per-Policy-instance default)
3. "default" (final fallback)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| func | Callable[..., T] | function to execute | required |
| *args | Any | positional arguments for the function | () |
| context | PolicyContext \| None | PolicyContext (throttle_key, tier_id, etc.) | None |
| **kwargs | Any | keyword arguments for the function | {} |

Returns:

| Type | Description |
|---|---|
| PolicyResult[T] | PolicyResult: function execution result on pass, REJECTED on rejection |
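The throttle key resolution order for execute() can be sketched as a small fallback chain. The function below is hypothetical: it takes the `extra` mapping and the configured service name directly instead of real PolicyContext and ThrottleConfig objects, and it treats empty strings as "not set", which is an assumption.

```python
from typing import Optional


def resolve_throttle_key(extra: Optional[dict], service_name: Optional[str]) -> str:
    """Hypothetical helper showing the three-level fallback."""
    # 1. Per-request key supplied through the context's extra mapping.
    if extra and extra.get("throttle_key"):
        return extra["throttle_key"]
    # 2. Per-policy default taken from the configuration.
    if service_name:
        return service_name
    # 3. Final fallback.
    return "default"


print(resolve_throttle_key({"throttle_key": "user_123"}, "payments"))
print(resolve_throttle_key({}, "payments"))
print(resolve_throttle_key(None, None))
```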
check
check(key: str) -> ThrottleResult
Pure rate limit check (used without execute()).
Used by AdaptiveThrottleFacade for legacy API compatibility.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | request identifier | required |

Returns:

| Type | Description |
|---|---|
| ThrottleResult | ThrottleResult: allow/reject decision |
record_response
record_response(rtt_ms: float) -> None
Record RTT + dynamic gradient-based limit adjustment.
Used when executing the function directly outside execute(). Supports AdaptiveThrottleFacade's legacy record_response() API.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| rtt_ms | float | response time (milliseconds) | required |
reduce_limit_for_429
reduce_limit_for_429(event: Any) -> None
Limit decrease triggered by a 429 Rate Limit event.
Called by ThrottleLimitAdjuster on receiving an EventBus RATE_LIMIT_429 event. Same decrease logic as AdaptiveThrottle._handle_rate_limit_429().
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | Any | EventBus event (refers to data.reduction_percent) | required |
adjust_limit_for_error_budget
adjust_limit_for_error_budget(multiplier: float) -> None
Limit adjustment triggered by an Error Budget event.
Called by ThrottleLimitAdjuster on receiving an ERROR_BUDGET_WARNING/CRITICAL event.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| multiplier | float | multiplier to apply (0.8=20% decrease, 0.5=50% decrease) | required |
adjust_limit_for_shedding
adjust_limit_for_shedding(suggested_limit: int) -> None
Limit adjustment triggered by a Load Shedding event.
Called by ThrottleLimitAdjuster on receiving a LOAD_SHEDDING_LEVEL_CHANGED event.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| suggested_limit | int | suggested limit (applied if lower than the current limit) | required |
start_recovery_dampening
start_recovery_dampening(
target_limit: int | None = None,
) -> None
Start Recovery Dampening.
Called by ThrottleLimitAdjuster on receiving a KILL_SWITCH_DEACTIVATED or ERROR_BUDGET_RECOVERED event.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| target_limit | int \| None | recovery target limit. If None, uses config.initial_limit. | None |
RecoveryDampeningConfig
dataclass
RecoveryDampeningConfig(
enabled: bool = True,
phase_1_ratio: float = 0.8,
phase_2_ratio: float = 0.9,
phase_1_duration_seconds: float = 30.0,
phase_2_duration_seconds: float = 30.0,
)
Recovery Dampening settings.
RecoveryDampeningManager
RecoveryDampeningManager(
config: RecoveryDampeningConfig | None = None,
on_limit_change: (
Callable[[str, int], None] | None
) = None,
)
Recovery Dampening manager.
Manages gradual recovery on CB CLOSE or emergency mode release. The Gradient algorithm keeps computing in the background; only application is deferred.
Initialize.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | RecoveryDampeningConfig \| None | dampening settings | None |
| on_limit_change | Callable[[str, int], None] \| None | callback invoked on limit change (service_name, new_limit) | None |
start_recovery
start_recovery(
service_name: str, target_limit: int, current_limit: int
) -> int
Start recovery and return the Phase 1 limit.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service_name | str | service name | required |
| target_limit | int | final target limit | required |
| current_limit | int | current limit (before recovery) | required |

Returns:

| Type | Description |
|---|---|
| int | Phase 1 limit (80% of target) |
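The phased recovery can be sketched from the RecoveryDampeningConfig defaults (0.8 for 30 seconds, then 0.9 for 30 seconds). The helper names are hypothetical, and the assumption that the multiplier becomes 1.0 once both phases have elapsed is not stated in this reference.

```python
def dampening_multiplier(
    elapsed_seconds: float,
    phase_1_ratio: float = 0.8,
    phase_2_ratio: float = 0.9,
    phase_1_duration_seconds: float = 30.0,
    phase_2_duration_seconds: float = 30.0,
) -> float:
    """Hypothetical helper: multiplier applied to the target limit over time."""
    if elapsed_seconds < phase_1_duration_seconds:
        return phase_1_ratio
    if elapsed_seconds < phase_1_duration_seconds + phase_2_duration_seconds:
        return phase_2_ratio
    return 1.0  # assumption: full target once both phases are over


def dampened_limit(target_limit: int, elapsed_seconds: float) -> int:
    return round(target_limit * dampening_multiplier(elapsed_seconds))


# Limit cap at the start of recovery, mid-way through phase 2, and afterwards.
caps = [dampened_limit(100, t) for t in (0.0, 45.0, 60.0)]
print(caps)
```

Stepping the cap up in phases avoids releasing the full target load onto a service that has only just recovered.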
cancel_recovery
cancel_recovery(service_name: str) -> None
Cancel recovery (on a new failure).
is_recovery_active
is_recovery_active(service_name: str) -> bool
Check whether recovery is in progress.
get_current_multiplier
get_current_multiplier(service_name: str) -> float
Return the multiplier of the current recovery phase.
store_pending_gradient_limit
store_pending_gradient_limit(
service_name: str, gradient_limit: int
) -> None
Store the Gradient calculation result (deferred application).
During recovery the Gradient algorithm keeps computing, but the limit is applied only after recovery completes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service_name | str | service name | required |
| gradient_limit | int | Gradient-computed limit | required |
get_recovery_state
get_recovery_state(
service_name: str,
) -> dict[str, Any] | None
Query the service recovery state.
get_all_recovery_states
get_all_recovery_states() -> list[dict[str, Any]]
Query all recovery states.
reset
reset() -> None
Reset all state (for testing).
RecoveryPhase
Bases: str, Enum
Recovery phase.
ServiceRecoveryState
dataclass
ServiceRecoveryState(
service_name: str,
target_limit: int,
current_phase: RecoveryPhase = RecoveryPhase.PHASE_1,
phase_started_at: float = time.time(),
is_active: bool = True,
pending_gradient_limit: int | None = None,
)
Per-service recovery state.
RedisThrottleLimitManager
RedisThrottleLimitManager(
redis_client: Any,
key_prefix: str = "baldur:",
default_ttl_seconds: int = 3600,
)
Redis-based Throttle Limit manager.
Uses Lua scripts to perform atomic limit updates.
Initialize.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| redis_client | Any | Redis client | required |
| key_prefix | str | Redis key prefix | 'baldur:' |
| default_ttl_seconds | int | default TTL (seconds) | 3600 |
update_limit_atomic
update_limit_atomic(
service_name: str,
new_limit: int,
min_limit: int,
max_limit: int,
save_as_safe: bool = False,
) -> tuple[int, int]
Atomic limit update.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service_name | str | service name | required |
| new_limit | int | new limit | required |
| min_limit | int | minimum limit | required |
| max_limit | int | maximum limit | required |
| save_as_safe | bool | whether to save as the last safe limit | False |

Returns:

| Type | Description |
|---|---|
| tuple[int, int] | (previous limit, new limit) |
update_limit_cas
update_limit_cas(
service_name: str,
expected_current: int,
new_limit: int,
min_limit: int,
max_limit: int,
) -> tuple[bool, int, str]
Conditional limit update (Compare-And-Swap).
Updates only when the current value matches expected_current.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service_name | str | service name | required |
| expected_current | int | expected current value | required |
| new_limit | int | new limit | required |
| min_limit | int | minimum limit | required |
| max_limit | int | maximum limit | required |

Returns:

| Type | Description |
|---|---|
| tuple[bool, int, str] | (success, actual current value, message) |
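Compare-And-Swap semantics can be sketched with an in-memory store guarded by a lock. This is an illustration of the technique only: the real manager runs a Lua script inside Redis, and the class name, the message strings, and the clamping of `new_limit` into the `[min_limit, max_limit]` range are assumptions.

```python
import threading


class InMemoryLimitStore:
    """Hypothetical stand-in for the Redis-backed limit storage."""

    def __init__(self) -> None:
        self._limits = {}
        self._lock = threading.Lock()

    def set_limit(self, service_name: str, limit: int) -> None:
        with self._lock:
            self._limits[service_name] = limit

    def update_limit_cas(
        self,
        service_name: str,
        expected_current: int,
        new_limit: int,
        min_limit: int,
        max_limit: int,
    ):
        # The lock plays the role of Lua script atomicity: compare and
        # write happen as one indivisible step.
        with self._lock:
            actual = self._limits.get(service_name)
            if actual != expected_current:
                return (False, actual, "MISMATCH")  # someone else updated first
            clamped = max(min_limit, min(new_limit, max_limit))
            self._limits[service_name] = clamped
            return (True, clamped, "OK")


store = InMemoryLimitStore()
store.set_limit("payments", 100)
first = store.update_limit_cas("payments", 100, 2000, min_limit=10, max_limit=500)
stale = store.update_limit_cas("payments", 100, 50, min_limit=10, max_limit=500)
print(first, stale)
```

The second call fails because its `expected_current` is stale; the caller can use the returned actual value to recompute and retry.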
load_safe_limit
load_safe_limit(
service_name: str,
default_limit: int,
max_age_seconds: int = 3600,
) -> tuple[int, str]
Load the last safe limit on Cold Start.
Uses the last safe limit when there is no current limit.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service_name | str | service name | required |
| default_limit | int | default limit | required |
| max_age_seconds | int | maximum valid age of the safe limit | 3600 |

Returns:

| Type | Description |
|---|---|
| tuple[int, str] | (limit value, source: "CURRENT", "SAFE", "DEFAULT") |
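The cold-start fallback order implied by the three source labels can be sketched as a pure function. The function name and its arguments (the stored values and the age of the safe limit passed in directly) are hypothetical; the real method reads them from Redis.

```python
from typing import Optional


def choose_cold_start_limit(
    current_limit: Optional[int],
    safe_limit: Optional[int],
    safe_limit_age_seconds: Optional[float],
    default_limit: int,
    max_age_seconds: int = 3600,
):
    """Hypothetical helper: pick the limit and report where it came from."""
    if current_limit is not None:
        return current_limit, "CURRENT"
    safe_is_fresh = (
        safe_limit is not None
        and safe_limit_age_seconds is not None
        and safe_limit_age_seconds <= max_age_seconds
    )
    if safe_is_fresh:
        return safe_limit, "SAFE"
    # No current limit and no usable safe limit: fall back to the default.
    return default_limit, "DEFAULT"


print(choose_cold_start_limit(None, 80, 120.0, default_limit=50))
print(choose_cold_start_limit(None, 80, 7200.0, default_limit=50))
```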
save_safe_limit
save_safe_limit(service_name: str, limit: int) -> bool
Save the last safe limit.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service_name | str | service name | required |
| limit | int | limit to save | required |

Returns:

| Type | Description |
|---|---|
| bool | Whether it succeeded |
get_current_limit
get_current_limit(service_name: str) -> int | None
Query the current limit.
add_rtt_sample
add_rtt_sample(
service_name: str,
rtt_ms: float,
window_seconds: float = 60.0,
max_samples: int = 100,
) -> int
Add an RTT sample.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service_name | str | service name | required |
| rtt_ms | float | RTT (ms) | required |
| window_seconds | float | sample retention time | 60.0 |
| max_samples | int | maximum number of samples | 100 |

Returns:

| Type | Description |
|---|---|
| int | Current number of samples |
get_rtt_samples
get_rtt_samples(
service_name: str, window_seconds: float = 60.0
) -> list[float]
Query recent RTT samples.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service_name | str | service name | required |
| window_seconds | float | query window | 60.0 |

Returns:

| Type | Description |
|---|---|
| list[float] | List of RTT values |
ThrottleLuaScripts
Lua scripts for Throttle Limit management.
All scripts execute atomically to prevent race conditions.
ServiceThrottleConfig
dataclass
ServiceThrottleConfig(
service_name: str,
initial_limit: int = 100,
min_limit: int = 10,
max_limit: int = 1000,
sla_warning_ms: int = 200,
sla_critical_ms: int = 500,
)
Per-service Throttle configuration.
to_throttle_config
to_throttle_config() -> ThrottleConfig
Convert to ThrottleConfig.
ServiceThrottleState
dataclass
ServiceThrottleState(
service_name: str, throttle: AdaptiveThrottle
)
Per-service Throttle state info.
ThrottleRegistry
Per-service Throttle instance management registry.
Manages independent AdaptiveThrottle per external service. CB state changes are handled by CircuitBreakerHandlerMixin within each AdaptiveThrottle instance.
Implemented as a thread-safe singleton.
reset_instance
classmethod
reset_instance() -> None
Reset singleton instance for test isolation.
register_service_config
register_service_config(
config: ServiceThrottleConfig,
) -> None
Register per-service Throttle configuration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | ServiceThrottleConfig | Per-service configuration | required |
get_throttle
get_throttle(service_name: str) -> AdaptiveThrottle
Get per-service Throttle instance (creates if absent).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service_name | str | Service name | required |
Returns:
| Type | Description |
|---|---|
| AdaptiveThrottle | AdaptiveThrottle instance for the service |
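The "creates if absent" behavior can be illustrated with a small lock-protected registry. This is a sketch under stated assumptions, not the ThrottleRegistry source: `DemoServiceConfig`, `DemoThrottle`, and `DemoRegistry` are hypothetical stand-ins, and falling back to a default configuration for unregistered services is an assumption.

```python
import threading
from dataclasses import dataclass


@dataclass
class DemoServiceConfig:
    """Hypothetical stand-in for ServiceThrottleConfig."""
    service_name: str
    initial_limit: int = 100


class DemoThrottle:
    """Hypothetical stand-in for AdaptiveThrottle."""

    def __init__(self, limit):
        self.limit = limit


class DemoRegistry:
    def __init__(self):
        self._lock = threading.Lock()
        self._configs = {}
        self._throttles = {}

    def register_service_config(self, config):
        with self._lock:
            self._configs[config.service_name] = config

    def get_throttle(self, service_name):
        # The lock keeps two threads from creating two throttles
        # for the same service.
        with self._lock:
            throttle = self._throttles.get(service_name)
            if throttle is None:
                # Assumption: unregistered services get default settings.
                config = self._configs.get(service_name)
                if config is None:
                    config = DemoServiceConfig(service_name)
                throttle = DemoThrottle(config.initial_limit)
                self._throttles[service_name] = throttle
            return throttle
```

Repeated calls with the same service name return the same object, so limit adjustments for one service never leak into another.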
get_service_state
get_service_state(service_name: str) -> dict | None
Get current Throttle state for a service.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service_name | str | Service name | required |
Returns:
| Type | Description |
|---|---|
| dict \| None | State info dict or None |
get_all_service_states
get_all_service_states() -> list[dict]
Get Throttle state for all services.
Returns:
| Type | Description |
|---|---|
| list[dict] | List of state info dicts |
reset
reset() -> None
Reset registry (for testing).
RedisConnectionState
Bases: str, Enum
Redis connection state.
SafeOpenConfig
dataclass
SafeOpenConfig(
enabled: bool = True,
health_check_interval_seconds: float = 5.0,
failure_threshold: int = 3,
safe_limit_save_interval_seconds: float = 60.0,
safe_limit_max_age_seconds: int = 3600,
default_fallback_limit: int = 50,
)
Safe-Open fallback settings.
SafeOpenFallbackManager
SafeOpenFallbackManager(
config: SafeOpenConfig | None = None,
redis_client: Any = None,
)
Safe-Open fallback manager.
Maintains service continuity on Redis failure by using the last known safe limit.
Implemented following the LocalMemoryRateLimiter pattern.
Initialize.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | SafeOpenConfig \| None | Safe-Open settings | None |
| redis_client | Any | Redis client (optional) | None |
set_redis_client
set_redis_client(redis_client: Any) -> None
Set the Redis client.
check_redis_health
check_redis_health() -> bool
Check the Redis connection state.
get_or_create_service_state
get_or_create_service_state(
service_name: str, initial_limit: int
) -> ServiceSafeLimitState
Get or create per-service state.
update_safe_limit
update_safe_limit(
service_name: str, limit: int, force_save: bool = False
) -> bool
Update the safe limit.
Saves to Redis periodically or on a significant change.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service_name | str | service name | required |
| limit | int | new limit | required |
| force_save | bool | whether to force a save | False |
Returns:
| Type | Description |
|---|---|
| bool | Whether the Redis save succeeded |
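The rule "periodically or on a significant change" could be expressed as a small decision function. This is a minimal sketch: `should_save` is a hypothetical helper, and the 20% `change_ratio` threshold is an assumption, since this section does not define what counts as a significant change.

```python
def should_save(now, last_saved_at, old_limit, new_limit,
                save_interval_seconds=60.0, change_ratio=0.2,
                force_save=False):
    """Decide whether a new safe limit should be persisted.

    change_ratio is an assumed threshold, not a documented setting.
    """
    if force_save:
        return True
    # Periodic save: enough time has passed since the last save.
    if now - last_saved_at >= save_interval_seconds:
        return True
    # Significant change: relative difference exceeds the threshold.
    if old_limit > 0 and abs(new_limit - old_limit) / old_limit >= change_ratio:
        return True
    return False
```

The default interval mirrors `safe_limit_save_interval_seconds` from SafeOpenConfig; only the change threshold is invented for the example.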
get_safe_limit
get_safe_limit(
service_name: str, default_limit: int
) -> tuple[int, str]
Get the limit to use currently.
Gets it from Redis if Redis is healthy, otherwise from the local cache.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service_name | str | service name | required |
| default_limit | int | default limit | required |
Returns:
| Type | Description |
|---|---|
| tuple[int, str] | (limit value, source), where source is one of "redis", "local_cache", "default" |
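The lookup order implied by the three source labels can be sketched as a pure function. The name `resolve_limit` and the plain dicts standing in for Redis and the local cache are hypothetical; falling through to the local cache when Redis is healthy but holds no value is also an assumption.

```python
def resolve_limit(service_name, default_limit, redis_healthy,
                  redis_limits, local_cache):
    """Return (limit, source) using a redis -> local_cache -> default order.

    redis_limits and local_cache are plain dicts standing in for the
    real stores.
    """
    if redis_healthy and service_name in redis_limits:
        return redis_limits[service_name], "redis"
    if service_name in local_cache:
        return local_cache[service_name], "local_cache"
    return default_limit, "default"
```

Returning the source alongside the value lets callers log or alert when a limit came from a fallback path rather than from Redis.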
get_redis_state
get_redis_state() -> RedisConnectionState
Query the global Redis state.
get_service_state
get_service_state(
service_name: str,
) -> dict[str, Any] | None
Query the per-service state.
get_all_service_states
get_all_service_states() -> list[dict[str, Any]]
Query all service states.
force_disconnect
force_disconnect() -> None
Simulate a forced disconnection (for testing).
force_connect
force_connect() -> None
Simulate a forced reconnection (for testing).
reset
reset() -> None
Reset all state (for testing).
ServiceSafeLimitState
dataclass
ServiceSafeLimitState(
service_name: str,
last_known_safe_limit: int,
last_saved_at: float = time.time(),
redis_state: RedisConnectionState = RedisConnectionState.CONNECTED,
consecutive_failures: int = 0,
)
Per-service Safe Limit state.
BucketData
Bases: NamedTuple
Aggregated data of a single bucket.
RTTWindowStats
dataclass
RTTWindowStats(
avg_rtt_ms: float | None,
min_rtt_ms: float | None,
max_rtt_ms: float | None,
total_samples: int,
bucket_count: int,
window_seconds: int,
oldest_bucket_age_seconds: float | None,
)
RTT window statistics.
TimeBucketedGradientCalculator
TimeBucketedGradientCalculator(
window_seconds: int = 10,
min_samples_for_gradient: int = 3,
)
Time-bucketed Gradient calculator.
Computes a time-based RTT Gradient using TimeBucketedRTTWindow. Uses bucket-based averages instead of the existing GradientCalculator's EMA approach.
Gradient calculation:
- Compares the recent-half window average with the previous-half window average
- gradient > 0: RTT increasing (limit decrease needed)
- gradient < 0: RTT decreasing (limit increase possible)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| window_seconds | int | window size (seconds) | 10 |
| min_samples_for_gradient | int | minimum number of samples needed for Gradient calculation | 3 |
add_sample
add_sample(rtt_ms: float) -> None
Add an RTT sample.
get_gradient
get_gradient() -> float
Compute the current RTT Gradient.
Compares the averages of the recent-half window and the previous-half window.
Returns:
| Type | Description |
|---|---|
| float | Gradient value. A value > 0 means RTT is increasing (limit decrease needed); a value < 0 means RTT is decreasing (limit increase possible). |
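The half-window comparison can be sketched as follows. The exact formula is not given in this section, so the relative difference `(recent - previous) / previous` is an assumption, as is returning 0.0 when there is too little data. For simplicity, the hypothetical `half_window_gradient` counts buckets rather than individual samples.

```python
from statistics import fmean


def half_window_gradient(bucket_averages, min_samples=3):
    """Compare the recent half of the window with the previous half.

    bucket_averages: per-second average RTTs (ms), oldest first.
    Returns a positive value when RTT is rising and a negative value
    when it is falling. The normalization is an assumed choice.
    """
    if len(bucket_averages) < max(min_samples, 2):
        return 0.0  # assumption: neutral gradient when data is sparse
    mid = len(bucket_averages) // 2
    previous = fmean(bucket_averages[:mid])
    recent = fmean(bucket_averages[mid:])
    if previous == 0:
        return 0.0
    return (recent - previous) / previous
```

With this normalization, a doubling of RTT between the two halves yields a gradient of 1.0 and a halving yields -0.5.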
get_current_rtt
get_current_rtt() -> float | None
Return the current average RTT.
get_stats
get_stats() -> dict
Return statistics info.
reset
reset() -> None
Reset state.
TimeBucketedRTTWindow
TimeBucketedRTTWindow(window_seconds: int = 10)
Time-based RTT window.
Aggregates RTT into one bucket per second to provide memory-efficient time-based statistics. Uses array.array('f') to minimize memory usage.
Characteristics:
- Fixed memory: window_seconds × 16 bytes (sum, count, min, max per bucket)
- TPS-independent: summarizes once per second, covering a constant time range regardless of TPS
- Circular buffer: old buckets are automatically overwritten
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| window_seconds | int | window size (seconds). Default 10 seconds. | 10 |
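A reduced version of the per-second circular buffer might look like the sketch below. `DemoRTTWindow` is a hypothetical class that tracks only sum and count (not min and max), and it keeps a separate list of bucket timestamps to detect stale slots, so its memory layout differs from the 16 bytes per bucket described above.

```python
import time
from array import array


class DemoRTTWindow:
    """Hypothetical, simplified per-second RTT window."""

    def __init__(self, window_seconds=10, clock=time.time):
        self._n = window_seconds
        self._clock = clock
        self._sum = array("f", [0.0] * window_seconds)
        self._count = array("f", [0.0] * window_seconds)
        # Which wall-clock second each slot currently holds (-1 = empty).
        self._second = [-1] * window_seconds

    def add_sample(self, rtt_ms):
        sec = int(self._clock())
        idx = sec % self._n  # circular index
        if self._second[idx] != sec:
            # The slot holds an older second: overwrite it.
            self._second[idx] = sec
            self._sum[idx] = 0.0
            self._count[idx] = 0.0
        # Samples in the same second accumulate in the same bucket.
        self._sum[idx] += rtt_ms
        self._count[idx] += 1.0

    def get_avg_rtt(self):
        now = int(self._clock())
        total = count = 0.0
        for idx in range(self._n):
            sec = self._second[idx]
            # Skip empty slots and slots older than the window.
            if sec >= 0 and now - sec < self._n:
                total += self._sum[idx]
                count += self._count[idx]
        return total / count if count else None
```

Because each second maps to one slot, memory stays constant no matter how many samples arrive per second.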
add_sample
add_sample(rtt_ms: float) -> None
Add an RTT sample.
Multiple samples arriving in the same second accumulate in that bucket.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| rtt_ms | float | RTT value (milliseconds) | required |
get_stats
get_stats() -> RTTWindowStats
Compute RTT statistics within the window.
Returns:
| Name | Type | Description |
|---|---|---|
| RTTWindowStats | RTTWindowStats | average, min, max RTT and sample info |
get_avg_rtt
get_avg_rtt() -> float | None
Return the average RTT of the current window.
get_bucket_data
get_bucket_data(bucket_idx: int) -> BucketData | None
Query the data of a specific bucket (for debugging).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| bucket_idx | int | bucket index (0 to window_seconds - 1) | required |
Returns:
| Type | Description |
|---|---|
| BucketData \| None | Bucket data, or None if the bucket is invalid |
get_all_bucket_data
get_all_bucket_data() -> list[dict]
Query the data of all valid buckets (for debugging).
Returns:
| Type | Description |
|---|---|
| list[dict] | List of valid bucket data |
reset
reset() -> None
Reset the window.
memory_usage_bytes
memory_usage_bytes() -> int
Compute memory usage (bytes).
Returns:
| Type | Description |
|---|---|
| int | Approximate memory usage |
get_throttle_cb_bridge
get_throttle_cb_bridge(
sla_warning_ms: int | None = None,
sla_critical_ms: int | None = None,
) -> ThrottleCircuitBreakerBridge
Get the global ThrottleCircuitBreakerBridge instance.
Thread-safe singleton pattern.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| sla_warning_ms | int \| None | SLA Warning threshold (applied only on initial creation) | None |
| sla_critical_ms | int \| None | SLA Critical threshold (applied only on initial creation) | None |
Returns:
| Type | Description |
|---|---|
| ThrottleCircuitBreakerBridge | ThrottleCircuitBreakerBridge instance |
reset_throttle_cb_bridge
reset_throttle_cb_bridge() -> None
Reset the bridge (for testing).
get_throttle_dlq_integration
get_throttle_dlq_integration(
config: ThrottleDLQConfig | None = None,
) -> ThrottleDLQIntegration
Get the global ThrottleDLQIntegration instance.
reset_throttle_dlq_integration
reset_throttle_dlq_integration() -> None
Reset for testing.
get_gradient_calculator
get_gradient_calculator(
name: str = "default",
smoothing_factor: float = _DEFAULT_SMOOTHING_FACTOR,
sample_window_seconds: float = 10.0,
) -> GradientCalculator
Return a named GradientCalculator singleton.
Uses a Lock only on initial creation via the Double-Checked Locking pattern. Subsequent reads are Lock-free.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | calculator name (service/endpoint identifier) | 'default' |
| smoothing_factor | float | EMA weight | _DEFAULT_SMOOTHING_FACTOR |
| sample_window_seconds | float | sample window (seconds) | 10.0 |
Returns:
| Type | Description |
|---|---|
| GradientCalculator | GradientCalculator instance |
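The Double-Checked Locking pattern for named singletons can be sketched in a few lines. `DemoCalculator`, `get_demo_calculator`, and `reset_demo_calculators` are hypothetical stand-ins, and the 0.2 default is a placeholder because the value of `_DEFAULT_SMOOTHING_FACTOR` is not shown here.

```python
import threading


class DemoCalculator:
    """Hypothetical stand-in for GradientCalculator."""

    def __init__(self, name, smoothing_factor):
        self.name = name
        self.smoothing_factor = smoothing_factor


_calculators = {}
_lock = threading.Lock()


def get_demo_calculator(name="default", smoothing_factor=0.2):
    # First check without the lock: the common path once created.
    calc = _calculators.get(name)
    if calc is None:
        with _lock:
            # Second check under the lock: another thread may have
            # created the instance while this one was waiting.
            calc = _calculators.get(name)
            if calc is None:
                calc = DemoCalculator(name, smoothing_factor)
                _calculators[name] = calc
    return calc


def reset_demo_calculators():
    with _lock:
        _calculators.clear()
```

In this sketch, arguments passed on later calls for an existing name are ignored, because the instance is only constructed once per name.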
reset_gradient_calculators
reset_gradient_calculators() -> None
Reset for testing. Removes all named GradientCalculator instances.
get_throttle_limit_adjuster
get_throttle_limit_adjuster() -> ThrottleLimitAdjuster
Return the ThrottleLimitAdjuster singleton.
reset_throttle_limit_adjuster
reset_throttle_limit_adjuster() -> None
Reset the singleton (calls close() before clearing).
get_recovery_dampening_manager
get_recovery_dampening_manager(
config: RecoveryDampeningConfig | None = None,
on_limit_change: (
Callable[[str, int], None] | None
) = None,
) -> RecoveryDampeningManager
Get the global RecoveryDampeningManager instance.
reset_recovery_dampening_manager
reset_recovery_dampening_manager() -> None
Reset for testing.
get_throttle_registry
get_throttle_registry() -> ThrottleRegistry
Get global ThrottleRegistry instance.
Thread-safe singleton pattern.
Returns:
| Type | Description |
|---|---|
| ThrottleRegistry | ThrottleRegistry instance |
reset_throttle_registry
reset_throttle_registry() -> None
Reset registry (for testing).
get_safe_open_fallback_manager
get_safe_open_fallback_manager(
config: SafeOpenConfig | None = None,
redis_client: Any = None,
) -> SafeOpenFallbackManager
Get the global SafeOpenFallbackManager instance.
reset_safe_open_fallback_manager
reset_safe_open_fallback_manager() -> None
Reset for testing.