
baldur_pro.services.canary — Canary Rollout

Progressive canary rollouts with safety interlocks: CanaryRolloutService, CanaryFeatureFlag, and CanarySafetyInterlock.

🔒 PRO Feature — requires a baldur-pro license

These symbols ship in the baldur-pro distribution. PRO modules import normally — there is no ImportError. PRO features activate only when baldur.init() runs with a valid BALDUR_LICENSE_KEY; without it the system runs with OSS defaults and register_pro_services() logs entitlement.pro_registration_skipped.

canary

Canary Rollout Module.

Gradual rollout and automatic rollback system for configuration changes.

Reference: docs/baldur/middleware_system/71_CANARY_CONFIG_ROLLOUT.md

Components
  • models.py: CanaryState, CanaryStage, CanaryRollout, CanaryMetrics, PassCriteria
  • service.py: CanaryRolloutService (thin orchestrator), get_canary_rollout_service
  • state_machine.py: RolloutStateMachine (state transition rules)
  • cluster_applicator.py: ClusterApplicator (per-cluster config application)
  • evaluator.py: CanaryEvaluator (Shadow + Live evaluation)
  • serializer.py: RolloutSerializer (serialization/deserialization)
  • metrics_collector.py: CanaryMetricsCollector (metrics collection)
  • locking.py: CanaryConfigLock, ConfigLockError
  • versioning.py: VersionChecker, check_version_and_rollback, VersionConflictError
  • chaos_guard.py: CanaryChaosGuard, ChaosConflictPolicy, ChaosConflictResult
  • audit.py: log_canary_action, log_canary_error
  • cross_cluster.py: CrossClusterNotifier, CrossClusterPropagationRequest, GovernancePolicySync
  • feature_flag.py: CanaryFeatureFlag, CanaryFlagConfig, CanaryConfigMiddleware
Usage

from baldur_pro.services.canary import (
    # Data Models
    CanaryState,
    CanaryStage,
    CanaryRollout,
    CanaryMetrics,
    PassCriteria,
    # Service
    get_canary_rollout_service,
    CanaryRolloutService,
    # Locking
    CanaryConfigLock,
    ConfigLockError,
    # Versioning
    VersionConflictError,
    check_version_and_rollback,
    # Chaos Guard
    CanaryChaosGuard,
    ChaosConflictPolicy,
)

Example: Create and start a rollout

service = get_canary_rollout_service()
rollout = service.create_rollout(
    config_type="circuit_breaker",
    new_values={"failure_threshold": 3},
    stages=[
        CanaryStage(name="canary", clusters=["seoul-canary"], percentage=10),
    ],
    created_by="admin@example.com",
)
service.start_rollout(rollout.id)

Status: Public

CanaryStage dataclass

CanaryStage(
    name: str,
    clusters: list[str],
    percentage: float,
    duration_minutes: int = 5,
    auto_promote: bool = True,
    pass_criteria: PassCriteria = PassCriteria(),
    error_rate_threshold: float = 0.05,
    latency_increase_threshold: float = 0.5,
)

One stage in a multi-stage canary rollout plan.

A rollout is composed of stages; each stage applies the new configuration to a subset of clusters and waits for the stage duration plus pass-criteria evaluation before promoting.

CanaryState

Bases: str, Enum

Canary rollout state machine.

State transitions:
  • CREATED: initial, not yet started
  • CANARY: applied to a subset of clusters
  • PROMOTING: advancing to the next stage
  • PAUSED: temporarily halted (manual or automatic)
  • COMPLETED: fully rolled out to all clusters
  • ROLLED_BACK: rollback finished
  • FAILED: failure state
  • CANCELLED: cancelled by operator

PassCriteria dataclass

PassCriteria(
    error_rate_absolute_max: float = 0.05,
    error_rate_increase_max: float = 0.01,
    latency_p95_delta_ms: float = 50.0,
    latency_p99_delta_pct: float = 0.2,
    error_budget_drain_rate_max: float = 1.2,
    error_budget_remaining_min: float = 0.1,
    min_requests_required: int = 100,
    evaluation_window_seconds: int = 300,
)

Promotion thresholds for canary stage evaluation.

Pure threshold DTO consumed by LiveCanaryEvaluator. The evaluator owns the comparison logic; this class holds the values only.
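As a rough illustration of how an evaluator could consume these thresholds, the sketch below uses a stand-in PassCriteria with the documented fields and defaults. The Sample snapshot type, the violations helper, and the exact comparison semantics (for example, that the "increase" threshold is measured against a baseline) are assumptions made for the example, not the behavior of LiveCanaryEvaluator.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PassCriteria:
    """Stand-in that mirrors the documented fields and defaults."""
    error_rate_absolute_max: float = 0.05
    error_rate_increase_max: float = 0.01
    latency_p95_delta_ms: float = 50.0
    latency_p99_delta_pct: float = 0.2
    error_budget_drain_rate_max: float = 1.2
    error_budget_remaining_min: float = 0.1
    min_requests_required: int = 100
    evaluation_window_seconds: int = 300


@dataclass(frozen=True)
class Sample:
    """Hypothetical metrics snapshot; not part of the documented API."""
    requests: int
    error_rate: float
    latency_p95_ms: float


def violations(criteria, baseline, canary):
    """Return the violated threshold names, or None when traffic is too low to judge."""
    if canary.requests < criteria.min_requests_required:
        return None
    found = []
    if canary.error_rate > criteria.error_rate_absolute_max:
        found.append("error_rate_absolute_max")
    # Assumption: "increase" compares the canary against the baseline.
    if canary.error_rate - baseline.error_rate > criteria.error_rate_increase_max:
        found.append("error_rate_increase_max")
    if canary.latency_p95_ms - baseline.latency_p95_ms > criteria.latency_p95_delta_ms:
        found.append("latency_p95_delta_ms")
    return found
```

Keeping the DTO free of logic, as described above, means the same thresholds can be reused by different evaluators without changing the data class.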

for_tier classmethod

for_tier(tier_id: str) -> PassCriteria

Return tier-default PassCriteria for critical/standard/non_essential.

CanaryChaosGuard

CanaryChaosGuard(policy: ChaosConflictPolicy = None)

Defense against conflicts between Canary Rollout and Chaos experiments.

Applying a Canary rollout to a cluster where a chaos experiment is running makes it impossible to tell whether a metric change is caused by the configuration change or by the chaos. This guard detects the conflict and handles it according to the policy.

Default policy: SMART (exclude only the clusters under experiment)

Example

guard = CanaryChaosGuard(policy=ChaosConflictPolicy.SMART)

result = guard.check_conflict(
    target_clusters=["seoul", "tokyo", "singapore"],
    force_during_chaos=False,
)

if result.has_conflict:
    logger.warning(
        "chaos_clusters_excluded",
        chaos_clusters=result.chaos_clusters,
    )

if not result.can_proceed:
    raise ValueError("All target clusters have active chaos")

# Proceed with the rollout on result.safe_clusters
proceed_with_clusters(result.safe_clusters)

Initialize CanaryChaosGuard.

Parameters:

Name Type Description Default
policy ChaosConflictPolicy

Conflict policy (default: SMART)

None

policy property writable

policy: ChaosConflictPolicy

Look up the current policy.

check_conflict

check_conflict(
    target_clusters: list[str],
    force_during_chaos: bool = False,
) -> ChaosConflictResult

Check for chaos experiment conflicts.

Parameters:

Name Type Description Default
target_clusters list[str]

Rollout target clusters

required
force_during_chaos bool

Force-proceed flag (warning only if True)

False

Returns:

Type Description
ChaosConflictResult

Conflict check result (ChaosConflictResult)

Note

Use force_during_chaos=True only in emergencies. Metrics-based automatic rollback may malfunction.

ChaosConflictPolicy

Bases: str, Enum

Chaos conflict policy.

How to handle the case where a chaos experiment is running on a target rollout cluster.

Values

  • STRICT: Fully block the Canary while a chaos experiment is running
  • SMART: Proceed while excluding only the clusters under chaos (default)
  • LOOSE: Proceed with all clusters after a warning (not recommended)
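The three policies can be pictured as different ways of partitioning the target clusters. The following is a minimal sketch using stand-in types. The enum string values, the resolve helper, and the under_chaos input are assumptions for illustration; the real guard discovers running experiments itself and may differ in detail.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class ChaosConflictPolicy(str, Enum):
    # String values are assumed; only the member names are documented.
    STRICT = "strict"
    SMART = "smart"
    LOOSE = "loose"


@dataclass
class ChaosConflictResult:
    has_conflict: bool
    chaos_clusters: list
    safe_clusters: list
    policy_applied: ChaosConflictPolicy
    can_proceed: bool
    warning_message: Optional[str] = None


def resolve(targets, under_chaos, policy):
    """Hypothetical helper: split targets into chaos and safe clusters per policy."""
    chaos = [c for c in targets if c in under_chaos]
    if not chaos:
        return ChaosConflictResult(False, [], list(targets), policy, True)
    if policy is ChaosConflictPolicy.STRICT:
        safe, can_proceed = [], False
    elif policy is ChaosConflictPolicy.SMART:
        safe = [c for c in targets if c not in under_chaos]
        can_proceed = bool(safe)  # nothing left to roll out to means stop
    else:  # LOOSE: keep every target and rely on the warning
        safe, can_proceed = list(targets), True
    message = "chaos active on: " + ", ".join(chaos)
    return ChaosConflictResult(True, chaos, safe, policy, can_proceed, message)
```

Under SMART, a rollout whose targets are all under chaos ends with can_proceed set to False, which matches the "All target clusters have active chaos" branch in the CanaryChaosGuard example.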

ChaosConflictResult dataclass

ChaosConflictResult(
    has_conflict: bool,
    chaos_clusters: list[str],
    safe_clusters: list[str],
    policy_applied: ChaosConflictPolicy,
    can_proceed: bool,
    warning_message: str | None = None,
)

Conflict check result.

Attributes:

Name Type Description
has_conflict bool

Whether a conflict occurred

chaos_clusters list[str]

List of clusters under a chaos experiment

safe_clusters list[str]

List of clusters available for rollout

policy_applied ChaosConflictPolicy

Applied policy

can_proceed bool

Whether the rollout can proceed

warning_message str | None

Warning message (on conflict)

ClusterApplicator

ClusterApplicator(rollout_ttl_days: int = 7)

Applies canary config to individual clusters via CacheProviderInterface.

This handles LOCAL cluster cache key storage only. For cross-cluster synchronization, see cross_cluster.py.

Initialize the ClusterApplicator.

Parameters:

Name Type Description Default
rollout_ttl_days int

TTL for cluster config keys in days.

7

cache property

cache: CacheProviderInterface | None

CacheProviderInterface (lazy loading via ProviderRegistry).

apply_to_clusters

apply_to_clusters(
    rollout: CanaryRollout, clusters: list[str]
) -> None

Apply config to multiple clusters.

Parameters:

Name Type Description Default
rollout CanaryRollout

The canary rollout containing config to apply.

required
clusters list[str]

List of cluster IDs to apply config to.

required

apply_config_to_cluster

apply_config_to_cluster(
    cluster: str, config_type: str, values: dict[str, Any]
) -> None

Apply config to a specific cluster via cache provider.

The actual implementation stores config in a cluster-specific cache key. The sync mechanism depends on deployment:
  • Same Redis: namespace-based key storage
  • Separate Redis: per-cluster Redis connection
  • API-based: cluster API call
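For the "same Redis" case, namespace-based storage might look like the sketch below. The InMemoryCache class, the canary:<cluster>:<config_type> key layout, and the JSON encoding are all assumptions chosen for the example; the actual key format and the CacheProviderInterface methods are not specified here.

```python
import json
import time


class InMemoryCache:
    """Hypothetical stand-in for a cache provider with per-key TTL."""

    def __init__(self):
        self._data = {}

    def set(self, key, value, ttl_seconds):
        self._data[key] = (value, time.monotonic() + ttl_seconds)

    def get(self, key):
        entry = self._data.get(key)
        if entry is None or entry[1] <= time.monotonic():
            return None  # missing or expired
        return entry[0]


def cluster_key(cluster, config_type):
    # Assumed key layout: one namespace segment per cluster.
    return f"canary:{cluster}:{config_type}"


def apply_config_to_cluster(cache, cluster, config_type, values, ttl_days=7):
    """Store the config under a cluster-specific key that expires after ttl_days."""
    payload = json.dumps(values, sort_keys=True)
    cache.set(cluster_key(cluster, config_type), payload, ttl_days * 86400)
```

Because each cluster has its own key, writing a canary value for one cluster leaves the others untouched.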

Parameters:

Name Type Description Default
cluster str

Cluster ID.

required
config_type str

Config type (circuit_breaker, dlq, retry, etc.).

required
values dict[str, Any]

Config values to apply.

required

ClusterConfigChange dataclass

ClusterConfigChange(
    config_type: str,
    previous_value: dict[str, Any],
    new_value: dict[str, Any],
    changed_by: str = "",
    changed_at: datetime = utc_now(),
    rollout_id: str | None = None,
    reason: str = "",
)

Bases: SerializableMixin

Configuration change information.

Attributes:

Name Type Description
config_type str

config type (circuit_breaker, dlq, retry, etc.)

previous_value dict[str, Any]

previous config value

new_value dict[str, Any]

new config value

changed_by str

who changed it

changed_at datetime

change time

rollout_id str | None

related Canary Rollout ID (if any)

reason str

change reason

from_dict classmethod

from_dict(data: dict[str, Any]) -> ClusterConfigChange

Create from a dictionary.

CrossClusterNotifier

CrossClusterNotifier(
    current_cluster: str | None = None,
    other_clusters: list[str] | None = None,
    notification_backend: NotificationBackend | None = None,
    default_channel: str | None = None,
)

Cross-cluster notifier.

Sends a notification to the operators of other clusters on a config change. Does not apply automatically; only shares information.

Attributes:

Name Type Description
current_cluster

current cluster ID

other_clusters

list of clusters to notify

notification_backend

notification delivery backend

Example

notifier = CrossClusterNotifier(
    current_cluster="cluster-a",
    other_clusters=["cluster-b", "cluster-c"],
)

notifier.notify_config_change(ClusterConfigChange(
    config_type="circuit_breaker",
    previous_value={"failure_threshold": 5},
    new_value={"failure_threshold": 3},
    changed_by="admin@example.com",
))

Initialize CrossClusterNotifier.

Parameters:

Name Type Description Default
current_cluster str | None

current cluster ID (defaults to the BALDUR_NAMESPACE env var)

None
other_clusters list[str] | None

list of clusters to notify

None
notification_backend NotificationBackend | None

notification delivery backend (default: logging)

None
default_channel str | None

default notification channel

None

DEFAULT_CHANNEL property

DEFAULT_CHANNEL: str

Get default channel from SlackChannelSettings.

notify_config_change

notify_config_change(
    change: ClusterConfigChange,
    result: str = "success",
    channel: str | None = None,
) -> dict[str, bool]

Notify the operators of other clusters about a config change.

Does not apply automatically; only shares information.

Parameters:

Name Type Description Default
change ClusterConfigChange

config change information

required
result str

rollout result ("success", "rolled_back", etc.)

'success'
channel str | None

notification channel (None to use the default)

None

Returns:

Type Description
dict[str, bool]

Dictionary of notification success per cluster

add_cluster

add_cluster(cluster: str) -> None

Add a cluster to the notification targets.

remove_cluster

remove_cluster(cluster: str) -> None

Remove a cluster from the notification targets.

CrossClusterPropagationRequest

CrossClusterPropagationRequest(
    store: CrossClusterStore | None = None,
    notification_backend: NotificationBackend | None = None,
    default_expiry_hours: int = 24,
    on_apply: (
        Callable[[PropagationRequest], bool] | None
    ) = None,
    default_channel: str | None = None,
    redis_client: Any = None,
)

Cross-cluster config propagation request (manual approval).

A safe approach where propagating a config to another cluster requires the approval of the target cluster's operator before it is applied.

Workflow:
  1. Create a request (request_propagation)
  2. Send an approval-request notification to the target cluster
  3. The operator calls approve() or reject()
  4. On approval, run the config-apply callback
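The approval gate in this workflow can be sketched with an in-memory queue. Everything below is a simplified stand-in: the ApprovalQueue and Request names, the single target cluster per request, and the APPROVED and REJECTED status values are assumptions (only PENDING_APPROVAL appears in this reference), and notifications and expiry are omitted.

```python
import uuid
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class Status(str, Enum):
    PENDING_APPROVAL = "pending_approval"
    APPROVED = "approved"  # assumed member
    REJECTED = "rejected"  # assumed member


@dataclass
class Request:
    request_id: str
    target_cluster: str
    change: dict
    status: Status = Status.PENDING_APPROVAL
    approved_by: Optional[str] = None


class ApprovalQueue:
    """Hypothetical in-memory version of the request/approve flow."""

    def __init__(self, on_apply):
        self._on_apply = on_apply  # called only after approval
        self._requests = {}

    def request_propagation(self, target_cluster, change):
        request = Request(uuid.uuid4().hex, target_cluster, change)
        self._requests[request.request_id] = request
        return request.request_id

    def approve(self, request_id, approved_by):
        request = self._requests.get(request_id)
        if request is None:
            return False, "request not found"
        if request.status is not Status.PENDING_APPROVAL:
            return False, f"request is {request.status.value}"
        if not self._on_apply(request):
            return False, "apply callback failed"
        request.status = Status.APPROVED
        request.approved_by = approved_by
        return True, None
```

The point of the design is that creating a request never changes the target cluster; the apply callback runs only inside approve.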

Attributes:

Name Type Description
notification_backend

notification delivery backend

default_expiry_hours

request expiration time (default 24 hours)

on_apply

config-apply callback function

Example

requester = CrossClusterPropagationRequest()

# Create a request
request_id = requester.request_propagation(
    source_cluster="cluster-a",
    target_clusters=["cluster-b"],
    change=config_change,
)

# (On the other cluster) approve
requester.approve(request_id, approved_by="admin@cluster-b.com")

Initialize CrossClusterPropagationRequest.

Parameters:

Name Type Description Default
store CrossClusterStore | None

CrossClusterStore instance (auto-resolved if None)

None
notification_backend NotificationBackend | None

notification delivery backend

None
default_expiry_hours int

request expiration time

24
on_apply Callable[[PropagationRequest], bool] | None

config-apply callback function

None
default_channel str | None

default notification channel

None
redis_client Any

Deprecated — use store parameter instead

None

store property

store

CrossClusterStore (Lazy loading via ProviderRegistry).

request_propagation

request_propagation(
    source_cluster: str,
    target_clusters: list[str],
    change: ClusterConfigChange,
    expiry_hours: int | None = None,
) -> str

Create a config propagation request to other clusters.

Parameters:

Name Type Description Default
source_cluster str

the requesting source cluster

required
target_clusters list[str]

list of target clusters

required
change ClusterConfigChange

the config change to propagate

required
expiry_hours int | None

request expiration time

None

Returns:

Type Description
str

Request ID (applied only after the target cluster's operator approves)

approve

approve(
    request_id: str, approved_by: str
) -> tuple[bool, str | None]

Approve a config propagation request.

Parameters:

Name Type Description Default
request_id str

request ID

required
approved_by str

approver

required

Returns:

Type Description
tuple[bool, str | None]

(success, error message)

reject

reject(
    request_id: str, rejected_by: str, reason: str = ""
) -> tuple[bool, str | None]

Reject a config propagation request.

Parameters:

Name Type Description Default
request_id str

request ID

required
rejected_by str

rejecter

required
reason str

rejection reason

''

Returns:

Type Description
tuple[bool, str | None]

(success, error message)

get_request

get_request(request_id: str) -> PropagationRequest | None

Query a request.

get_pending_requests

get_pending_requests(
    cluster: str,
) -> list[PropagationRequest]

Query the list of pending requests for a cluster.

GovernancePolicy dataclass

GovernancePolicy(
    policy_id: str,
    config_type: str,
    rules: list[dict[str, Any]] = list(),
    version: int = 1,
    created_by: str = "",
    created_at: datetime = utc_now(),
)

Bases: SerializableMixin

Governance policy.

Defines the allowed range of config values. This policy is synchronized automatically (it is not the actual config value).

Attributes:

Name Type Description
policy_id str

policy ID

config_type str

config type

rules list[dict[str, Any]]

list of policy rules e.g. {"max": 5, "min": 1, "field": "retry_max_attempts"}

version int

policy version

created_by str

creator

created_at datetime

creation time

validate_config

validate_config(
    config: dict[str, Any],
) -> tuple[bool, list[str]]

Validate whether the config values comply with the policy.

Parameters:

Name Type Description Default
config dict[str, Any]

config values to validate

required

Returns:

Type Description
tuple[bool, list[str]]

(compliant, list of violation messages)
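A minimal sketch of min/max rule checking, assuming rules shaped like the documented example ({"field": ..., "min": ..., "max": ...}). The standalone function, the violation message wording, and the choice to skip fields that are absent from the config are assumptions; the real method may treat missing fields differently.

```python
def validate_config(rules, config):
    """Return (compliant, violation messages) for a config against bound rules."""
    violations = []
    for rule in rules:
        field = rule["field"]
        if field not in config:
            continue  # assumption: absent fields are not checked
        value = config[field]
        if "min" in rule and value < rule["min"]:
            violations.append(f"{field}={value} is below min {rule['min']}")
        if "max" in rule and value > rule["max"]:
            violations.append(f"{field}={value} is above max {rule['max']}")
    return (not violations, violations)
```

For example, with a rule of {"field": "failure_threshold", "min": 2, "max": 10}, a config of {"failure_threshold": 1} is reported as non-compliant with one violation message.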

GovernancePolicySync

GovernancePolicySync(
    clusters: list[str] | None = None,
    store: CrossClusterStore | None = None,
    notification_backend: NotificationBackend | None = None,
    redis_client: Any = None,
)

Governance policy synchronization (automatic).

Synchronizes only governance policies automatically. These are not the actual config values, but the allowed range of config values (upper/lower bounds).

Synchronized:
  • retry_max_attempts <= 5 (upper bound)
  • failure_threshold >= 2 (lower bound)

Not synchronized:
  • Actual config values (Blast Radius risk)

Attributes:

Name Type Description
clusters

list of clusters to synchronize

notification_backend

notification delivery backend

Example

policy_sync = GovernancePolicySync(
    clusters=["cluster-a", "cluster-b"],
)

policy = GovernancePolicy(
    policy_id="circuit-breaker-limits",
    config_type="circuit_breaker",
    rules=[
        {"field": "failure_threshold", "min": 2, "max": 10},
        {"field": "reset_timeout_seconds", "max": 300},
    ],
)

policy_sync.sync_policy(policy)

Initialize GovernancePolicySync.

Parameters:

Name Type Description Default
clusters list[str] | None

list of clusters to synchronize

None
store CrossClusterStore | None

CrossClusterStore instance (auto-resolved if None)

None
notification_backend NotificationBackend | None

notification delivery backend

None
redis_client Any

Deprecated — use store parameter instead

None

store property

store

CrossClusterStore (Lazy loading via ProviderRegistry).

sync_policy

sync_policy(policy: GovernancePolicy) -> dict[str, bool]

Synchronize a governance policy.

Synchronizes the policy to all target clusters.

Parameters:

Name Type Description Default
policy GovernancePolicy

the governance policy to synchronize

required

Returns:

Type Description
dict[str, bool]

Synchronization success per cluster

get_policy

get_policy(config_type: str) -> GovernancePolicy | None

Query a policy.

validate_config_against_policy

validate_config_against_policy(
    config_type: str, config: dict[str, Any]
) -> tuple[bool, list[str]]

Validate whether the config values comply with the governance policy.

Parameters:

Name Type Description Default
config_type str

config type

required
config dict[str, Any]

config values to validate

required

Returns:

Type Description
tuple[bool, list[str]]

(compliant, list of violation messages)

PropagationRequest dataclass

PropagationRequest(
    request_id: str,
    source_cluster: str,
    target_cluster: str,
    config_change: ClusterConfigChange,
    status: PropagationRequestStatus = PropagationRequestStatus.PENDING_APPROVAL,
    created_at: datetime = utc_now(),
    expires_at: datetime | None = None,
    approved_by: str | None = None,
    approved_at: datetime | None = None,
    rejected_by: str | None = None,
    rejected_at: datetime | None = None,
    reject_reason: str | None = None,
)

Bases: SerializableMixin

Config propagation request.

The config is applied once approved by the other cluster.

from_dict classmethod

from_dict(data: dict[str, Any]) -> PropagationRequest

Create from a dictionary.

PropagationRequestStatus

Bases: str, Enum

Config propagation request status.

CanaryEvaluator

Shadow Evaluation Gate and Live Canary Evaluation.

Handles pre-start shadow evaluation checks and pre-promote live canary metric evaluation.

check_shadow_evaluation

check_shadow_evaluation(
    rollout: CanaryRollout,
    bypass_shadow: bool,
    bypass_shadow_reason: str,
) -> bool | None

Check Shadow Evaluation result before starting a rollout.

Returns:

  • True: Passed (can start)
  • False: Blocked (cannot start)
  • None: Shadow evaluation not run or disabled (skip check)

check_live_canary_evaluation

check_live_canary_evaluation(
    rollout: CanaryRollout, tier_id: str | None = None
) -> bool | None

Evaluate live canary node metrics before promotion.

Returns:

  • True: Passed (can promote)
  • False: Blocked (cannot promote)
  • None: Disabled or error (skip check)

check_error_budget_drain

check_error_budget_drain(
    rollout: CanaryRollout, tier_id: str | None = None
) -> bool | None

Check error-budget drain rate before promotion (PRO-only).

Blocks promotion when the canary is burning error budget faster than the stage's error_budget_drain_rate_max (using the larger of the 1h/6h burn rate to catch both fast and sustained burn) OR when the remaining budget is below error_budget_remaining_min. The effective criteria are the stage's pass_criteria with the tier floor applied, identical to the live-evaluation path.

The drain check evaluates only when the Error Budget feature is enabled and wired with real stats; while it is OFF (the default) or unwired, the check honestly skips (returns None) rather than evaluating fake-healthy simulation data. When enabled, the per-stage PassCriteria values are the operator control (a stage can set a high drain limit to effectively opt out). PRO entitlement gating is inherited from module registration; no OSS import is introduced.

Returns:

  • True: Within drain limits (can promote)
  • False: Burning too fast or below the remaining floor (blocked)
  • None: Budget signal absent: feature OFF/unwired, or service unavailable/error (skip, fail-open)
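The tri-state decision described above can be sketched as a pure function. The check_drain name, its parameters, and the strict inequalities are assumptions for illustration; the defaults mirror error_budget_drain_rate_max and error_budget_remaining_min from PassCriteria.

```python
def check_drain(burn_rate_1h, burn_rate_6h, budget_remaining,
                drain_rate_max=1.2, remaining_min=0.1):
    """Return True (promote), False (blocked), or None (no signal, skip)."""
    if burn_rate_1h is None or burn_rate_6h is None or budget_remaining is None:
        return None  # fail-open: no real budget data to evaluate
    # The larger of the two windows catches both fast and sustained burn.
    if max(burn_rate_1h, burn_rate_6h) > drain_rate_max:
        return False
    if budget_remaining < remaining_min:
        return False
    return True
```

Callers need to distinguish None from False explicitly (for example with "is False"), since a plain truthiness test would treat a skipped check as a block.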

CanaryConfigMiddleware

CanaryConfigMiddleware(get_response: Callable)

Django middleware - Request-level Canary configuration application.

Performs the Canary decision at the start of the request and attaches the result to the request object.

Usage (settings.py):

MIDDLEWARE = [
    ...
    'baldur_pro.services.canary.feature_flag.CanaryConfigMiddleware',
    ...
]

Access in views

def my_view(request):
    canary_decisions = getattr(request, 'canary_decisions', {})
    if canary_decisions.get('circuit_breaker', {}).get('use_canary'):
        # Use the Canary configuration
        pass

Initialize the middleware.

feature_flag property

feature_flag: CanaryFeatureFlag

CanaryFeatureFlag singleton.

CanaryDecision dataclass

CanaryDecision(
    use_canary: bool,
    reason: str,
    strategy_used: str,
    effective_config: dict[str, Any] = dict(),
)

Result of the Canary application decision.

Attributes:

Name Type Description
use_canary bool

Whether to use the Canary configuration

reason str

Reason for the decision

strategy_used str

Selection strategy used

effective_config dict[str, Any]

Configuration values to be applied finally

CanaryFeatureFlag

CanaryFeatureFlag(
    context_extractor: (
        RequestContextExtractor | None
    ) = None,
)

Request-level Canary Feature Flag.

Decides whether to apply the Canary configuration per request within a single cluster.

Key features:
  • Probability-based Canary selection (consistency guaranteed)
  • Whitelist-based forced application
  • Header-based manual override
  • Independent Canary management per configuration
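One common way to get probability-based selection that stays consistent for a given user is deterministic hash bucketing, sketched below. The in_canary helper, the use of SHA-256, and the choice to mix config_type into the hash are assumptions; this reference does not state which hash the USER_ID_HASH strategy uses.

```python
import hashlib


def in_canary(user_id, config_type, percentage):
    """Map a user to a stable bucket in [0, 10000) and compare with the rate."""
    digest = hashlib.sha256(f"{config_type}:{user_id}".encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:8], "big") % 10_000
    # percentage is on a 0-100 scale, so 10 selects buckets 0..999.
    return bucket < percentage * 100
```

Because the bucket depends only on the user and the config type, raising the percentage only adds users to the Canary group; nobody already selected is dropped.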

Example

feature_flag = CanaryFeatureFlag()

# Register Canary configuration
feature_flag.register_flag(CanaryFlagConfig(
    config_type="circuit_breaker",
    percentage=10,  # 10% of traffic
    baseline_config={"failure_threshold": 5},
    canary_config={"failure_threshold": 3},
))

# Decide Canary application per request
decision = feature_flag.evaluate(request, "circuit_breaker")

if decision.use_canary:
    # Use the Canary configuration
    apply_config(decision.effective_config)

Initialize CanaryFeatureFlag.

Parameters:

Name Type Description Default
context_extractor RequestContextExtractor | None

Request context extractor

None

register_flag

register_flag(config: CanaryFlagConfig) -> None

Register a Canary Flag.

Parameters:

Name Type Description Default
config CanaryFlagConfig

Canary Flag configuration

required

unregister_flag

unregister_flag(config_type: str) -> bool

Unregister a Canary Flag.

Parameters:

Name Type Description Default
config_type str

Configuration type

required

Returns:

Type Description
bool

Whether unregistration succeeded

get_flag

get_flag(config_type: str) -> CanaryFlagConfig | None

Look up a Canary Flag.

list_flags

list_flags() -> list[CanaryFlagConfig]

Return a list of all Canary Flags.

should_use_canary_config

should_use_canary_config(
    request: HttpRequest, config_type: str
) -> bool

Decide whether the Canary configuration should be applied to the request.

Parameters:

Name Type Description Default
request HttpRequest

Django HTTP request

required
config_type str

Configuration type

required

Returns:

Type Description
bool

Whether to use the Canary configuration

get_effective_config

get_effective_config(
    request: HttpRequest,
    config_type: str,
    baseline_config: dict[str, Any] | None = None,
    canary_config: dict[str, Any] | None = None,
) -> dict[str, Any]

Return the effective configuration to apply to the request.

Parameters:

Name Type Description Default
request HttpRequest

Django HTTP request

required
config_type str

Configuration type

required
baseline_config dict[str, Any] | None

Baseline configuration (used if absent from the Flag)

None
canary_config dict[str, Any] | None

Canary configuration (used if absent from the Flag)

None

Returns:

Type Description
dict[str, Any]

Configuration values to apply

evaluate

evaluate(
    request: HttpRequest, config_type: str
) -> CanaryDecision

Evaluate whether to apply the Canary configuration.

Decides whether the request belongs to the Canary group based on the strategy.

Parameters:

Name Type Description Default
request HttpRequest

Django HTTP request

required
config_type str

Configuration type

required

Returns:

Type Description
CanaryDecision

CanaryDecision result

update_percentage

update_percentage(
    config_type: str, new_percentage: float
) -> bool

Update the Canary application rate.

Used when gradually raising the rate while monitoring.

Parameters:

Name Type Description Default
config_type str

Configuration type

required
new_percentage float

New rate (0-100)

required

Returns:

Type Description
bool

Whether the update succeeded

CanaryFlagConfig dataclass

CanaryFlagConfig(
    config_type: str,
    enabled: bool = True,
    percentage: float = 10.0,
    strategy: CanarySelectionStrategy = CanarySelectionStrategy.USER_ID_HASH,
    rollout_id: str | None = None,
    whitelist_user_ids: set[str] = set(),
    whitelist_ips: set[str] = set(),
    canary_header: str = "X-Canary-Config",
    baseline_config: dict[str, Any] = dict(),
    canary_config: dict[str, Any] = dict(),
    created_at: datetime = utc_now(),
    expires_at: datetime | None = None,
)

Bases: SerializableMixin

Canary Feature Flag configuration.

Attributes:

Name Type Description
config_type str

Configuration type (circuit_breaker, dlq, retry, etc.)

enabled bool

Whether enabled

percentage float

Canary application rate (0-100)

strategy CanarySelectionStrategy

Selection strategy

rollout_id str | None

Related Canary Rollout ID

whitelist_user_ids set[str]

List of whitelisted user IDs

whitelist_ips set[str]

List of whitelisted IPs

canary_header str

Header identifying a Canary request (for the HEADER_BASED strategy)

baseline_config dict[str, Any]

Baseline configuration values

canary_config dict[str, Any]

Canary configuration values

created_at datetime

Creation time

expires_at datetime | None

Expiration time (optional)

is_expired

is_expired() -> bool

Check whether expired.

from_dict classmethod

from_dict(data: dict[str, Any]) -> CanaryFlagConfig

Create from a dictionary.

CanarySelectionStrategy

Bases: str, Enum

Canary selection strategy.

How to decide whether a request receives the Canary configuration.

CanarySafetyInterlock

CanarySafetyInterlock(
    policy: dict[str, InterlockAction] | None = None,
    fail_closed: bool = True,
    emergency_tracker_factory: Callable | None = None,
)

Safety Interlock for Canary rollouts.

Controls Canary operations based on the Emergency Level.

Features:
  • Emergency Level-based automatic braking
  • Configurable policy (per-level action mapping)
  • Fail-Closed policy (safe mode on backend failure)

Default policy:
  • NORMAL (0): ALLOW
  • LEVEL_1 (1): ALLOW_WITH_WARNING
  • LEVEL_2 (2): PAUSE
  • LEVEL_3 (3): ROLLBACK
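The default mapping and the Fail-Closed rule can be sketched as a small lookup. The policy key strings ("normal", "level_1", and so on), the decide helper, and the read_level callable are assumptions for illustration, as is the fail-open behavior when fail_closed is False, which this reference does not describe.

```python
from enum import Enum


class InterlockAction(str, Enum):
    ALLOW = "allow"
    ALLOW_WITH_WARNING = "allow_with_warning"
    PAUSE = "pause"
    ROLLBACK = "rollback"
    BLOCK = "block"


# Assumed key names; the documented policy type is dict[str, InterlockAction].
DEFAULT_POLICY = {
    "normal": InterlockAction.ALLOW,
    "level_1": InterlockAction.ALLOW_WITH_WARNING,
    "level_2": InterlockAction.PAUSE,
    "level_3": InterlockAction.ROLLBACK,
}


def decide(read_level, policy=None, fail_closed=True):
    """Return (action, allowed) for the level reported by read_level()."""
    policy = policy or DEFAULT_POLICY
    try:
        level = read_level()
    except Exception:
        # Fail-Closed: a backend failure is treated as LEVEL_3.
        # Assumption: with fail_closed=False the check falls back to normal.
        level = "level_3" if fail_closed else "normal"
    action = policy[level]
    allowed = action in (InterlockAction.ALLOW, InterlockAction.ALLOW_WITH_WARNING)
    return action, allowed
```

Treating an unreadable emergency level as the most severe one trades availability of rollouts for safety, which is the usual argument for fail-closed interlocks.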

Usage

interlock = CanarySafetyInterlock()

# Check before the operation
rollout_id = "abc123"
result = interlock.check(operation="promote", rollout_id=rollout_id)

if not result.allowed:
    if result.action == InterlockAction.PAUSE:
        canary_service.pause(rollout_id)
    elif result.action == InterlockAction.ROLLBACK:
        canary_service.rollback(rollout_id)

Initialize CanarySafetyInterlock.

Parameters:

Name Type Description Default
policy dict[str, InterlockAction] | None

Per-Emergency-Level action policy (uses DEFAULT_POLICY if None)

None
fail_closed bool

If True, treat a backend failure as LEVEL_3 (default: True)

True
emergency_tracker_factory Callable | None

EmergencyTracker factory function (for testing)

None

check

check(
    operation: str,
    rollout_id: str | None = None,
    namespace: str | None = None,
) -> InterlockResult

Safety Interlock check.

Looks up the current Emergency Level and decides the action per the policy.

Parameters:

Name Type Description Default
operation str

Operation to perform (start, promote, resume, etc.)

required
rollout_id str | None

Rollout ID (if any)

None
namespace str | None

Target namespace (current instance if None)

None

Returns:

Name Type Description
InterlockResult InterlockResult

check result

check_and_apply

check_and_apply(
    canary_service: CanaryRolloutService,
    rollout_id: str,
    operation: str,
    namespace: str | None = None,
) -> InterlockResult

Safety Interlock check and automatic application.

For a PAUSE or ROLLBACK action, applies it automatically via canary_service.

Parameters:

Name Type Description Default
canary_service CanaryRolloutService

CanaryRolloutService instance

required
rollout_id str

Rollout ID

required
operation str

Operation to perform

required
namespace str | None

Target namespace

None

Returns:

Name Type Description
InterlockResult InterlockResult

check and application result

InterlockAction

Bases: str, Enum

Safety Interlock action.

The action type the Interlock decides for a Canary rollout operation.

ALLOW class-attribute instance-attribute

ALLOW = 'allow'

Allow normal progress.

ALLOW_WITH_WARNING class-attribute instance-attribute

ALLOW_WITH_WARNING = 'allow_with_warning'

Allow progress with a warning log. Used at LEVEL_1.

PAUSE class-attribute instance-attribute

PAUSE = 'pause'

Pause. Used at LEVEL_2. Can resume after the situation is resolved.

ROLLBACK class-attribute instance-attribute

ROLLBACK = 'rollback'

Immediate rollback. Used at LEVEL_3 or Fail-Closed.

BLOCK class-attribute instance-attribute

BLOCK = 'block'

Block the operation. Cannot start a new rollout.

InterlockCheckFailure

Bases: str, Enum

Interlock check failure type.

Records the failure cause when the Fail-Closed policy is applied.

BACKEND_UNAVAILABLE class-attribute instance-attribute

BACKEND_UNAVAILABLE = 'backend_unavailable'

Redis/backend connection unavailable.

TRACKER_ERROR class-attribute instance-attribute

TRACKER_ERROR = 'tracker_error'

EmergencyModeTracker error.

TIMEOUT class-attribute instance-attribute

TIMEOUT = 'timeout'

Check timeout.

InterlockResult dataclass

InterlockResult(
    action: InterlockAction,
    allowed: bool,
    reason: str,
    emergency_level: str = "normal",
    emergency_level_name: str = "normal",
    namespace: str = "",
    delay_seconds: int = 0,
    auto_resume_at: str | None = None,
    check_failure: InterlockCheckFailure | None = None,
    is_fail_closed: bool = False,
    metadata: dict[str, Any] = dict(),
)

Safety Interlock check result.

Holds the Interlock check result for a Canary operation.

Attributes:

Name Type Description
action InterlockAction

Action to perform (ALLOW, PAUSE, ROLLBACK, etc.)

allowed bool

Whether to allow progress (True if action is in the ALLOW* family)

reason str

Reason for the action decision

emergency_level str

Current Emergency level

namespace str

Applied namespace

check_failure InterlockCheckFailure | None

Check failure type (None if normal)

is_fail_closed bool

Whether the decision is due to the Fail-Closed policy

action instance-attribute

action: InterlockAction

Action to perform.

allowed instance-attribute

allowed: bool

Whether to allow progress.

reason instance-attribute

reason: str

Action reason.

emergency_level class-attribute instance-attribute

emergency_level: str = 'normal'

Current Emergency level value.

emergency_level_name class-attribute instance-attribute

emergency_level_name: str = 'normal'

Emergency level name.

namespace class-attribute instance-attribute

namespace: str = ''

Applied namespace.

delay_seconds class-attribute instance-attribute

delay_seconds: int = 0

Delay time (for PAUSE).

auto_resume_at class-attribute instance-attribute

auto_resume_at: str | None = None

Automatic resume time (if any).

check_failure class-attribute instance-attribute

check_failure: InterlockCheckFailure | None = None

Check failure type (None if normal).

is_fail_closed class-attribute instance-attribute

is_fail_closed: bool = False

Whether blocked due to the Fail-Closed policy.

metadata class-attribute instance-attribute

metadata: dict[str, Any] = field(default_factory=dict)

Additional metadata.

allow classmethod

allow(
    emergency_level: str,
    emergency_level_name: str,
    namespace: str,
) -> InterlockResult

Factory for an allow result.

Parameters:

Name Type Description Default
emergency_level str

Emergency level value

required
emergency_level_name str

Emergency level name

required
namespace str

Namespace

required

Returns:

Type Description
InterlockResult

InterlockResult with the ALLOW action

allow_with_warning classmethod

allow_with_warning(
    emergency_level: str,
    emergency_level_name: str,
    namespace: str,
) -> InterlockResult

Factory for an allow-with-warning result.

Parameters:

Name Type Description Default
emergency_level str

Emergency level value

required
emergency_level_name str

Emergency level name

required
namespace str

Namespace

required

Returns:

Type Description
InterlockResult

InterlockResult with the ALLOW_WITH_WARNING action

pause classmethod

pause(
    emergency_level: str,
    emergency_level_name: str,
    namespace: str,
    reason: str | None = None,
) -> InterlockResult

Factory for a pause result.

Parameters:

Name Type Description Default
emergency_level str

Emergency level value

required
emergency_level_name str

Emergency level name

required
namespace str

Namespace

required
reason str | None

Additional reason (default message if absent)

None

Returns:

Type Description
InterlockResult

InterlockResult with the PAUSE action

rollback classmethod

rollback(
    emergency_level: str,
    emergency_level_name: str,
    namespace: str,
    reason: str | None = None,
) -> InterlockResult

Factory for a rollback result.

Parameters:

Name Type Description Default
emergency_level str

Emergency level value

required
emergency_level_name str

Emergency level name

required
namespace str

Namespace

required
reason str | None

Additional reason (default message if absent)

None

Returns:

Type Description
InterlockResult

InterlockResult with the ROLLBACK action

fail_closed classmethod

fail_closed(
    failure: InterlockCheckFailure,
    namespace: str,
    error_message: str,
) -> InterlockResult

Factory for a Fail-Closed result.

On a backend failure, treats it as LEVEL_3 and returns the ROLLBACK action.

Parameters:

Name Type Description Default
failure InterlockCheckFailure

Failure type

required
namespace str

Namespace

required
error_message str

Error message

required

Returns:

Type Description
InterlockResult

InterlockResult with the ROLLBACK action due to Fail-Closed
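
A rough sketch of the Fail-Closed idea: if the emergency level cannot be read, the caller gets a ROLLBACK decision instead of an exception. Result is a hypothetical stand-in with a subset of the InterlockResult fields, and the mapping from Python exceptions to failure types is an assumption made for illustration.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable


@dataclass
class Result:
    """Hypothetical stand-in for InterlockResult (subset of fields)."""
    action: str
    allowed: bool
    reason: str
    check_failure: str | None = None
    is_fail_closed: bool = False


def check_with_fail_closed(read_level: Callable[[], str]) -> Result:
    """Read the emergency level; any lookup failure becomes a ROLLBACK result."""
    try:
        level = read_level()
    except TimeoutError as exc:
        return Result("rollback", False, str(exc), "timeout", True)
    except ConnectionError as exc:
        return Result("rollback", False, str(exc), "backend_unavailable", True)
    except Exception as exc:  # anything else is reported as a tracker error
        return Result("rollback", False, str(exc), "tracker_error", True)
    # Only the failure path is the point here; level handling is simplified.
    return Result("allow", True, f"emergency level is {level}")
```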

block classmethod

block(
    emergency_level: str,
    emergency_level_name: str,
    namespace: str,
    reason: str | None = None,
) -> InterlockResult

Factory for a block result.

Parameters:

Name Type Description Default
emergency_level str

Emergency level value

required
emergency_level_name str

Emergency level name

required
namespace str

Namespace

required
reason str | None

Additional reason (default message if absent)

None

Returns:

Type Description
InterlockResult

InterlockResult with the BLOCK action

CanaryConfigLock

CanaryConfigLock(
    redis_client: Any, lock_timeout: timedelta | None = None
)

Config lock for canary rollouts.

Ensures only one rollout is active for a given config_type. Uses RedisDistributedLock for safe distributed operation.

Features
  • Distributed lock (Redis SET NX PX)
  • Rollout ID-based owner identification
  • Auto-expiry (zombie lock prevention)
  • Lock status and owner lookup
Example

lock = CanaryConfigLock(redis_client)

# Acquire lock
if lock.acquire("circuit_breaker", "rollout-abc"):
    try:
        perform_rollout()
    finally:
        lock.release("circuit_breaker", "rollout-abc")

Initialize CanaryConfigLock.

Parameters:

Name Type Description Default
redis_client Any

Redis client instance

required
lock_timeout timedelta | None

Lock auto-expiry time (default from settings)

None

acquire

acquire(
    config_type: str,
    rollout_id: str,
    blocking: bool = False,
) -> bool

Acquire the configuration lock.

Parameters:

Name Type Description Default
config_type str

Configuration type (circuit_breaker, dlq, retry, etc.)

required
rollout_id str

Rollout ID (for owner identification)

required
blocking bool

If True, wait until the lock is acquired (default False)

False

Returns:

Type Description
bool

Whether the lock was acquired

Note

blocking=True is not recommended. A Canary rollout should fail immediately and notify the user.

release

release(config_type: str, rollout_id: str) -> bool

Release the configuration lock.

Releases after verifying the lock owner (handled atomically via a Lua script).

Parameters:

Name Type Description Default
config_type str

Configuration type

required
rollout_id str

Rollout ID (for owner verification)

required

Returns:

Type Description
bool

Whether the lock was released
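
The acquire/release contract (one owner per config_type, owner-verified release, auto-expiry) can be illustrated with an in-memory stand-in. This is a sketch only: InMemoryConfigLock is hypothetical and does not use Redis, so it shows the semantics rather than the SET NX PX or Lua mechanics.

```python
from __future__ import annotations

import time


class InMemoryConfigLock:
    """Illustrative stand-in; not the CanaryConfigLock implementation."""

    def __init__(self, timeout_seconds: float = 60.0) -> None:
        self._timeout = timeout_seconds
        self._locks: dict[str, tuple[str, float]] = {}  # key -> (owner, expiry)

    def _live_owner(self, config_type: str) -> str | None:
        """Return the current owner, dropping the entry if it has expired."""
        entry = self._locks.get(config_type)
        if entry is None:
            return None
        owner, expires_at = entry
        if time.monotonic() >= expires_at:
            del self._locks[config_type]
            return None
        return owner

    def acquire(self, config_type: str, rollout_id: str) -> bool:
        """Succeed only when no live lock exists (like SET with NX)."""
        if self._live_owner(config_type) is not None:
            return False
        self._locks[config_type] = (rollout_id, time.monotonic() + self._timeout)
        return True

    def release(self, config_type: str, rollout_id: str) -> bool:
        """Delete the lock only if rollout_id is the current owner."""
        if self._live_owner(config_type) != rollout_id:
            return False
        del self._locks[config_type]
        return True
```

A second rollout that fails to acquire gets False immediately, which matches the non-blocking default recommended for acquire().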

is_locked

is_locked(config_type: str) -> bool

Check the lock state.

Parameters:

Name Type Description Default
config_type str

Configuration type

required

Returns:

Type Description
bool

True if the lock is held

get_lock_owner

get_lock_owner(config_type: str) -> str | None

Look up the current lock owner.

Parameters:

Name Type Description Default
config_type str

Configuration type

required

Returns:

Type Description
str | None

Lock owner (rollout ID) or None

extend

extend(
    config_type: str,
    rollout_id: str,
    additional_time: timedelta = None,
) -> bool

Extend the lock TTL.

Prevents lock expiry when a rollout takes longer than expected.

Parameters:

Name Type Description Default
config_type str

Configuration type

required
rollout_id str

Rollout ID (for owner verification)

required
additional_time timedelta

Time to extend (default: same as the initial timeout)

None

Returns:

Type Description
bool

Whether the extension succeeded

force_release

force_release(config_type: str) -> bool

Force-release the lock (admin only).

For cleaning up zombie rollouts. Deletes the lock without owner verification.

Parameters:

Name Type Description Default
config_type str

Configuration type

required

Returns:

Type Description
bool

Whether the deletion succeeded

Warning

This method should be used only with admin privileges. Use release() in normal situations.

ConfigLockError

ConfigLockError(
    message: str,
    config_type: str = "",
    current_owner: str | None = None,
)

Bases: BaldurError

Config lock acquisition failed.

Raised when a rollout is already in progress for the same config_type.

Attributes:

Name Type Description
config_type

Configuration type

current_owner

Current lock owner (rollout ID)

CanaryMetricsCollector

Collects canary rollout metrics via LiveCanaryEvaluator.

Queries real-time metrics and converts them to CanaryMetrics format.

collect

collect(rollout: CanaryRollout) -> list[CanaryMetrics]

Collect metrics for a canary rollout.

Parameters:

Name Type Description Default
rollout CanaryRollout

The canary rollout to collect metrics for.

required

Returns:

Type Description
list[CanaryMetrics]

List of CanaryMetrics for the current stage.

CanaryMetrics dataclass

CanaryMetrics(
    cluster: str,
    stage_name: str,
    error_rate_before: float = 0.0,
    error_rate_after: float = 0.0,
    latency_p50_before: float = 0.0,
    latency_p50_after: float = 0.0,
    latency_p99_before: float = 0.0,
    latency_p99_after: float = 0.0,
    requests_total: int = 0,
    errors_total: int = 0,
    is_healthy: bool = True,
    unhealthy_reason: str | None = None,
)

Metrics for a Canary stage.

Determines the health status by comparing metrics before and after the configuration change.
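
One way such a before/after comparison could look, as a sketch under stated assumptions: StageMetrics is a hypothetical subset of the fields above, and the thresholds (an absolute error-rate increase and a p99 latency ratio) are invented for illustration and are not the library's defaults.

```python
from dataclasses import dataclass


@dataclass
class StageMetrics:
    """Hypothetical subset of the CanaryMetrics fields."""
    error_rate_before: float = 0.0
    error_rate_after: float = 0.0
    latency_p99_before: float = 0.0
    latency_p99_after: float = 0.0


def assess(m: StageMetrics, max_error_rate_increase: float = 0.01,
           max_p99_ratio: float = 1.5):
    """Return (is_healthy, unhealthy_reason) using assumed thresholds."""
    if m.error_rate_after - m.error_rate_before > max_error_rate_increase:
        return False, "error rate increased beyond threshold"
    # Skip the ratio check when there is no baseline latency to compare with.
    if m.latency_p99_before > 0 and (
        m.latency_p99_after / m.latency_p99_before > max_p99_ratio
    ):
        return False, "p99 latency increased beyond threshold"
    return True, None
```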

CanaryRollout dataclass

CanaryRollout(
    id: str,
    config_type: str,
    previous_values: dict[str, Any],
    new_values: dict[str, Any],
    state: CanaryState = CanaryState.CREATED,
    current_stage_index: int = 0,
    stages: list[CanaryStage] = list(),
    created_by: str = "",
    created_at: datetime = utc_now(),
    reason: str = "",
    completed_at: datetime | None = None,
    rollback_reason: str | None = None,
    pause_reason: str | None = None,
    pause_triggered_by: str | None = None,
    paused_at: datetime | None = None,
    stage_started_at: datetime | None = None,
    version: int = 0,
)

Canary rollout information.

The full rollout plan and state for a single configuration change.

Example

rollout = CanaryRollout(
    id="abc12345",
    config_type="circuit_breaker",
    previous_values={"failure_threshold": 5},
    new_values={"failure_threshold": 3},
    stages=[
        CanaryStage(name="canary", clusters=["seoul-canary"], percentage=10.0),
        CanaryStage(name="50%", clusters=["seoul-main"], percentage=50.0),
        CanaryStage(name="full", clusters=["tokyo", "singapore"], percentage=100.0),
    ],
    created_by="admin@example.com",
    reason="Reduce failure threshold for faster detection",
)

pause_reason class-attribute instance-attribute

pause_reason: str | None = None

Pause reason (e.g., 'Error budget below threshold (5.0% < 10.0%)').

pause_triggered_by class-attribute instance-attribute

pause_triggered_by: str | None = None

Pause trigger type.

Values:
  • "interlock": Automatically paused by the Safety Interlock (the provenance of every emergency-driven pause)
  • "manual": Manually paused by an operator
  • "chaos_guard": Paused by the Chaos Guard
  • "metrics": Paused due to metric degradation
  • "error_budget": Paused due to error budget exhaustion
  • "governance": Paused due to a governance check failure
  • "recovery_compensation": Re-paused while compensating a failed recovery

paused_at class-attribute instance-attribute

paused_at: datetime | None = None

Pause time.

stage_started_at class-attribute instance-attribute

stage_started_at: datetime | None = None

When the current stage's configuration was applied (None before start).

Set on start, on each promote to a next stage, and reset on resume so the stage's observation window is measured from stage entry, not rollout creation. None for rollouts persisted before this field existed — consumers fall back to created_at.
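
The created_at fallback described above can be sketched in a few lines. The helper names are hypothetical; only the fallback rule itself comes from the field description.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone


def stage_window_start(stage_started_at: datetime | None,
                       created_at: datetime) -> datetime:
    """Use stage entry time when present, else fall back to created_at."""
    return stage_started_at if stage_started_at is not None else created_at


def observation_elapsed(stage_started_at: datetime | None,
                        created_at: datetime, now: datetime) -> timedelta:
    """Time the current stage has been under observation."""
    return now - stage_window_start(stage_started_at, created_at)


created = datetime(2024, 1, 1, tzinfo=timezone.utc)
started = created + timedelta(hours=2)
now = created + timedelta(hours=3)
elapsed_new = observation_elapsed(started, created, now)  # measured from stage entry
elapsed_old = observation_elapsed(None, created, now)     # legacy record fallback
```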

version class-attribute instance-attribute

version: int = 0

Optimistic locking version, incremented on each save.

current_stage property

current_stage: CanaryStage | None

Return the current stage.

affected_clusters property

affected_clusters: list[str]

List of clusters the configuration has been applied to so far.

is_terminal property

is_terminal: bool

Check whether it is a terminal state.

progress_percentage property

progress_percentage: float

Rollout progress (0.0 to 100.0).

RolloutSerializer

Serializes and deserializes CanaryRollout objects.

Ensures backward compatibility when fields are added.

serialize

serialize(rollout: CanaryRollout) -> dict[str, Any]

Serialize a CanaryRollout to a dictionary.

Parameters:

Name Type Description Default
rollout CanaryRollout

The rollout to serialize.

required

Returns:

Type Description
dict[str, Any]

Dictionary representation suitable for JSON encoding.

deserialize

deserialize(data: dict[str, Any]) -> CanaryRollout

Deserialize a dictionary to a CanaryRollout (backward compatible).

Parameters:

Name Type Description Default
data dict[str, Any]

Dictionary representation of a rollout.

required

Returns:

Type Description
CanaryRollout

Restored CanaryRollout instance.
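
Backward-compatible deserialization usually means reading newer fields with defaults, so records written before a field existed still load. The sketch below assumes that approach; RolloutRecord is a hypothetical, reduced stand-in for CanaryRollout, and the ISO 8601 timestamp encoding is an assumption.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime
from typing import Any


@dataclass
class RolloutRecord:
    """Hypothetical, reduced stand-in for CanaryRollout."""
    id: str
    config_type: str
    pause_reason: str | None = None
    stage_started_at: datetime | None = None
    version: int = 0


def deserialize(data: dict[str, Any]) -> RolloutRecord:
    """Required keys are indexed directly; later additions use .get()."""
    started = data.get("stage_started_at")
    return RolloutRecord(
        id=data["id"],
        config_type=data["config_type"],
        pause_reason=data.get("pause_reason"),
        stage_started_at=datetime.fromisoformat(started) if started else None,
        version=data.get("version", 0),
    )
```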

CanaryRolloutService

CanaryRolloutService(
    store: CanaryRolloutStore | None = None,
)

Bases: EventEmitterMixin

Canary Rollout management service (thin orchestrator).

Delegates to focused sub-modules for specific responsibilities:
  • RolloutStateMachine: state transition validation
  • ClusterApplicator: per-cluster config application
  • CanaryEvaluator: shadow & live evaluation
  • RolloutSerializer: serialization/deserialization
  • CanaryMetricsCollector: metrics collection

Example

service = get_canary_rollout_service()

# Create rollout
rollout = service.create_rollout(
    config_type="circuit_breaker",
    new_values={"failure_threshold": 3},
    stages=[...],
    created_by="admin@example.com",
)

# Start -> Promote -> Complete
service.start_rollout(rollout.id)
service.promote(rollout.id)  # next stage
service.promote(rollout.id)  # complete

Initialize CanaryRolloutService.

Parameters:

Name Type Description Default
store CanaryRolloutStore | None

Optional CanaryRolloutStore injection (auto-resolved if None)

None

store property

store: CanaryRolloutStore | None

CanaryRolloutStore (lazy loading via ProviderRegistry).

rollout_ttl_days property

rollout_ttl_days: int

Rollout data retention period (loaded from Settings).

config_history property

config_history

ConfigHistoryService (lazy loading with graceful fallback).

chaos_guard property

chaos_guard: CanaryChaosGuard

CanaryChaosGuard (lazy loading).

cluster_applicator property

cluster_applicator: ClusterApplicator

ClusterApplicator (lazy loading).

create_rollout

create_rollout(
    config_type: str,
    new_values: dict[str, Any],
    stages: list[CanaryStage],
    created_by: str,
    reason: str = "",
    force_during_chaos: bool = False,
) -> CanaryRollout

Create a new Canary rollout.

Parameters:

Name Type Description Default
config_type str

Config type (circuit_breaker, dlq, retry, etc.)

required
new_values dict[str, Any]

New config values

required
stages list[CanaryStage]

Rollout stage definitions

required
created_by str

Creator (email or username)

required
reason str

Change reason

''
force_during_chaos bool

Force proceed during chaos experiments

False

Returns:

Type Description
CanaryRollout

Created CanaryRollout

Raises:

Type Description
ConfigLockError

When a rollout is already in progress for this config_type

ValueError

When stages is empty

start_rollout

start_rollout(
    rollout_id: str,
    force_during_chaos: bool = False,
    bypass_shadow: bool = False,
    bypass_shadow_reason: str = "",
    bypass_governance: bool = False,
    bypass_reason: str = "",
    requested_by: str = "",
) -> bool

Start a Canary rollout (apply first stage).

Parameters:

Name Type Description Default
rollout_id str

Rollout ID

required
force_during_chaos bool

Force proceed during chaos experiments

False
bypass_shadow bool

Whether to bypass shadow evaluation failure

False
bypass_shadow_reason str

Reason for bypass (min length: settings.bypass_min_reason_length)

''
bypass_governance bool

Skip governance validation (audit required)

False
bypass_reason str

Reason for bypass (required when bypass_governance=True, min length from settings)

''
requested_by str

Requester (for audit logging)

''

Returns:

Type Description
bool

Success status

promote

promote(
    rollout_id: str,
    force: bool = False,
    bypass_governance: bool = False,
    bypass_reason: str = "",
    requested_by: str = "",
    tier_id: str | None = None,
) -> bool

Promote to next stage.

Parameters:

Name Type Description Default
rollout_id str

Rollout ID

required
force bool

Skip metric validation (legacy)

False
bypass_governance bool

Skip governance validation (audit required)

False
bypass_reason str

Reason for bypass (required when bypass_governance=True, min length from settings)

''
requested_by str

Requester (for audit logging)

''
tier_id str | None

Tier ID for apply_tier_floor (None = not applied)

None

Returns:

Type Description
bool

Success status

rollback

rollback(
    rollout_id: str,
    reason: str = "",
    bypass_governance: bool = False,
    bypass_reason: str = "",
    requested_by: str = "",
) -> bool

Perform rollback.

Restores all applied clusters to previous config.

Parameters:

Name Type Description Default
rollout_id str

Rollout ID

required
reason str

Rollback reason

''
bypass_governance bool

Skip governance validation (audit required)

False
bypass_reason str

Reason for bypass (required when bypass_governance=True, min length from settings)

''
requested_by str

Requester (for audit logging)

''

Returns:

Type Description
bool

Success status

pause

pause(
    rollout_id: str,
    reason: str = "",
    triggered_by: str = "manual",
) -> bool

Pause rollout (with reason tracking).

Parameters:

Name Type Description Default
rollout_id str

Rollout ID

required
reason str

Pause reason

''
triggered_by str

Trigger type (manual, interlock, chaos_guard, metrics, error_budget, governance, recovery_compensation)

'manual'

Returns:

Type Description
bool

Success status

resume

resume(
    rollout_id: str,
    *,
    bypass_governance: bool = False,
    bypass_reason: str = "",
    requested_by: str = ""
) -> bool

Resume a paused rollout.

Performs an emergency-only governance re-check (no kill switch, no error budget). If Emergency Mode is at or above the configured min level, resume is blocked — the rollout stays PAUSED.

The auto-recovery CANARY_RESUME step uses bypass_governance=True so a recovery can resume canaries while the emergency it is recovering from is still elevated; the gate stays fail-closed for operator/REST resumes. Mirrors rollback()'s bypass signature.

Parameters:

Name Type Description Default
rollout_id str

Rollout ID

required
bypass_governance bool

Skip the emergency governance re-check (audit required)

False
bypass_reason str

Reason for bypass (required when bypass_governance=True, min length from settings)

''
requested_by str

Requester (for audit logging)

''

Returns:

Type Description
bool

Success status
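
The resume gate described above can be sketched as a pure function. Everything here is an assumption for illustration: emergency levels are modelled as integers, min_reason_length stands in for the value read from settings, and rejecting a too-short bypass_reason is a guess at how the "required" rule is enforced.

```python
from __future__ import annotations


def can_resume(current_level: int, min_block_level: int,
               bypass_governance: bool = False, bypass_reason: str = "",
               min_reason_length: int = 10) -> tuple[bool, str]:
    """Return (allowed, explanation) for a resume request."""
    if bypass_governance:
        # A bypass skips the emergency re-check but must carry a reason.
        if len(bypass_reason.strip()) < min_reason_length:
            return False, "bypass_reason too short"
        return True, "governance re-check bypassed"
    if current_level >= min_block_level:
        # Gate stays closed: the rollout remains PAUSED.
        return False, "emergency level at or above configured minimum"
    return True, "emergency re-check passed"
```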

get_paused_rollouts

get_paused_rollouts(
    triggered_by_whitelist: list[str] | None = None,
) -> list[CanaryRollout]

Get PAUSED rollouts, optionally filtered by pause trigger.

Parameters:

Name Type Description Default
triggered_by_whitelist list[str] | None

Allowed trigger types to include.
  • None: return all PAUSED rollouts
  • ["error_budget", "governance"]: only those triggers

None

Returns:

Type Description
list[CanaryRollout]

List of PAUSED CanaryRollout instances matching the filter.
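
The whitelist semantics (None means no filtering) can be shown with plain dictionaries. This is a sketch, assuming rollouts are represented as dicts with "state" and "pause_triggered_by" keys and that the paused state value is "paused"; the real method works on CanaryRollout instances.

```python
from __future__ import annotations


def filter_paused(rollouts: list[dict],
                  triggered_by_whitelist: list[str] | None = None) -> list[dict]:
    """Keep paused rollouts, optionally restricted to the given triggers."""
    paused = [r for r in rollouts if r["state"] == "paused"]
    if triggered_by_whitelist is None:
        return paused
    allowed = set(triggered_by_whitelist)
    return [r for r in paused if r["pause_triggered_by"] in allowed]


rollouts = [
    {"id": "r1", "state": "paused", "pause_triggered_by": "manual"},
    {"id": "r2", "state": "paused", "pause_triggered_by": "error_budget"},
    {"id": "r3", "state": "in_progress", "pause_triggered_by": None},
]
all_paused = filter_paused(rollouts)
budget_only = filter_paused(rollouts, ["error_budget", "governance"])
```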

cancel

cancel(rollout_id: str, reason: str = '') -> bool

Cancel rollout (only in pre-start state).

Parameters:

Name Type Description Default
rollout_id str

Rollout ID

required
reason str

Cancel reason

''

Returns:

Type Description
bool

Success status

renew_config_lock

renew_config_lock(
    rollout: CanaryRollout,
) -> LockRenewalOutcome

Renew a live rollout's config-type lock (watchdog-hosted).

Extends the lock TTL so a rollout that stays active past lock_timeout_minutes keeps the single-active-rollout invariant, while the crash-freeze valve is preserved: only rollouts the watchdog visits are renewed, so a stopped/disabled watchdog lane lets locks expire at TTL.

On extend failure the lock state is diagnosed via an owner lookup and recovered where safe — a lapsed lock under a still-live, started rollout is re-acquired, bounding the duplicate-create window after a watchdog/Redis outage to at most one scan cadence.

Parameters:

Name Type Description Default
rollout CanaryRollout

The active rollout whose lock should be renewed.

required

Returns:

Type Description
LockRenewalOutcome

LockRenewalOutcome describing what happened.

get_rollout

get_rollout(rollout_id: str) -> CanaryRollout | None

Get a rollout by ID.

Parameters:

Name Type Description Default
rollout_id str

Rollout ID

required

Returns:

Type Description
CanaryRollout | None

CanaryRollout or None

get_active_rollouts

get_active_rollouts() -> list[CanaryRollout]

Get list of active rollouts.

Returns:

Type Description
list[CanaryRollout]

List of active CanaryRollout instances

get_rollout_for_config

get_rollout_for_config(
    config_type: str,
) -> CanaryRollout | None

Get active rollout for a specific config_type.

Parameters:

Name Type Description Default
config_type str

Config type

required

Returns:

Type Description
CanaryRollout | None

Active rollout or None

get_completed_rollouts

get_completed_rollouts(
    limit: int = 20,
) -> list[CanaryRollout]

Get completed rollouts (completed/rolled_back/failed/cancelled).

Parameters:

Name Type Description Default
limit int

Maximum count (default 20)

20

Returns:

Type Description
list[CanaryRollout]

List of completed CanaryRollout instances

collect_metrics

collect_metrics(rollout_id: str) -> list[CanaryMetrics]

Collect rollout metrics (Public API).

Delegates to CanaryMetricsCollector for real-time metric collection.

Parameters:

Name Type Description Default
rollout_id str

Rollout ID

required

Returns:

Type Description
list[CanaryMetrics]

List of CanaryMetrics

RolloutStateMachine

Canary Rollout state transition rules and validation.

Encapsulates all valid state transitions and provides a single point of truth for lifecycle validation.

can_transition

can_transition(
    current: CanaryState, target: CanaryState
) -> bool

Check if a state transition is valid.

Parameters:

Name Type Description Default
current CanaryState

Current state.

required
target CanaryState

Desired target state.

required

Returns:

Type Description
bool

True if the transition is allowed.

transition

transition(
    rollout: CanaryRollout, target: CanaryState
) -> bool

Validate and execute a state transition.

Parameters:

Name Type Description Default
rollout CanaryRollout

The rollout to transition.

required
target CanaryState

Desired target state.

required

Returns:

Type Description
bool

True if the transition was executed successfully.

is_terminal

is_terminal(state: CanaryState) -> bool

Check if a state is terminal (no outgoing transitions).

Parameters:

Name Type Description Default
state CanaryState

The state to check.

required

Returns:

Type Description
bool

True if the state is terminal.
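
A transition-table state machine of this kind can be sketched as follows. The state names and the allowed transitions are assumptions made for illustration (only CREATED, PAUSED, and the completed/rolled_back/failed/cancelled group are named in this reference), so the table should not be read as the library's actual rules.

```python
from enum import Enum


class State(str, Enum):
    """Assumed state set; IN_PROGRESS in particular is a guessed name."""
    CREATED = "created"
    IN_PROGRESS = "in_progress"
    PAUSED = "paused"
    COMPLETED = "completed"
    ROLLED_BACK = "rolled_back"
    FAILED = "failed"
    CANCELLED = "cancelled"


# Hypothetical transition table: state -> states reachable from it.
TRANSITIONS = {
    State.CREATED: {State.IN_PROGRESS, State.CANCELLED},
    State.IN_PROGRESS: {State.PAUSED, State.COMPLETED,
                        State.ROLLED_BACK, State.FAILED},
    State.PAUSED: {State.IN_PROGRESS, State.ROLLED_BACK},
    State.COMPLETED: set(),
    State.ROLLED_BACK: set(),
    State.FAILED: set(),
    State.CANCELLED: set(),
}


def can_transition(current: State, target: State) -> bool:
    """A transition is valid only if the table lists it."""
    return target in TRANSITIONS[current]


def is_terminal(state: State) -> bool:
    """Terminal states have no outgoing transitions."""
    return not TRANSITIONS[state]
```

Keeping the rules in one table means lifecycle checks such as "cancel only before start" fall out of the data rather than being scattered across methods.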

VersionChecker

Configuration version checker.

Detects concurrent modification conflicts with the Optimistic Locking pattern.

Example

checker = VersionChecker()

# Check the version
is_valid, info = checker.check(
    config_type="circuit_breaker",
    expected_version=8,
)

if not is_valid:
    raise VersionConflictError(
        expected_version=8,
        actual_version=info["actual_version"],
        conflicting_operator=info["changed_by"],
        config_type="circuit_breaker",
    )

check

check(
    config_type: str, expected_version: int
) -> tuple[bool, dict]

Check whether the version matches.

Parameters:

Name Type Description Default
config_type str

Configuration type

required
expected_version int

Expected current version

required

Returns:

Type Description
tuple[bool, dict]

(whether it matches, details dict). The details dict has the form {"actual_version": int, "changed_by": str}.

get_current_version

get_current_version(config_type: str) -> int

Look up the current version number.

Parameters:

Name Type Description Default
config_type str

Configuration type

required

Returns:

Type Description
int

Current version number (0 if absent)
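
The optimistic-locking check can be illustrated with an in-memory store. This is a sketch: InMemoryVersionStore and its save() method are hypothetical, and only the return shapes of check() and get_current_version() follow the descriptions above.

```python
from __future__ import annotations


class InMemoryVersionStore:
    """Hypothetical stand-in for the backend behind VersionChecker."""

    def __init__(self) -> None:
        self._versions: dict[str, tuple[int, str]] = {}  # type -> (version, changed_by)

    def save(self, config_type: str, changed_by: str) -> int:
        """Record a change and return the new version number."""
        version, _ = self._versions.get(config_type, (0, ""))
        self._versions[config_type] = (version + 1, changed_by)
        return version + 1

    def get_current_version(self, config_type: str) -> int:
        """Current version, or 0 when nothing has been saved."""
        return self._versions.get(config_type, (0, ""))[0]

    def check(self, config_type: str, expected_version: int) -> tuple[bool, dict]:
        """Compare the caller's expected version with the stored one."""
        actual, changed_by = self._versions.get(config_type, (0, ""))
        details = {"actual_version": actual, "changed_by": changed_by}
        return actual == expected_version, details
```

A caller that read version 1 and then finds version 2 knows someone else changed the config in between, and the details dict says who.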

VersionConflictError

VersionConflictError(
    expected_version: int,
    actual_version: int,
    conflicting_operator: str,
    config_type: str,
)

Bases: BaldurError

Config version conflict.

Raised when optimistic locking detects version mismatch.

Attributes:

Name Type Description
expected_version

Expected current version

actual_version

Actual current version

conflicting_operator

Operator who made the conflicting change

config_type

Configuration type

log_canary_action

log_canary_action(
    action: str,
    rollout: CanaryRollout,
    safety_check_result: dict[str, Any] | None = None,
    additional_context: dict[str, Any] | None = None,
) -> None

Audit log for Canary Rollout actions.

Records all Canary actions for forensics and compliance.

Parameters:

Name Type Description Default
action str

Action type (start, promote, rollback, pause, resume, etc.)

required
rollout CanaryRollout

Rollout information

required
safety_check_result dict[str, Any] | None

Pre-check results (chaos conflict, version check, etc.)

None
additional_context dict[str, Any] | None

Additional context (failure reason, metrics, etc.)

None
Example

log_canary_action(
    action="start",
    rollout=rollout,
    safety_check_result={
        "chaos_guard": "passed",
        "config_lock": "acquired",
    },
    additional_context={
        "triggered_by": "api",
    },
)

log_canary_error

log_canary_error(
    action: str,
    rollout_id: str,
    config_type: str,
    error: Exception,
    operator: str,
) -> None

Log a Canary operation failure.

Parameters:

Name Type Description Default
action str

Attempted action

required
rollout_id str

Rollout ID

required
config_type str

Configuration type

required
error Exception

Raised exception

required
operator str

Operator

required

log_canary_metrics_check

log_canary_metrics_check(
    rollout_id: str,
    stage_name: str,
    metrics: dict[str, Any],
    passed: bool,
    failure_reason: str | None = None,
) -> None

Log metrics check results.

Records the metrics check results before promotion.

Parameters:

Name Type Description Default
rollout_id str

Rollout ID

required
stage_name str

Stage name

required
metrics dict[str, Any]

Collected metrics

required
passed bool

Whether the check passed

required
failure_reason str | None

Failure reason (when the check fails)

None

get_cross_cluster_notifier

get_cross_cluster_notifier() -> CrossClusterNotifier

Return the CrossClusterNotifier singleton instance.

get_governance_policy_sync

get_governance_policy_sync() -> GovernancePolicySync

Return the GovernancePolicySync singleton instance.

get_propagation_request_service

get_propagation_request_service() -> (
    CrossClusterPropagationRequest
)

Return the CrossClusterPropagationRequest singleton instance.

reset_cross_cluster_services

reset_cross_cluster_services() -> None

Reset the singleton instances (for tests).

get_canary_feature_flag

get_canary_feature_flag() -> CanaryFeatureFlag

Return the CanaryFeatureFlag singleton instance.

reset_canary_feature_flag

reset_canary_feature_flag() -> None

Reset the singleton instance (for testing).

get_canary_safety_interlock

get_canary_safety_interlock() -> CanarySafetyInterlock

Return the CanarySafetyInterlock singleton.

Returns:

Type Description
CanarySafetyInterlock

CanarySafetyInterlock instance

reset_canary_safety_interlock

reset_canary_safety_interlock() -> None

Reset the singleton (for testing).

apply_tier_floor

apply_tier_floor(
    user_criteria: PassCriteria, tier_id: str
) -> PassCriteria

Apply the stricter of the user criterion and the tier floor.

  • "max" fields: min(user, tier) → a smaller value is stricter
  • "min" fields: max(user, tier) → a larger value is stricter

Parameters:

Name Type Description Default
user_criteria PassCriteria

PassCriteria specified by the user

required
tier_id str

"critical" | "standard" | "non_essential"

required

Returns:

Type Description
PassCriteria

A new PassCriteria with the tier floor applied
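
The stricter-of-two rule can be sketched with a two-field criteria object. The field names (max_error_rate, min_requests), the Criteria class, and the floor values per tier are all hypothetical; only the min/max rule and the tier IDs come from the description above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Criteria:
    """Hypothetical stand-in for PassCriteria."""
    max_error_rate: float  # a "max" field: smaller is stricter
    min_requests: int      # a "min" field: larger is stricter


# Assumed floor values, for illustration only.
TIER_FLOORS = {
    "critical": Criteria(max_error_rate=0.01, min_requests=1000),
    "standard": Criteria(max_error_rate=0.05, min_requests=100),
    "non_essential": Criteria(max_error_rate=0.10, min_requests=10),
}


def apply_floor(user: Criteria, tier_id: str) -> Criteria:
    """Return new criteria that are at least as strict as the tier floor."""
    floor = TIER_FLOORS[tier_id]
    return Criteria(
        max_error_rate=min(user.max_error_rate, floor.max_error_rate),
        min_requests=max(user.min_requests, floor.min_requests),
    )
```

A user can tighten criteria beyond the floor but cannot loosen them below it, and the input object is left unchanged.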

get_canary_rollout_service

get_canary_rollout_service() -> CanaryRolloutService

Return the CanaryRolloutService singleton.

Returns:

Type Description
CanaryRolloutService

CanaryRolloutService instance

reset_canary_rollout_service

reset_canary_rollout_service() -> None

Reset the singleton (for testing).

Warning

Do not use in production.

is_canary_operation

is_canary_operation() -> bool

Check if the current execution context is a canary rollout operation.

Uses domain_tag ContextVar to detect canary context.

Returns:

Type Description
bool

True if current context is tagged as canary operation
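
A ContextVar-based tag check can be sketched with the standard library. The variable below is a local stand-in for the domain_tag mentioned above, and both the tag value "canary" and the tagged() helper are assumptions made for illustration.

```python
from contextlib import contextmanager
from contextvars import ContextVar

# Local stand-in for the library's domain_tag ContextVar.
domain_tag = ContextVar("domain_tag", default="")


@contextmanager
def tagged(tag: str):
    """Set the tag for the duration of a block, then restore the old value."""
    token = domain_tag.set(tag)
    try:
        yield
    finally:
        domain_tag.reset(token)


def is_canary_operation() -> bool:
    """True while the current context carries the (assumed) canary tag."""
    return domain_tag.get() == "canary"


outside_before = is_canary_operation()
with tagged("canary"):
    inside = is_canary_operation()
outside_after = is_canary_operation()
```

Because ContextVar values are per-context, the tag does not leak between threads or asyncio tasks that run concurrently.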

check_version_and_rollback

check_version_and_rollback(
    config_type: str,
    target_version: int,
    expected_current_version: int,
    rolled_back_by: str,
) -> ConfigVersion

Perform a rollback after checking the version.

Detects a conflict with Optimistic Locking and performs the rollback if there is no conflict. On a conflict, raises VersionConflictError and writes an Audit log.

Parameters:

Name Type Description Default
config_type str

Configuration type (circuit_breaker, dlq, etc.)

required
target_version int

Target version to roll back to

required
expected_current_version int

Expected current version (optimistic lock)

required
rolled_back_by str

Operator performing the rollback (email or username)

required

Returns:

Type Description
ConfigVersion

Newly created rollback version (ConfigVersion)

Raises:

Type Description
VersionConflictError

On a version conflict

ValueError

When the target version does not exist

Example

try:
    new_version = check_version_and_rollback(
        config_type="circuit_breaker",
        target_version=5,
        expected_current_version=8,
        rolled_back_by="admin@example.com",
    )
    print(f"Rolled back to v5, new version: v{new_version.version}")
except VersionConflictError as e:
    print(f"Conflict: {e.conflicting_operator} modified while you were working")