baldur.interfaces — Policy Composition & ML Strategy
Defines the resilience-policy composition protocols (guards, hooks, failure sinks, and the policy result/context DTOs) and the AI/ML strategy interfaces that back anomaly detection, forecasting, classification, and parameter optimization.
Resilience policy
PolicyOutcome
Bases: str, Enum
Kinds of Policy execution result.
PolicyResult
dataclass
PolicyResult(
value: T | None = None,
outcome: PolicyOutcome = PolicyOutcome.SUCCESS,
error: Exception | None = None,
executed_policies: list[str] = list(),
total_attempts: int = 1,
total_duration_ms: float = 0.0,
metadata: dict[str, Any] = dict(),
)
Bases: Generic[T]
Unified result type for every resilience Policy.
Consolidates each pattern's existing result type into a single shape:
- RetryResult(success, action, attempt, value, error, dlq_id)
- FallbackResult(value, used_fallback, fallback_mode, original_error)
- CircuitBreakerResult is for state management only and is not converted
- BulkheadFullError exceptions are mapped to PolicyResult(outcome=REJECTED)
Attributes:

| Name | Type | Description |
|---|---|---|
| `value` | `T \| None` | Result value (on success) |
| `outcome` | `PolicyOutcome` | Kind of execution result |
| `error` | `Exception \| None` | Exception raised on failure |
| `executed_policies` | `list[str]` | Names of policies that ran |
| `total_attempts` | `int` | Total number of attempts |
| `total_duration_ms` | `float` | Total execution time (milliseconds) |
| `metadata` | `dict[str, Any]` | Per-pattern details (optional) |

success
property
success: bool
Whether the execution succeeded (including Fallback).
executed
property
executed: bool
Whether the policy pipeline was executed (always True if PolicyComposer ran).
rejected
property
rejected: bool
Whether the execution was rejected by a policy.
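A minimal sketch of how the `success` and `rejected` properties could be derived from `outcome`. The classes below are local stand-ins, not the library's source: the `FALLBACK` member and all enum string values are assumptions (this page only names `SUCCESS`, `FAILURE`, and `REJECTED`), and `executed` is left out because its exact rule is not spelled out here.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Generic, TypeVar

T = TypeVar("T")


class PolicyOutcome(str, Enum):
    # FALLBACK and the string values are assumptions made for this sketch.
    SUCCESS = "success"
    FALLBACK = "fallback"
    FAILURE = "failure"
    REJECTED = "rejected"


@dataclass
class PolicyResult(Generic[T]):
    value: T | None = None
    outcome: PolicyOutcome = PolicyOutcome.SUCCESS
    error: Exception | None = None
    # default_factory gives every instance its own list/dict.
    executed_policies: list[str] = field(default_factory=list)
    total_attempts: int = 1
    total_duration_ms: float = 0.0
    metadata: dict[str, Any] = field(default_factory=dict)

    @property
    def success(self) -> bool:
        # A value served by a fallback still counts as success.
        return self.outcome in (PolicyOutcome.SUCCESS, PolicyOutcome.FALLBACK)

    @property
    def rejected(self) -> bool:
        return self.outcome is PolicyOutcome.REJECTED
```

Callers can branch on `result.rejected` first (load was shed, nothing ran to completion) and on `result.success` second, reading `result.error` only when both are false.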
PolicyContext
dataclass
PolicyContext(
order_id: str | None = None,
payment_id: str | None = None,
user_id: str | None = None,
tier_id: str | None = None,
region: str | None = None,
domain: str = "",
trace_id: str | None = None,
extra: dict[str, Any] = dict(),
)
Policy pipeline execution context (immutable).
frozen=True prevents side-effects inside the pipeline. Use
dataclasses.replace() (or with_updates()) to produce a copy with
edited fields (Copy-on-Write).
Attributes:

| Name | Type | Description |
|---|---|---|
| `order_id` | `str \| None` | Order identifier (read by DLQ sink as [...]) |
| `payment_id` | `str \| None` | Payment identifier (no current named-field consumer; decorator auto-extract still forwards it via [...]) |
| `user_id` | `str \| None` | User identifier (read by DLQ sink as [...]) |
| `tier_id` | `str \| None` | Service tier ("critical" \| "standard" \| "non_essential"). |
| `region` | `str \| None` | Region identifier (input to ErrorBudgetGate). |
| `domain` | `str` | Domain identifier (mirrors [...]) |
| `trace_id` | `str \| None` | Distributed-trace ID (OTel [...]) |
| `extra` | `dict[str, Any]` | Open-ended extension dict. Conventional keys: [...] |

with_updates
with_updates(**kwargs: Any) -> PolicyContext
Copy-on-Write: return a new instance with the given fields replaced.
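The copy-on-write behaviour can be sketched with a frozen dataclass and `dataclasses.replace()`. The class below is a trimmed local stand-in carrying only a few of the documented fields; the real implementation may differ in detail.

```python
from __future__ import annotations

import dataclasses
from dataclasses import dataclass, field
from typing import Any


@dataclass(frozen=True)
class PolicyContext:
    # Subset of the documented fields, to keep the sketch short.
    order_id: str | None = None
    tier_id: str | None = None
    domain: str = ""
    extra: dict[str, Any] = field(default_factory=dict)

    def with_updates(self, **kwargs: Any) -> PolicyContext:
        # replace() builds a new instance; the original is left untouched.
        return dataclasses.replace(self, **kwargs)


base = PolicyContext(order_id="o-1", domain="payments")
escalated = base.with_updates(tier_id="critical")
```

Note that `frozen=True` only blocks attribute rebinding. A mutable value such as the `extra` dict can still be changed in place, so passing a fresh dict to `with_updates(extra={...})` is the safer habit.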
GuardResult
dataclass
GuardResult(
allowed: bool,
reason: str | None = None,
metadata: dict[str, Any] = dict(),
)
Guard validation result.
Attributes:

| Name | Type | Description |
|---|---|---|
| `allowed` | `bool` | Whether execution is allowed (True = pass, False = reject) |
| `reason` | `str \| None` | Rejection reason (when allowed=False) |
| `metadata` | `dict[str, Any]` | Additional information (per-Guard details) |

ResiliencePolicy
Bases: Protocol[T]
Core Protocol implemented by synchronous resilience patterns.
Each Policy wraps a function and applies resilience logic. Composition of policies is handled by PolicyComposer.
Exception-handling contract:
- Business exceptions are wrapped in PolicyResult(outcome=FAILURE, error=e) and returned
- The "except Exception" pattern automatically lets KeyboardInterrupt/SystemExit through
- BulkheadFullError is converted to PolicyResult(outcome=REJECTED) by the Policy wrapper
name
property
name: str
Policy identifier (e.g., 'retry', 'circuit_breaker', 'bulkhead').
execute
execute(
func: Callable[..., T],
*args: Any,
context: PolicyContext | None = None,
**kwargs: Any
) -> PolicyResult[T]
Wrap a function in the Policy and execute it.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `func` | `Callable[..., T]` | Function to execute | required |
| `*args` | `Any` | Function positional arguments | `()` |
| `context` | `PolicyContext \| None` | Execution context (propagated to Guard/Hook/Sink) | `None` |
| `**kwargs` | `Any` | Function keyword arguments | `{}` |

Returns:

| Type | Description |
|---|---|
| `PolicyResult[T]` | Unified result. Does not raise. |

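To illustrate the "does not raise" contract, here is a minimal sketch of a class that structurally satisfies `ResiliencePolicy`. `SimpleRetryPolicy` is hypothetical (immediate retries, no backoff), and the `PolicyOutcome`/`PolicyResult` definitions are trimmed local stand-ins with assumed enum values.

```python
from __future__ import annotations

import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Generic, TypeVar

T = TypeVar("T")


class PolicyOutcome(str, Enum):
    SUCCESS = "success"  # string values are assumptions
    FAILURE = "failure"


@dataclass
class PolicyResult(Generic[T]):
    value: T | None = None
    outcome: PolicyOutcome = PolicyOutcome.SUCCESS
    error: Exception | None = None
    executed_policies: list[str] = field(default_factory=list)
    total_attempts: int = 1
    total_duration_ms: float = 0.0


class SimpleRetryPolicy:
    """Hypothetical policy: retries immediately, with no backoff."""

    def __init__(self, max_attempts: int = 3) -> None:
        self._max_attempts = max(1, max_attempts)

    @property
    def name(self) -> str:
        return "retry"

    def execute(self, func: Callable[..., T], *args: Any,
                context: Any = None, **kwargs: Any) -> PolicyResult[T]:
        started = time.perf_counter()
        last_error: Exception | None = None
        for attempt in range(1, self._max_attempts + 1):
            try:
                value = func(*args, **kwargs)
            except Exception as exc:
                # KeyboardInterrupt/SystemExit are not Exception subclasses,
                # so they propagate instead of being retried.
                last_error = exc
                continue
            elapsed = (time.perf_counter() - started) * 1000
            return PolicyResult(value=value, executed_policies=[self.name],
                                total_attempts=attempt, total_duration_ms=elapsed)
        # Every attempt failed: report the last error instead of raising.
        elapsed = (time.perf_counter() - started) * 1000
        return PolicyResult(outcome=PolicyOutcome.FAILURE, error=last_error,
                            executed_policies=[self.name],
                            total_attempts=self._max_attempts,
                            total_duration_ms=elapsed)
```

Because the failure is carried in the result rather than raised, a composer can pass the same object on to hooks and failure sinks without any try/except of its own.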
AsyncResiliencePolicy
Bases: Protocol[T]
Protocol implemented by asynchronous resilience patterns.
Current concrete implementation: AsyncSemaphoreBulkhead (async_semaphore.py). Follows the same exception-handling contract.
name
property
name: str
Policy identifier.
execute
async
execute(
func: Callable[..., T],
*args: Any,
context: PolicyContext | None = None,
**kwargs: Any
) -> PolicyResult[T]
Wrap an async function in the Policy and execute it.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `func` | `Callable[..., T]` | Async function to execute | required |
| `*args` | `Any` | Function positional arguments | `()` |
| `context` | `PolicyContext \| None` | Execution context (propagated to Guard/Hook/Sink) | `None` |
| `**kwargs` | `Any` | Function keyword arguments | `{}` |

Returns:

| Type | Description |
|---|---|
| `PolicyResult[T]` | Unified result. Does not raise. |

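The asynchronous contract can be sketched the same way. `TinyAsyncBulkhead` below is hypothetical and is not `AsyncSemaphoreBulkhead`: it uses a plain in-flight counter instead of a semaphore and rejects immediately when full. `PolicyResult` is a trimmed stand-in whose `outcome` is a plain string for brevity.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Generic, TypeVar

T = TypeVar("T")


@dataclass
class PolicyResult(Generic[T]):
    # Trimmed stand-in; the real type uses the PolicyOutcome enum.
    value: T | None = None
    outcome: str = "success"
    error: Exception | None = None


class TinyAsyncBulkhead:
    """Hypothetical bulkhead: rejects instead of queueing when full."""

    def __init__(self, max_concurrent: int) -> None:
        self._max = max_concurrent
        self._in_flight = 0

    @property
    def name(self) -> str:
        return "bulkhead"

    async def execute(self, func: Callable[..., Awaitable[T]], *args: Any,
                      context: Any = None, **kwargs: Any) -> PolicyResult[T]:
        # No await between the check and the increment, so this is safe
        # on a single event loop without extra locking.
        if self._in_flight >= self._max:
            return PolicyResult(outcome="rejected")
        self._in_flight += 1
        try:
            return PolicyResult(value=await func(*args, **kwargs))
        except Exception as exc:
            return PolicyResult(outcome="failure", error=exc)
        finally:
            self._in_flight -= 1
```

A real bulkhead might instead wait for a free slot with a timeout; rejecting outright keeps the sketch short and makes the REJECTED path easy to see.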
PolicyGuard
Bases: Protocol
Pre-execution validation for a Policy.
Guard implementations must define the default behavior when context=None:
- KillSwitchGuard: ignore context, only check global state
- ErrorBudgetGuard: tier_id=None -> global decision (tier-agnostic)
- RetryBudgetGuard: decide against the default budget
name
property
name: str
Guard identifier.
check
check(context: PolicyContext | None = None) -> GuardResult
Check whether execution is allowed.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `context` | `PolicyContext \| None` | Execution context. When None, only the global state is checked. | `None` |

Returns:

| Name | Type | Description |
|---|---|---|
| `GuardResult` | `GuardResult` | allowed=True passes, allowed=False rejects. |

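As an illustration of a guard with a well-defined `context=None` behaviour, here is a minimal sketch of a kill-switch style guard. `SimpleKillSwitchGuard`, its `engage()`/`release()` methods, and the reason text are hypothetical; `GuardResult` is redeclared locally from the signature documented on this page.

```python
from __future__ import annotations

import threading
from dataclasses import dataclass, field
from typing import Any


@dataclass
class GuardResult:
    allowed: bool
    reason: str | None = None
    metadata: dict[str, Any] = field(default_factory=dict)


class SimpleKillSwitchGuard:
    """Hypothetical guard backed by one process-wide flag."""

    def __init__(self) -> None:
        self._engaged = threading.Event()  # thread-safe boolean flag

    @property
    def name(self) -> str:
        return "kill_switch"

    def engage(self) -> None:
        self._engaged.set()

    def release(self) -> None:
        self._engaged.clear()

    def check(self, context: Any = None) -> GuardResult:
        # The context is deliberately ignored: the decision is global.
        if self._engaged.is_set():
            return GuardResult(False, reason="kill switch engaged",
                               metadata={"guard": self.name})
        return GuardResult(True)
```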
PolicyHook
Bases: Protocol
Hook that observes Policy execution events.
Fail-Open principle: hook failures must not stop business logic.
on_execute
on_execute(
policy_name: str,
attempt: int,
context: PolicyContext | None = None,
) -> None
Called when execution starts.
on_success
on_success(
policy_name: str,
result: PolicyResult,
context: PolicyContext | None = None,
) -> None
Called on successful execution.
on_failure
on_failure(
policy_name: str,
error: Exception,
attempt: int,
context: PolicyContext | None = None,
) -> None
Called on failed execution.
on_retry
on_retry(
policy_name: str,
attempt: int,
delay: float,
context: PolicyContext | None = None,
) -> None
Called before a scheduled retry (not invoked on the final failure or when the budget is exhausted).
on_reject
on_reject(
guard_name: str,
reason: str,
context: PolicyContext | None = None,
) -> None
Called when a Guard rejects (Kill Switch, CB open, Bulkhead full, etc.).
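The fail-open principle can be sketched as a dispatcher that isolates every hook call. `notify_hooks`, `CountingHook`, and `BrokenHook` are hypothetical names used only for illustration; how the real composer invokes hooks is not described on this page.

```python
from __future__ import annotations

import logging
from typing import Any, Iterable

logger = logging.getLogger("policy.hooks")


def notify_hooks(hooks: Iterable[Any], event: str,
                 *args: Any, **kwargs: Any) -> int:
    """Call `event` on every hook and return how many hooks raised."""
    failures = 0
    for hook in hooks:
        handler = getattr(hook, event, None)
        if handler is None:
            continue  # this hook does not observe the event
        try:
            handler(*args, **kwargs)
        except Exception:
            # Fail open: record the problem and keep going.
            failures += 1
            logger.exception("hook %r failed in %s", hook, event)
    return failures


class CountingHook:
    def __init__(self) -> None:
        self.retries = 0

    def on_retry(self, policy_name: str, attempt: int, delay: float,
                 context: Any = None) -> None:
        self.retries += 1


class BrokenHook:
    def on_retry(self, policy_name: str, attempt: int, delay: float,
                 context: Any = None) -> None:
        raise RuntimeError("metrics backend down")
```

With this shape, a broken metrics exporter costs one log line per event, and the hooks registered after it still run.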
FailureSink
Bases: Protocol
Interface that handles the terminal failure after every Policy is exhausted.
Performs final-failure handling such as DLQ persistence, error logging, alerts, etc.
handle_failure
handle_failure(
error: Exception,
context: PolicyContext | None,
policy_result: PolicyResult,
) -> str | None
Handle the terminal failure.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `error` | `Exception` | Terminal failure exception | required |
| `context` | `PolicyContext \| None` | PolicyContext (order_id, user_id, etc. needed for DLQ persistence) | required |
| `policy_result` | `PolicyResult` | Full pipeline result | required |

Returns:

| Type | Description |
|---|---|
| `str \| None` | Failure record ID (e.g., DLQ ID) or None |

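A minimal sketch of a sink that satisfies `FailureSink` by keeping records in memory. `InMemoryFailureSink` and its record layout are hypothetical; a production sink would persist to a real DLQ instead of a list.

```python
from __future__ import annotations

import uuid
from typing import Any


class InMemoryFailureSink:
    """Hypothetical sink that keeps dead letters in a list."""

    def __init__(self) -> None:
        self.records: list[dict[str, Any]] = []

    def handle_failure(self, error: Exception, context: Any,
                       policy_result: Any) -> str | None:
        record_id = uuid.uuid4().hex
        self.records.append({
            "id": record_id,
            "error_type": type(error).__name__,
            "message": str(error),
            # context may be None, so fields are read defensively.
            "order_id": getattr(context, "order_id", None),
            "user_id": getattr(context, "user_id", None),
            "attempts": getattr(policy_result, "total_attempts", None),
        })
        return record_id
```

Returning the record ID lets the caller attach it to the final result or log line, so an operator can look the failure up later.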
ML strategy
AnomalyDetectionStrategy
Bases: Protocol
Anomaly detection strategy - swappable between statistics / ML / deep learning.
Built-in
- ZScoreDetector: Z-Score based
- IQRDetector: IQR based
Consumer extension examples
- IsolationForestDetector: scikit-learn Isolation Forest
- AutoencoderDetector: PyTorch Autoencoder based
- ProphetDetector: seasonality-aware detection based on Facebook Prophet
Used by
- PredictiveForecasterService (metric anomaly detection)
- CoOccurrenceTracker (co-occurrence frequency anomaly detection)
- CorruptionShield L3 (data anomaly detection)
detect
detect(
value: float, context: dict[str, Any] | None = None
) -> tuple[bool, float]
Decide whether a single value is anomalous.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `value` | `float` | Value to check | required |
| `context` | `dict[str, Any] \| None` | Multi-dimensional features to pass to the ML model (optional). Ignored by the built-in statistical strategies (ZScore, IQR). ML implementations extract the features they need from this dict. | `None` |

Returns:

| Type | Description |
|---|---|
| `tuple[bool, float]` | (is_anomalous, score): anomaly verdict and anomaly score. The score does not need to be normalized; strategies choose their own scale (ZScore, probability, etc.). |

update
update(
value: float, context: dict[str, Any] | None = None
) -> None
Add a training sample (online learning).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `value` | `float` | New observation | required |
| `context` | `dict[str, Any] \| None` | Multi-dimensional features to pass to the ML model (optional) | `None` |

reset
reset() -> None
Reset learning state.
get_feature_schema
get_feature_schema() -> dict[str, str] | None
Return the context key/type schema the strategy expects.
Returns:

| Type | Description |
|---|---|
| `dict[str, str] \| None` | Shape like {"service_name": "str", "cpu_usage": "float", ...}. None when no schema applies (statistical strategy). Used by the orchestrator to validate inputs in advance. |

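The four methods fit together as shown in this minimal sketch of a statistical strategy. `RunningZScoreDetector` is hypothetical and is not the built-in ZScoreDetector: the threshold, the minimum sample count, and the use of Welford's online variance update are assumptions chosen for illustration.

```python
from __future__ import annotations

import math
from typing import Any


class RunningZScoreDetector:
    """Hypothetical detector; threshold and min_samples are illustrative."""

    def __init__(self, threshold: float = 3.0, min_samples: int = 5) -> None:
        self._threshold = threshold
        self._min_samples = max(2, min_samples)  # sample variance needs n >= 2
        self.reset()

    def reset(self) -> None:
        self._n = 0
        self._mean = 0.0
        self._m2 = 0.0  # running sum of squared deviations from the mean

    def update(self, value: float,
               context: dict[str, Any] | None = None) -> None:
        # Welford's online update of mean and variance.
        self._n += 1
        delta = value - self._mean
        self._mean += delta / self._n
        self._m2 += delta * (value - self._mean)

    def detect(self, value: float,
               context: dict[str, Any] | None = None) -> tuple[bool, float]:
        if self._n < self._min_samples:
            return False, 0.0  # not enough history to judge
        std = math.sqrt(self._m2 / (self._n - 1))
        if std == 0.0:
            return False, 0.0  # constant history: no usable scale
        score = abs(value - self._mean) / std
        return score > self._threshold, score

    def get_feature_schema(self) -> dict[str, str] | None:
        return None  # statistical strategy: no context schema
```

Here the score is an unnormalized z-score, which the contract allows; a consumer comparing scores across strategies would need to know each strategy's scale.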
ForecastStrategy
Bases: Protocol
Time-series forecasting strategy.
Built-in
- HoltLinearForecaster: double exponential smoothing
Consumer extension examples
- ProphetForecaster: Facebook Prophet
- LSTMForecaster: PyTorch LSTM
- ARIMAForecaster: statsmodels ARIMA
Used by
- PredictiveForecasterService (metric forecasting)
- CoOccurrenceTracker (co-occurrence frequency trend)
update
update(value: float) -> float
Update the model with a new observation.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `value` | `float` | New observation | required |

Returns:

| Type | Description |
|---|---|
| `float` | Current level (smoothed value) |

predict
predict(steps_ahead: int = 1) -> float | None
Predict a future value.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `steps_ahead` | `int` | Number of future steps to predict | `1` |

Returns:

| Type | Description |
|---|---|
| `float \| None` | Predicted value. None when there is insufficient data. |

get_confidence
get_confidence() -> float
Confidence of the current model (0.0 to 1.0).
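A minimal sketch of double exponential smoothing (Holt's linear method) behind this interface. `SimpleHoltForecaster` is hypothetical and is not the built-in HoltLinearForecaster: the smoothing factors, the warm-up length, and the sample-count confidence ramp are all assumptions.

```python
from __future__ import annotations


class SimpleHoltForecaster:
    """Hypothetical forecaster; alpha, beta and warmup_samples are illustrative."""

    def __init__(self, alpha: float = 0.5, beta: float = 0.5,
                 warmup_samples: int = 3) -> None:
        self._alpha = alpha
        self._beta = beta
        self._warmup = max(2, warmup_samples)
        self._level: float | None = None
        self._trend = 0.0
        self._n = 0

    def update(self, value: float) -> float:
        self._n += 1
        if self._level is None:
            self._level = value  # first sample seeds the level
        elif self._n == 2:
            self._trend = value - self._level  # second sample seeds the trend
            self._level = value
        else:
            prev = self._level
            # level_t = alpha * y_t + (1 - alpha) * (level_{t-1} + trend_{t-1})
            self._level = (self._alpha * value
                           + (1 - self._alpha) * (prev + self._trend))
            # trend_t = beta * (level_t - level_{t-1}) + (1 - beta) * trend_{t-1}
            self._trend = (self._beta * (self._level - prev)
                           + (1 - self._beta) * self._trend)
        return self._level

    def predict(self, steps_ahead: int = 1) -> float | None:
        if self._level is None or self._n < self._warmup:
            return None  # insufficient data
        return self._level + steps_ahead * self._trend

    def get_confidence(self) -> float:
        # Assumption: confidence simply ramps up with the sample count.
        return min(1.0, self._n / self._warmup)
```

On a perfectly linear series the level tracks the latest observation and the trend equals the step size, so `predict(h)` extrapolates the line by `h` steps.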
ClassificationStrategy
Bases: Protocol
Classification strategy - event / spike type classification.
Built-in
- SpikeClassifier: rule-based classifier
Consumer extension examples
- RandomForestClassifier: scikit-learn RF
- XGBoostClassifier: XGBoost
- NeuralClassifier: PyTorch NN
Used by
- PredictiveForecasterService (spike classification)
- CorrelationEngine (event pattern classification)
classify
classify(
features: dict[str, float],
context: dict[str, Any] | None = None,
) -> tuple[str, float]
Map a feature vector to a class label and a confidence.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `features` | `dict[str, float]` | Feature name -> value mapping | required |
| `context` | `dict[str, Any] \| None` | Additional metadata to pass to the ML model (optional) | `None` |

Returns:

| Type | Description |
|---|---|
| `tuple[str, float]` | (label, confidence): classification label and confidence (0.0 to 1.0) |

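A rule-based classifier behind this interface could look like the sketch below. `ThresholdSpikeClassifier` is hypothetical and is not the built-in SpikeClassifier: the feature names (`peak_to_baseline`, `duration_s`), the labels, and every threshold are invented for illustration.

```python
from __future__ import annotations

from typing import Any


class ThresholdSpikeClassifier:
    """Hypothetical rules; feature names and thresholds are made up."""

    def classify(self, features: dict[str, float],
                 context: dict[str, Any] | None = None) -> tuple[str, float]:
        ratio = features.get("peak_to_baseline", 1.0)
        duration = features.get("duration_s", 0.0)
        if ratio < 1.5:
            return "normal", 0.9
        if duration < 60.0:
            # Short burst: confidence grows with the spike height.
            return "transient_spike", min(1.0, 0.5 + ratio / 10.0)
        # Long elevation: confidence grows with the duration.
        return "sustained_load", min(1.0, 0.5 + duration / 600.0)
```

Missing features fall back to neutral defaults, so a sparse feature dict degrades to the "normal" label instead of raising.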
BatchDetectable
Bases: Protocol
Batch anomaly detection marker.
ML implementations that support tensor batch operations implement this. Statistical strategies (ZScore, IQR) do not need to implement this.
Usage
- CorrelationEngineService (batch dispatch)
- MicroBatchConsumer (batch dispatch)
detect_batch
detect_batch(
values: list[float],
contexts: list[dict[str, Any] | None] | None = None,
) -> list[tuple[bool, float]]
Batch anomaly detection.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `values` | `list[float]` | Values to check. | required |
| `contexts` | `list[dict[str, Any] \| None] \| None` | Per-value metadata (optional). | `None` |

Returns:

| Type | Description |
|---|---|
| `list[tuple[bool, float]]` | List of (is_anomalous, score) tuples in input order. |

update_batch
update_batch(values: list[float]) -> None
Add batch training data.
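A consumer can use the marker to choose between the batch path and a per-value loop. The sketch below assumes a `runtime_checkable` redeclaration of the Protocol (whether the real one supports `isinstance` checks is not stated here), and `detect_many` is a hypothetical helper, not part of the package.

```python
from __future__ import annotations

from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class BatchDetectable(Protocol):
    # Local redeclaration; isinstance() only checks that the methods exist.
    def detect_batch(
        self,
        values: list[float],
        contexts: list[dict[str, Any] | None] | None = None,
    ) -> list[tuple[bool, float]]: ...

    def update_batch(self, values: list[float]) -> None: ...


def detect_many(
    strategy: Any,
    values: list[float],
    contexts: list[dict[str, Any] | None] | None = None,
) -> list[tuple[bool, float]]:
    """Hypothetical dispatcher: use the batch path when it exists."""
    if contexts is not None and len(contexts) != len(values):
        raise ValueError("contexts must align one-to-one with values")
    if isinstance(strategy, BatchDetectable):
        return strategy.detect_batch(values, contexts)
    # Fallback for scalar-only strategies such as the statistical ones.
    per_value = contexts if contexts is not None else [None] * len(values)
    return [strategy.detect(v, ctx) for v, ctx in zip(values, per_value)]
```

Either path returns results in input order, so callers do not need to know which one ran.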
BatchClassifiable
Bases: Protocol
Batch classification marker.
ML classification implementations that support vectorized batch operations implement this. Rule-based classifiers (SpikeClassifier) do not need to implement this.
Usage
- CorrelationEngine (batch event classification)
classify_batch
classify_batch(
features_list: list[dict[str, float]],
contexts: list[dict[str, Any]] | None = None,
) -> list[tuple[str, float]]
Batch classification.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `features_list` | `list[dict[str, float]]` | List of feature dicts. | required |
| `contexts` | `list[dict[str, Any]] \| None` | Per-item metadata (optional). | `None` |

Returns:

| Type | Description |
|---|---|
| `list[tuple[str, float]]` | List of (label, confidence) tuples in input order. |

OptimizationStrategy
Bases: Protocol
Parameter optimization strategy - settings value search.
Default
- DecisionEngine rules (rule-based)
ML implementations
- BayesianOptimizer: Gaussian Process + Expected Improvement
- EvolutionaryOptimizer: CMA-ES (optional, future)
Usage
- SettingsRecommendationService (optimal value search)
- AutoTuningService (parameter suggestion)
Note
DecisionEngine (rule-based) and OptimizationStrategy (ML-based) operate on fundamentally different paradigms. Their integration is handled by SettingsRecommendationService (374), NOT by an adapter in this package. 373 provides the ML implementation; 374 orchestrates the pipeline.
suggest
suggest(
parameter: str,
current_value: float,
bounds: tuple[float, float],
history: list[dict[str, Any]],
objective_metric: str,
minimize: bool = True,
) -> tuple[float, float]
Suggest an optimal value for a parameter.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `parameter` | `str` | Parameter name to optimize. | required |
| `current_value` | `float` | Current parameter value. | required |
| `bounds` | `tuple[float, float]` | (min, max) allowed range. | required |
| `history` | `list[dict[str, Any]]` | Past observations [{parameter: value, metric: value}, ...]. | required |
| `objective_metric` | `str` | Metric name to optimize (e.g., "p99_latency_ms"). | required |
| `minimize` | `bool` | If True, lower metric = better (default). If False, higher metric = better (e.g., throughput RPS). | `True` |

Returns:

| Type | Description |
|---|---|
| `tuple[float, float]` | (suggested_value, expected_improvement): value and expected gain. |

suggest_batch
suggest_batch(
parameters: list[str],
current_values: dict[str, float],
bounds: dict[str, tuple[float, float]],
history: list[dict[str, Any]],
objective_metrics: list[str],
minimize: bool = True,
) -> dict[str, tuple[float, float]]
Suggest optimal values for multiple parameters simultaneously.
Returns:

| Type | Description |
|---|---|
| `dict[str, tuple[float, float]]` | {parameter: (suggested_value, expected_improvement)} |

update_observation
update_observation(
parameters: dict[str, float], metrics: dict[str, float]
) -> None
Record an observation (parameter values -> metric outcomes).
Used to update the internal model after applying a recommendation.
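To make the `suggest` inputs and outputs concrete, here is a deliberately naive sketch. It is not Bayesian optimization: `suggest_from_history` is a hypothetical function that picks the best value already observed and reports the metric gap against the observation nearest the current value. It also assumes each history row is keyed by the parameter name and the metric name, which is one reading of the `history` description.

```python
from __future__ import annotations

from typing import Any


def suggest_from_history(
    parameter: str,
    current_value: float,
    bounds: tuple[float, float],
    history: list[dict[str, Any]],
    objective_metric: str,
    minimize: bool = True,
) -> tuple[float, float]:
    """Hypothetical best-observed search; returns (value, expected gain)."""
    lo, hi = bounds
    rows = [r for r in history if parameter in r and objective_metric in r]
    if not rows:
        # Nothing to learn from: keep the current value, clamped to bounds.
        return min(max(current_value, lo), hi), 0.0
    pick = min if minimize else max
    best = pick(rows, key=lambda r: r[objective_metric])
    # Baseline: the observation taken closest to the current setting.
    baseline = min(rows, key=lambda r: abs(r[parameter] - current_value))
    gain = baseline[objective_metric] - best[objective_metric]
    if not minimize:
        gain = -gain
    suggested = min(max(best[parameter], lo), hi)
    return suggested, max(0.0, gain)
```

A model-based optimizer would interpolate between observations and trade off exploration against exploitation; this sketch only replays what has already been measured, and its gain estimate is unreliable when the suggestion is clamped by `bounds`.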
StrategyLifecycle
Bases: Protocol
ML strategy lifecycle management - optional implementation.
Lightweight statistical strategies (ZScore, IQR) do not need to implement this. Heavy ML models (PyTorch, XGBoost, LLM) implement this.
Prior art
- ShutdownHandler ABC: on_shutdown_start(), on_drain_complete()
- ProviderRegistry.health_check_all(): unified provider health checks
- HoltLinearForecaster: predict() -> None when warmup_samples < 30
initialize
initialize() -> None
Load model weights from disk to memory (or GPU VRAM).
Called during Orchestrator startup.
warmup
warmup() -> None
Run a first inference with dummy input.
Purpose: JIT compilation (PyTorch), CUDA kernel caching, TensorRT engine building, etc. Called after initialize() and before the first real detect().
is_ready
is_ready() -> bool
Whether the strategy is ready for inference.
Wired up to the K8s Readiness Probe. Must return an O(1) cached boolean (constrained by readinessProbe.timeoutSeconds=3).
teardown
teardown() -> None
Release resources (GPU VRAM, temporary files).
Called from GracefulShutdownCoordinator.initiate_shutdown().
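The lifecycle order (initialize, warmup, is_ready, teardown) can be sketched as follows. `LazyModelStrategy` is hypothetical, and a small dict stands in for real model weights; the point is that `is_ready()` reads a cached flag rather than probing the model.

```python
from __future__ import annotations

from typing import Any


class LazyModelStrategy:
    """Hypothetical heavy strategy; the 'model' is a plain dict stand-in."""

    def __init__(self) -> None:
        self._model: dict[str, Any] | None = None
        self._ready = False  # cached flag keeps is_ready() O(1)

    def initialize(self) -> None:
        # Stands in for loading weights from disk into memory or VRAM.
        self._model = {"weights": [0.1, 0.2, 0.3]}

    def warmup(self) -> None:
        if self._model is None:
            raise RuntimeError("initialize() must run before warmup()")
        self._infer(0.0)  # throwaway inference with dummy input
        self._ready = True

    def is_ready(self) -> bool:
        return self._ready

    def teardown(self) -> None:
        # Flip the flag first so probes fail before resources disappear.
        self._ready = False
        self._model = None

    def _infer(self, x: float) -> float:
        assert self._model is not None
        return sum(w * x for w in self._model["weights"])
```

Marking the strategy ready only after `warmup()` means a readiness probe never routes traffic to a model that still has to pay its first-inference cost.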