baldur_pro.services.bulkhead — Bulkhead

Concurrency isolation primitives: BulkheadPolicy, the semaphore and thread-pool bulkheads, and the @bulkhead decorator.

🔒 PRO Feature — requires a baldur-pro license

These symbols ship in the baldur-pro distribution. PRO modules import normally — there is no ImportError. PRO features activate only when baldur.init() runs with a valid BALDUR_LICENSE_KEY; without it the system runs with OSS defaults and register_pro_services() logs entitlement.pro_registration_skipped.

bulkhead

Bulkhead Pattern - prevent cascading failures through resource isolation.

The Bulkhead pattern, originating from ship design, isolates resources so that a failure in one component does not propagate to others.

Key components:

- SemaphoreBulkhead: semaphore-based concurrency limit (I/O bound)
- AsyncSemaphoreBulkhead: async semaphore-based bulkhead
- ThreadPoolBulkhead: thread-pool-based isolation (CPU bound)
- BulkheadRegistry: per-domain bulkhead management
- @bulkhead: sync/async auto-dispatching decorator

Usage

from baldur_pro.services.bulkhead import (
    SemaphoreBulkhead,
    bulkhead,
    get_bulkhead_registry,
)
from baldur.core.connection_health import ConnectionType

Direct use

# Named my_bulkhead so it does not shadow the imported bulkhead decorator.
my_bulkhead = SemaphoreBulkhead("my_domain", max_concurrent=10)
with my_bulkhead.acquire(timeout=5.0):
    do_work()

Registry use

registry = get_bulkhead_registry()
db_bulkhead = registry.get(ConnectionType.DATABASE)
with db_bulkhead.acquire():
    db_operation()

Decorator use

@bulkhead(ConnectionType.DATABASE)
def db_operation():
    pass

@bulkhead(ConnectionType.DATABASE)
async def async_db_operation():
    pass

Status: Public

AsyncSemaphoreBulkhead

AsyncSemaphoreBulkhead(name: str, max_concurrent: int = 10)

Async semaphore-based bulkhead.

Limits concurrency in an asyncio environment to prevent resource exhaustion. Does not block the event loop; suited to async I/O work.

Features:

- asyncio.Semaphore-based non-blocking wait
- asyncio.wait_for-based timeout
- Rejection statistics tracking
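The combination of asyncio.Semaphore and asyncio.wait_for listed above can be sketched roughly as follows. This is a minimal illustration, not the library's implementation: the free function try_acquire and the demo coroutine are hypothetical names, and the "None means immediate verdict" behaviour is modelled on the timeout contract documented for this class.

```python
from __future__ import annotations

import asyncio


async def try_acquire(sem: asyncio.Semaphore, timeout: float | None = None) -> bool:
    """Hypothetical helper: return True if a permit was taken, False otherwise."""
    if timeout is None:
        # Immediate verdict: never suspend when no permit is free.
        if sem.locked():
            return False
        await sem.acquire()  # returns at once because a permit is free
        return True
    try:
        # Wait at most `timeout` seconds; other tasks keep running meanwhile.
        await asyncio.wait_for(sem.acquire(), timeout=timeout)
        return True
    except asyncio.TimeoutError:
        return False


async def demo() -> list[bool]:
    sem = asyncio.Semaphore(1)
    first = await try_acquire(sem)         # a permit is free
    second = await try_acquire(sem)        # full, no waiting
    third = await try_acquire(sem, 0.01)   # full, gives up after 10 ms
    sem.release()
    fourth = await try_acquire(sem, 0.01)  # a permit is free again
    return [first, second, third, fourth]
```

Because the wait happens inside asyncio.wait_for, a caller that is waiting for capacity suspends only its own task; the event loop stays free for other work.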

Parameters:

Name Type Description Default
name str

Bulkhead name (domain identifier)

required
max_concurrent int

Maximum concurrent executions

10

name property

name: str

Bulkhead name.

acquire async

acquire(
    timeout: float | None = None,
) -> AsyncGenerator[None, None]

Acquire a resource asynchronously.

Parameters:

Name Type Description Default
timeout float | None

Wait timeout (seconds). None fails immediately (non-blocking).

None

Yields:

Type Description
AsyncGenerator[None, None]

None

Raises:

Type Description
BulkheadFullError

When resource acquisition fails

try_acquire async

try_acquire(timeout: float | None = None) -> bool

Acquisition attempt, mirroring this class's acquire timeout contract.

Parameters:

Name Type Description Default
timeout float | None

Maximum time (seconds) to wait for capacity — an upper bound on waiting, not a guarantee of it. None means no waiting (immediate verdict).

None

Returns:

Type Description
bool

True on success, False on failure

release async

release() -> None

Release the resource.

get_state

get_state() -> BulkheadState

Return the current state (synchronous method).

Bulkhead

Bases: ABC

Bulkhead abstract interface.

Prevents a failure in one component from propagating to other components through resource isolation.

Usage (context manager):

bulkhead = SemaphoreBulkhead("database", max_concurrent=10)

with bulkhead.acquire(timeout=5.0):
    do_database_work()

Usage (decorator):

@bulkhead.wrap
def do_work():
    pass

name abstractmethod property

name: str

Bulkhead name (domain identifier).

acquire abstractmethod

acquire(
    timeout: float | None = None,
) -> Generator[None, None, None]

Acquire the resource (context manager).

Parameters:

Name Type Description Default
timeout float | None

Wait timeout (seconds). If None, fail immediately (non-blocking).

None

Yields:

Type Description
None

None

Raises:

Type Description
BulkheadFullError

When resource acquisition fails

try_acquire abstractmethod

try_acquire(timeout: float | None = None) -> bool

Attempt to acquire the resource.

Parameters:

Name Type Description Default
timeout float | None

Maximum time (seconds) the implementation may wait for capacity — an upper bound on waiting, not a guarantee of it. None means no waiting (immediate verdict). Implementations whose capacity model already buffers bursts (bounded-queue thread pools) may return an immediate verdict for any timeout value.

None

Returns:

Type Description
bool

True if acquisition succeeded, False if it failed

release abstractmethod

release() -> None

Release the resource.

get_state abstractmethod

get_state() -> BulkheadState

Return the current state.

wrap

wrap(fn: Callable[..., T]) -> Callable[..., T]

Decorator that wraps a function with the bulkhead.

Parameters:

Name Type Description Default
fn Callable[..., T]

Function to wrap

required

Returns:

Type Description
Callable[..., T]

Function with the bulkhead applied

BulkheadState dataclass

BulkheadState(
    name: str,
    bulkhead_type: BulkheadType,
    max_concurrent: int,
    active_count: int,
    waiting_count: int,
    rejected_count: int,
    last_rejection_time: datetime | None = None,
)

Bulkhead current state data.

name instance-attribute

name: str

Bulkhead name (domain identifier)

bulkhead_type instance-attribute

bulkhead_type: BulkheadType

Bulkhead type

max_concurrent instance-attribute

max_concurrent: int

Maximum allowed concurrent execution count

active_count instance-attribute

active_count: int

Number of currently running tasks

waiting_count instance-attribute

waiting_count: int

Number of waiting tasks

rejected_count instance-attribute

rejected_count: int

Total number of rejected requests

last_rejection_time class-attribute instance-attribute

last_rejection_time: datetime | None = None

Last rejection time

available_permits property

available_permits: int

Number of available permits.

utilization_percent property

utilization_percent: float

Utilization (0-100%).
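The two derived properties are documented only by name and range. One plausible reading, shown here as an assumption rather than the actual formula, derives both from max_concurrent and active_count. SketchState is a hypothetical reduced stand-in for BulkheadState.

```python
from dataclasses import dataclass


@dataclass
class SketchState:
    """Hypothetical subset of BulkheadState, enough to show the derived values."""

    max_concurrent: int
    active_count: int

    @property
    def available_permits(self) -> int:
        # Assumed formula: capacity minus running tasks, never negative.
        return max(0, self.max_concurrent - self.active_count)

    @property
    def utilization_percent(self) -> float:
        # Assumed formula: share of capacity in use, clamped to 0-100.
        if self.max_concurrent <= 0:
            return 0.0
        return min(100.0, 100.0 * self.active_count / self.max_concurrent)
```

With max_concurrent=10 and active_count=4, this sketch reports 6 available permits and 40 percent utilization.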

BulkheadType

Bases: str, Enum

Bulkhead type.

SEMAPHORE class-attribute instance-attribute

SEMAPHORE = 'semaphore'

Semaphore-based - limits only the concurrent execution count

THREAD_POOL class-attribute instance-attribute

THREAD_POOL = 'thread_pool'

Thread-pool-based - isolated execution in a dedicated thread pool

BulkheadError

BulkheadError(message: str = '', *, code: str = '')

Bases: ResilienceError

Base exception for bulkhead-related failures.

extra_context

extra_context() -> dict

Return structlog-bindable context for bulkhead errors.

BulkheadFullError

BulkheadFullError(
    bulkhead_name: str,
    max_concurrent: int,
    active_count: int,
)

Bases: PolicyRejectedException, BulkheadError

Raised when the bulkhead has no permits available and a new call is rejected.

Multi-inherits PolicyRejectedException so that the outer PolicyComposer catch hierarchy classifies bulkhead rejections as PolicyOutcome.REJECTED instead of letting them fall into the generic except Exception branch (which would mislabel them as FAILURE).
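The effect of the multiple inheritance on a layered except chain can be illustrated with stand-in classes. Every name below (SketchPolicyRejected, SketchBulkheadError, SketchFull, classify) is hypothetical; the real PolicyComposer is not shown in this reference, so the catch order is an assumption based on the description above.

```python
from typing import Callable


class SketchPolicyRejected(Exception):
    """Stand-in for PolicyRejectedException."""


class SketchBulkheadError(Exception):
    """Stand-in for BulkheadError."""


class SketchFull(SketchPolicyRejected, SketchBulkheadError):
    """Stand-in for BulkheadFullError: belongs to both hierarchies."""


def classify(call: Callable[[], object]) -> str:
    """Mimic an outer catch hierarchy that labels outcomes."""
    try:
        call()
    except SketchPolicyRejected:
        # Matched first because SketchFull is also a policy rejection.
        return "REJECTED"
    except Exception:
        # Without the extra base class, a full bulkhead would land here.
        return "FAILURE"
    return "SUCCESS"
```

The same exception still satisfies except SketchBulkheadError, so bulkhead-specific handlers keep working alongside the policy-level classification.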

Parameters:

Name Type Description Default
bulkhead_name str

bulkhead identifier

required
max_concurrent int

maximum allowed concurrent executions

required
active_count int

number of executions currently active

required

extra_context

extra_context() -> dict

Return structlog-bindable context for bulkhead full errors.

BulkheadNotFoundError

BulkheadNotFoundError(
    bulkhead_name: str, registered_names: list[str]
)

Bases: BulkheadError, KeyError

Raised when a bulkhead is requested for a domain that has not been provisioned.

Custom domains must be provisioned (via register(), get_or_create(), or a policy/settings helper) before a @bulkhead-decorated function is first called, on both sync and async callees.

Multi-inherits KeyError so the registry's long-standing not-found contract stays intact for existing except KeyError consumers (the traffic-gate skip-degradation path, the admin API 404 mapping), while except BulkheadError / except BaldurError also classify it. Multi-inheritance precedent: BulkheadFullError(PolicyRejectedException, BulkheadError).

__str__ is overridden because KeyError.__str__ would otherwise repr-quote the message (the base hierarchy defines no __str__), mangling the actionable text into "Bulkhead not found: ..." with surrounding quotes.
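The quoting behaviour is standard Python: KeyError.__str__ returns the repr of a single argument. The sketch below shows the problem and one way to override it. SketchNotFound is a hypothetical stand-in, and its message format is an assumption, not the library's actual text.

```python
class SketchNotFound(KeyError):
    """Hypothetical stand-in for BulkheadNotFoundError."""

    def __init__(self, bulkhead_name: str, registered_names: list) -> None:
        self.bulkhead_name = bulkhead_name
        self.registered_names = list(registered_names)
        super().__init__(
            f"Bulkhead not found: {bulkhead_name} "
            f"(registered: {', '.join(self.registered_names)})"
        )

    def __str__(self) -> str:
        # Return the message as written instead of KeyError's repr-quoted form.
        return self.args[0]


# Plain KeyError wraps a single-argument message in quotes when stringified.
quoted = str(KeyError("Bulkhead not found: reports"))
plain = str(SketchNotFound("reports", ["database", "cache"]))
```

Here quoted carries surrounding single quotes while plain does not, and SketchNotFound is still caught by an existing except KeyError handler.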

Parameters:

Name Type Description Default
bulkhead_name str

the requested, unregistered domain

required
registered_names list[str]

currently registered compartment names

required

extra_context

extra_context() -> dict

Return structlog-bindable context for bulkhead not-found errors.

BulkheadTimeoutError

BulkheadTimeoutError(bulkhead_name: str, timeout: float)

Bases: TimeoutPolicyError, BulkheadError

Raised when a bulkhead-managed call exceeds its timeout budget.

Emitted by ThreadPoolBulkhead when the wrapped task does not complete within the configured timeout. Multi-inherits TimeoutPolicyError so the outer PolicyComposer catch hierarchy classifies the failure as PolicyOutcome.TIMEOUT (instead of funneling into the generic except Exception branch as FAILURE).

Parameters:

Name Type Description Default
bulkhead_name str

bulkhead identifier

required
timeout float

configured timeout in seconds

required

extra_context

extra_context() -> dict

Return structlog-bindable context for bulkhead timeout errors.

BulkheadMetricsUpdater

BulkheadMetricsUpdater(interval: float = 10.0)

Background thread that periodically refreshes every bulkhead's metrics.

Pulls the current state of every registered bulkhead and writes it into the Prometheus metrics on a schedule.

Usage

updater = BulkheadMetricsUpdater(interval=10.0)
updater.start()

# ... application runs ...

updater.stop()
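A periodic background refresher with start() and stop() is commonly built on a daemon thread and a threading.Event. The sketch below illustrates that general pattern only; PeriodicUpdater and its refresh callback are hypothetical, and the real updater's internals (including how it writes Prometheus metrics) are not documented here.

```python
from __future__ import annotations

import threading
from typing import Callable


class PeriodicUpdater:
    """Hypothetical sketch: call `refresh` every `interval` seconds until stopped."""

    def __init__(self, refresh: Callable[[], None], interval: float = 10.0) -> None:
        self._refresh = refresh
        self._interval = interval
        self._stop = threading.Event()
        self._thread: threading.Thread | None = None

    def start(self) -> None:
        if self._thread is not None and self._thread.is_alive():
            return  # already running
        self._stop.clear()
        self._thread = threading.Thread(
            target=self._run, name="metrics-updater", daemon=True
        )
        self._thread.start()

    def stop(self) -> None:
        self._stop.set()
        if self._thread is not None:
            self._thread.join(timeout=self._interval + 1.0)
            self._thread = None

    def _run(self) -> None:
        while not self._stop.is_set():
            self._refresh()
            # Event.wait doubles as a sleep that stop() can interrupt.
            self._stop.wait(self._interval)
```

Using Event.wait instead of time.sleep lets stop() return promptly rather than waiting out a full interval.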

Parameters:

Name Type Description Default
interval float

metric update interval (seconds)

10.0

start

start() -> None

Start the metrics updater.

stop

stop() -> None

Stop the metrics updater.

AsyncBulkheadPolicy

AsyncBulkheadPolicy(
    async_bulkhead: AsyncSemaphoreBulkhead,
    timeout: float | None = None,
)

Asynchronous Bulkhead Policy — AsyncResiliencePolicy Protocol implementation.

Wraps AsyncSemaphoreBulkhead and returns results in PolicyResult form. Follows the same exception handling contract as the synchronous BulkheadPolicy.

Since AsyncSemaphoreBulkhead is a separate class that does not inherit from Bulkhead(ABC), it is implemented as a class fully separated from the synchronous BulkheadPolicy.

Parameters:

Name Type Description Default
async_bulkhead AsyncSemaphoreBulkhead

AsyncSemaphoreBulkhead instance (DI).

required
timeout float | None

Resource acquisition timeout (seconds). If None, fail immediately.

None

name property

name: str

Policy name.

bulkhead_name property

bulkhead_name: str

Domain identifier of the internal AsyncSemaphoreBulkhead.

execute async

execute(
    func: Callable[..., T],
    *args: Any,
    context: PolicyContext | None = None,
    **kwargs: Any
) -> PolicyResult[T]

Execute the function within the asynchronous bulkhead resource.

BulkheadFullError → absorbed into PolicyResult(outcome=REJECTED). Business exceptions during function execution are re-propagated via raise.

BulkheadPolicy

BulkheadPolicy(
    bulkhead: Bulkhead, timeout: float | None = None
)

Bases: ResiliencePolicy[T]

Synchronous Bulkhead Policy — resource isolation.

Internally reuses the existing Bulkhead ABC implementations. SemaphoreBulkhead runs via the acquire() context manager, and ThreadPoolBulkhead runs via the execute() method.

Exception handling contract (same as CircuitBreakerPolicy):

- BulkheadFullError → absorbed into PolicyResult(outcome=REJECTED)
- BulkheadTimeoutError → absorbed into PolicyResult(outcome=TIMEOUT) (ThreadPool only)
- Business exception during function execution → re-propagated via raise
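The contract above separates capacity rejections, which become a result value, from business exceptions, which reach the caller unchanged. A rough sketch of that split, using hypothetical stand-ins (Outcome, Result, execute) and a plain threading.Semaphore instead of the real bulkhead classes:

```python
from __future__ import annotations

import threading
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable


class Outcome(Enum):
    """Hypothetical stand-in for PolicyOutcome."""

    SUCCESS = "success"
    REJECTED = "rejected"


@dataclass
class Result:
    """Hypothetical stand-in for PolicyResult."""

    outcome: Outcome
    value: Any = None


def execute(
    sem: threading.Semaphore, func: Callable[..., Any], *args: Any, **kwargs: Any
) -> Result:
    # Capacity exhaustion is reported as a result, never raised to the caller.
    if not sem.acquire(blocking=False):
        return Result(Outcome.REJECTED)
    try:
        # Exceptions raised by func are deliberately not caught here.
        return Result(Outcome.SUCCESS, func(*args, **kwargs))
    finally:
        # The permit is returned on success and on business failure alike.
        sem.release()
```

Callers therefore inspect the returned outcome for rejections and use ordinary try/except only for errors raised by their own function.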

Parameters:

Name Type Description Default
bulkhead Bulkhead

Bulkhead ABC implementation (SemaphoreBulkhead, ThreadPoolBulkhead). Injected via DI. Registry lookup uses the bulkhead_policy() factory.

required
timeout float | None

Resource acquisition timeout (seconds). If None, fail immediately (Fast Fail). For ThreadPoolBulkhead, used as the execute() execution timeout.

None

name property

name: str

Policy name.

bulkhead_name property

bulkhead_name: str

Domain identifier of the internal Bulkhead instance.

execute

execute(
    func: Callable[..., T],
    *args: Any,
    context: PolicyContext | None = None,
    **kwargs: Any
) -> PolicyResult[T]

Execute the function within the bulkhead resource.

For ThreadPoolBulkhead:

- isinstance check → call ThreadPoolBulkhead.execute()
- leverages dedicated thread-pool isolation + ContextVar propagation
- BulkheadTimeoutError → mapped to PolicyOutcome.TIMEOUT

For SemaphoreBulkhead:

- with acquire(timeout) context manager
- BulkheadFullError → mapped to PolicyOutcome.REJECTED

Business exceptions during function execution are re-propagated via raise.

BulkheadRegistry

BulkheadRegistry(settings: BulkheadSettings | None = None)

Per-domain bulkhead registry.

Integrates with the existing ConnectionType and also supports custom domains. Reflects runtime configuration changes by subscribing to the CONFIG_UPDATED event.

Features:

- Automatic registration of default bulkheads per ConnectionType
- Custom domain support
- Automatic creation of asynchronous bulkheads
- Fine-grained bulkheads per DB alias / cache instance

Parameters:

Name Type Description Default
settings BulkheadSettings | None

Bulkhead settings. Uses defaults if None.

None

close

close() -> None

Unsubscribe EventBus handlers.

Idempotent: safe to call multiple times.

get

get(name: str | ConnectionType) -> Bulkhead

Look up a bulkhead.

Parameters:

Name Type Description Default
name str | ConnectionType

Domain name or ConnectionType

required

Returns:

Type Description
Bulkhead

Bulkhead instance

Raises:

Type Description
BulkheadNotFoundError

Unregistered domain. Subclasses KeyError, so existing except KeyError consumers keep working; the message names the missing domain and lists the registered compartments.

get_or_create

get_or_create(
    name: str,
    max_concurrent: int | None = None,
    bulkhead_type: str = "semaphore",
) -> Bulkhead

Look up or create a bulkhead.

Parameters:

Name Type Description Default
name str

Domain name

required
max_concurrent int | None

Maximum concurrent execution count (default if None)

None
bulkhead_type str

"semaphore" or "thread_pool"

'semaphore'

Returns:

Type Description
Bulkhead

Bulkhead instance

get_async

get_async(
    name: str | ConnectionType,
) -> AsyncSemaphoreBulkhead

Look up an asynchronous bulkhead.

Creates or returns the asynchronous twin, deriving its capacity from the synchronous twin. The lookup is strict: a domain with no synchronous twin raises BulkheadNotFoundError exactly as get() does. Provisioning must precede the async lookup, so the async twin can never be a registry-invisible bulkhead minted with default capacity.

Parameters:

Name Type Description Default
name str | ConnectionType

Domain name or ConnectionType

required

Returns:

Type Description
AsyncSemaphoreBulkhead

AsyncSemaphoreBulkhead instance

Raises:

Type Description
BulkheadNotFoundError

No synchronous twin for name. Subclasses KeyError; message lists the registered compartments.

get_for_database

get_for_database(alias: str = 'default') -> Bulkhead

Return the bulkhead for a DB alias.

Parameters:

Name Type Description Default
alias str

Django DB alias (default, replica, analytics, etc.)

'default'

Returns:

Type Description
Bulkhead

The bulkhead for the given alias

get_for_cache

get_for_cache(name: str = 'default') -> Bulkhead

Return the bulkhead for a cache instance.

Parameters:

Name Type Description Default
name str

Cache name (default, session, etc.)

'default'

Returns:

Type Description
Bulkhead

The bulkhead for the given cache

register

register(bulkhead: Bulkhead) -> None

Register a custom bulkhead.

Overwriting a built-in name (one of the four ConnectionType values) is allowed: it is the only way to swap a built-in's implementation type, and the re-register capacity flow depends on it. The override is transient, however, because the next config reload resets it to the settings-derived built-in, so a WARNING is logged to flag the risk.

The async twin for the same name is invalidated so async callees pick up the new capacity on next access.

Parameters:

Name Type Description Default
bulkhead Bulkhead

Bulkhead to register

required
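The relationship between a synchronous bulkhead and its async twin, as described for get_async() and register(), can be sketched with a toy registry. SketchRegistry and Twin are hypothetical and keep only capacities; the real registry also handles locking details, built-ins, and config reloads that are left out here.

```python
from __future__ import annotations

import threading
from dataclasses import dataclass


@dataclass
class Twin:
    """Hypothetical async-side record that copies capacity from the sync side."""

    name: str
    max_concurrent: int


class SketchRegistry:
    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._sync: dict[str, int] = {}    # name -> capacity of the sync bulkhead
        self._async: dict[str, Twin] = {}  # lazily created async twins

    def register(self, name: str, max_concurrent: int) -> None:
        with self._lock:
            self._sync[name] = max_concurrent
            # Drop the cached twin so the next lookup sees the new capacity.
            self._async.pop(name, None)

    def get_async(self, name: str) -> Twin:
        with self._lock:
            if name not in self._sync:
                # Strict: no sync twin means no async twin either.
                raise KeyError(name)
            twin = self._async.get(name)
            if twin is None:
                twin = Twin(name, self._sync[name])
                self._async[name] = twin
            return twin
```

Repeated get_async() calls return the same twin until register() replaces the synchronous entry, at which point a fresh twin with the new capacity is created on the next access.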

unregister

unregister(name: str) -> bool

Unregister a bulkhead.

The async twin for the same name is invalidated alongside the sync entry.

Parameters:

Name Type Description Default
name str

Domain name

required

Returns:

Type Description
bool

True if unregistration succeeded, False if the name was not registered

Raises:

Type Description
ValueError

name is a built-in compartment (one of the four ConnectionType values). Built-ins are settings-owned protection compartments — removing one silently degrades isolation and does not even persist (the next config reload rebuilds it), so the mutation is blocked rather than allowed.

get_all_states

get_all_states() -> dict[str, BulkheadState]

Return all bulkhead states.

list_names

list_names() -> list[str]

Return all registered domain names.

SemaphoreBulkhead

SemaphoreBulkhead(
    name: str, max_concurrent: int = 10, fair: bool = True
)

Bases: Bulkhead

Semaphore-based bulkhead.

Limits the concurrent execution count to prevent resource exhaustion. Suitable for I/O-bound work (DB queries, cache lookups, etc.).

Features:

- Maximum concurrent execution count limit
- Timeout-based wait support
- Rejection statistics tracking
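The three features above map naturally onto threading.Semaphore. The following is a minimal sketch under that assumption; SketchSemaphoreBulkhead and SketchFullError are hypothetical stand-ins, and the real class tracks more state (waiting count, last rejection time) than is shown.

```python
from __future__ import annotations

import threading
from contextlib import contextmanager
from typing import Iterator


class SketchFullError(RuntimeError):
    """Hypothetical stand-in for BulkheadFullError."""


class SketchSemaphoreBulkhead:
    def __init__(self, name: str, max_concurrent: int = 10) -> None:
        self.name = name
        self.rejected_count = 0
        self._sem = threading.Semaphore(max_concurrent)
        self._lock = threading.Lock()

    def try_acquire(self, timeout: float | None = None) -> bool:
        if timeout is None:
            ok = self._sem.acquire(blocking=False)  # immediate verdict
        else:
            ok = self._sem.acquire(timeout=timeout)  # wait up to `timeout` seconds
        if not ok:
            with self._lock:
                self.rejected_count += 1
        return ok

    def release(self) -> None:
        self._sem.release()

    @contextmanager
    def acquire(self, timeout: float | None = None) -> Iterator[None]:
        if not self.try_acquire(timeout):
            raise SketchFullError(self.name)
        try:
            yield
        finally:
            self.release()  # always hand the permit back
```

The context manager form guarantees the permit is released even when the guarded work raises.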

Parameters:

Name Type Description Default
name str

Bulkhead name (domain identifier)

required
max_concurrent int

Maximum concurrent execution count

10
fair bool

If True, guarantees FIFO ordering (reserved for a future implementation)

True

name property

name: str

Bulkhead name.

acquire

acquire(
    timeout: float | None = None,
) -> Generator[None, None, None]

Acquire the resource.

Parameters:

Name Type Description Default
timeout float | None

Wait timeout (seconds). If None, fail immediately (non-blocking).

None

Yields:

Type Description
None

None

Raises:

Type Description
BulkheadFullError

When resource acquisition fails

try_acquire

try_acquire(timeout: float | None = None) -> bool

Attempt to acquire. Non-blocking if timeout is None, otherwise waits up to the given time.

Parameters:

Name Type Description Default
timeout float | None

Wait timeout (seconds). If None, succeed/fail immediately.

None

Returns:

Type Description
bool

True if acquisition succeeded, False if it failed

release

release() -> None

Release the resource.

get_state

get_state() -> BulkheadState

Return the current state.

ThreadPoolBulkhead

ThreadPoolBulkhead(
    name: str,
    max_workers: int = 5,
    queue_size: int = 10,
    thread_name_prefix: str | None = None,
)

Bases: Bulkhead

Thread-pool-based bulkhead.

Provides complete isolation by running tasks in a dedicated thread pool. Request ID, tracing context, etc. are propagated to the worker thread via contextvars.copy_context().

Features:

- Complete isolation with a dedicated thread pool
- Automatic ContextVar propagation (test mode, request overrides, etc.)
- Wait queue size limit
- Timeout support
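ContextVar propagation via contextvars.copy_context() follows a standard-library pattern: snapshot the caller's context, then run the task inside that snapshot on the worker thread. A small sketch, where request_id and submit_with_context are hypothetical names used only for illustration:

```python
import contextvars
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable

# Hypothetical request-scoped variable used for the demonstration.
request_id: contextvars.ContextVar = contextvars.ContextVar("request_id", default="-")


def submit_with_context(
    pool: ThreadPoolExecutor, fn: Callable[..., Any], *args: Any, **kwargs: Any
) -> Future:
    # Snapshot is taken in the caller's thread, at submission time.
    ctx = contextvars.copy_context()
    # ctx.run executes fn on the worker thread with the snapshot active.
    return pool.submit(ctx.run, fn, *args, **kwargs)


def read_request_id() -> str:
    return request_id.get()
```

Because the snapshot is a copy, changes the task makes to context variables stay inside the worker and do not leak back to the submitting thread.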

Parameters:

Name Type Description Default
name str

Bulkhead name (domain identifier)

required
max_workers int

Maximum number of worker threads

5
queue_size int

Wait queue size (requests are rejected when exceeded)

10
thread_name_prefix str | None

Thread name prefix

None

name property

name: str

Bulkhead name.

acquire

acquire(
    timeout: float | None = None,
) -> Generator[None, None, None]

For ThreadPoolBulkhead, prefer submit() or execute(). This method exists only for Bulkhead interface compatibility.

Parameters:

Name Type Description Default
timeout float | None

Unused (interface compatibility)

None

Raises:

Type Description
BulkheadFullError

When the queue is full

try_acquire

try_acquire(timeout: float | None = None) -> bool

Acquisition attempt with an immediate verdict.

The bounded wait queue (max_workers + queue_size) is already the burst buffer, so the verdict is immediate for any timeout value — timeout is accepted for contract parity but not waited on. Since timeout is an upper bound on waiting, zero waiting satisfies it.

Parameters:

Name Type Description Default
timeout float | None

Upper bound on waiting (seconds). Accepted for interface parity; this implementation returns an immediate verdict regardless of its value.

None

Returns:

Type Description
bool

True if acquisition succeeded, False if the compartment is full
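The immediate-verdict behaviour can be pictured as a counter bounded by max_workers + queue_size. The sketch below is an assumption about the bookkeeping, not the actual implementation; SketchCapacity is a hypothetical name.

```python
from __future__ import annotations

import threading


class SketchCapacity:
    """Hypothetical in-flight counter bounded by max_workers + queue_size."""

    def __init__(self, max_workers: int = 5, queue_size: int = 10) -> None:
        self._limit = max_workers + queue_size
        self._in_flight = 0
        self._lock = threading.Lock()

    def try_acquire(self, timeout: float | None = None) -> bool:
        # `timeout` is accepted for interface parity but never waited on.
        with self._lock:
            if self._in_flight >= self._limit:
                return False
            self._in_flight += 1
            return True

    def release(self) -> None:
        with self._lock:
            if self._in_flight > 0:
                self._in_flight -= 1
```

With max_workers=1 and queue_size=1, two acquisitions succeed and a third returns False at once, even when called with a long timeout.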

release

release() -> None

Release the resource.

submit

submit(
    fn: Callable[..., T], *args: Any, **kwargs: Any
) -> Future[T]

Submit a task asynchronously (with ContextVar propagation).

Uses contextvars.copy_context() to propagate the current thread's ContextVars to the worker thread. This keeps the test-mode flag, request overrides, etc. preserved within the thread pool.

Parameters:

Name Type Description Default
fn Callable[..., T]

Function to execute

required
*args Any

Positional arguments

()
**kwargs Any

Keyword arguments

{}

Returns:

Type Description
Future[T]

Future object

Raises:

Type Description
BulkheadFullError

When the wait queue is full

execute

execute(
    fn: Callable[..., T],
    *args: Any,
    timeout: float = 30.0,
    **kwargs: Any
) -> T

Execute a task synchronously.

Calls submit() and waits for the result.

Parameters:

Name Type Description Default
fn Callable[..., T]

Function to execute

required
*args Any

Positional arguments

()
timeout float

Execution timeout (seconds)

30.0
**kwargs Any

Keyword arguments

{}

Returns:

Type Description
T

Function execution result

Raises:

Type Description
BulkheadFullError

When the wait queue is full

BulkheadTimeoutError

When a timeout occurs
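Submitting and then waiting with a deadline corresponds to Future.result(timeout=...) in the standard library. A minimal sketch of that flow, with the hypothetical SketchTimeoutError standing in for BulkheadTimeoutError and a plain ThreadPoolExecutor standing in for the bulkhead's pool:

```python
import concurrent.futures
import time
from typing import Any, Callable


class SketchTimeoutError(Exception):
    """Hypothetical stand-in for BulkheadTimeoutError."""


def execute(
    pool: concurrent.futures.ThreadPoolExecutor,
    fn: Callable[..., Any],
    *args: Any,
    timeout: float = 30.0,
    **kwargs: Any,
) -> Any:
    future = pool.submit(fn, *args, **kwargs)
    try:
        # Block the caller for at most `timeout` seconds.
        return future.result(timeout=timeout)
    except concurrent.futures.TimeoutError:
        # cancel() only helps if the task has not started; a running task continues.
        future.cancel()
        raise SketchTimeoutError(f"no result within {timeout}s") from None


def slow_task() -> str:
    time.sleep(0.2)
    return "late"
```

Note that a timeout releases the caller, not the worker: a task that is already running keeps its thread until it finishes, which is one reason to keep such pools isolated per domain.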

get_state

get_state() -> BulkheadState

Return the current state.

shutdown

shutdown(wait: bool = True) -> None

Shut down the thread pool.

Parameters:

Name Type Description Default
wait bool

If True, shut down after waiting for pending tasks to complete

True

bulkhead

bulkhead(
    name: str | ConnectionType,
    timeout: float | None = None,
    fallback: Callable[..., T] | None = None,
) -> Callable[[Callable[..., T]], Callable[..., T]]

Bulkhead decorator (sync/async auto-dispatching).

Detects the function type via asyncio.iscoroutinefunction() and automatically applies the appropriate bulkhead (sync/async).

Custom domains must be provisioned (via get_or_create(), register(), or a policy/settings helper) before the decorated function is first called. An unregistered domain raises BulkheadNotFoundError — naming the registered compartments — on both sync and async callees; the error is raised before any fallback is considered, so fallback does not mask it. The four built-in compartments (database, cache, external_api, message_queue) are always available.

Parameters:

Name Type Description Default
name str | ConnectionType

Domain name or ConnectionType

required
timeout float | None

Resource acquisition wait timeout (seconds). If None, fail immediately.

None
fallback Callable[..., T] | None

Alternative function to call when the bulkhead is full

None

Returns:

Type Description
Callable[[Callable[..., T]], Callable[..., T]]

Decorator with the bulkhead applied

Examples:

Synchronous function (built-in compartment)

@bulkhead(ConnectionType.DATABASE)
def db_query():
    return execute_query()

Asynchronous function (built-in compartment)

@bulkhead(ConnectionType.DATABASE)
async def async_db_query():
    return await execute_async_query()

Timeout setting (built-in compartment)

@bulkhead("external_api", timeout=5.0)
def call_external_api():
    return requests.get(url)

Custom domain with fallback — provision it first, then decorate

from baldur_pro.services.bulkhead import get_bulkhead_registry

get_bulkhead_registry().get_or_create("reports", max_concurrent=5)

@bulkhead("reports", fallback=lambda: {"status": "unavailable"})
def get_data():
    return fetch_data()
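The sync/async auto-dispatch idea can be sketched independently of the registry. The decorator below is hypothetical (sketch_bulkhead, SketchRejected) and takes a capacity directly instead of a domain name. It uses inspect.iscoroutinefunction, the standard-library equivalent of the asyncio helper named above, and always fails fast, as the real decorator does when timeout is None.

```python
import asyncio
import functools
import inspect
import threading
from typing import Any, Callable, Optional


class SketchRejected(RuntimeError):
    """Hypothetical stand-in for BulkheadFullError."""


def sketch_bulkhead(max_concurrent: int, fallback: Optional[Callable[..., Any]] = None):
    def on_full(fn: Callable[..., Any], args: tuple, kwargs: dict) -> Any:
        if fallback is not None:
            return fallback(*args, **kwargs)
        raise SketchRejected(fn.__name__)

    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        if inspect.iscoroutinefunction(fn):
            holder: dict = {}

            @functools.wraps(fn)
            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                # Create the semaphore lazily, inside the running event loop.
                if "sem" not in holder:
                    holder["sem"] = asyncio.Semaphore(max_concurrent)
                sem = holder["sem"]
                if sem.locked():
                    return on_full(fn, args, kwargs)
                async with sem:
                    return await fn(*args, **kwargs)

            return async_wrapper

        sem = threading.Semaphore(max_concurrent)

        @functools.wraps(fn)
        def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
            if not sem.acquire(blocking=False):
                return on_full(fn, args, kwargs)
            try:
                return fn(*args, **kwargs)
            finally:
                sem.release()

        return sync_wrapper

    return decorator
```

The decision between the two wrappers is made once, at decoration time, so there is no per-call cost for the dispatch itself.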

bulkhead_for_cache

bulkhead_for_cache(
    name: str = "default",
    timeout: float | None = None,
    fallback: Callable[..., T] | None = None,
) -> Callable[[Callable[..., T]], Callable[..., T]]

Per-cache-instance bulkhead decorator.

Internally uses BulkheadPolicy/AsyncBulkheadPolicy and looks up the bulkhead for the given cache via the Registry's get_for_cache().

Parameters:

Name Type Description Default
name str

Cache name (default, session, etc.)

'default'
timeout float | None

Resource acquisition wait timeout (seconds)

None
fallback Callable[..., T] | None

(deprecated) Alternative function to call when the bulkhead is full. Using the BulkheadPolicy + FallbackPolicy combination directly is recommended.

None

Examples:

@bulkhead_for_cache("default")
def get_cached_value(key: str):
    return cache.get(key)

@bulkhead_for_cache("session")
def get_session_data(session_id: str):
    return session_cache.get(session_id)

bulkhead_for_database

bulkhead_for_database(
    alias: str = "default",
    timeout: float | None = None,
    fallback: Callable[..., T] | None = None,
) -> Callable[[Callable[..., T]], Callable[..., T]]

Per-DB-alias bulkhead decorator.

Internally uses BulkheadPolicy/AsyncBulkheadPolicy and looks up the bulkhead for the given alias via the Registry's get_for_database().

Parameters:

Name Type Description Default
alias str

Django DB alias (default, replica, analytics, etc.)

'default'
timeout float | None

Resource acquisition wait timeout (seconds)

None
fallback Callable[..., T] | None

(deprecated) Alternative function to call when the bulkhead is full. Using the BulkheadPolicy + FallbackPolicy combination directly is recommended.

None

Examples:

@bulkhead_for_database("default")
def write_to_db():
    Model.objects.using("default").create(...)

@bulkhead_for_database("replica")
def read_from_replica():
    return Model.objects.using("replica").all()

get_metrics_updater

get_metrics_updater(
    interval: float = 10.0,
) -> BulkheadMetricsUpdater

Return the BulkheadMetricsUpdater singleton.

increment_rejected_count

increment_rejected_count(bulkhead_name: str) -> None

Increment the rejected counter.

Parameters:

Name Type Description Default
bulkhead_name str

bulkhead name

required

start_metrics_updater

start_metrics_updater(
    interval: float = 10.0,
) -> BulkheadMetricsUpdater

Start the metrics updater (convenience function).

stop_metrics_updater

stop_metrics_updater() -> None

Stop the metrics updater (convenience function).

update_bulkhead_metrics

update_bulkhead_metrics(
    bulkhead_name: str,
    bulkhead_type: str,
    active_count: int,
    max_concurrent: int,
    waiting_count: int,
    rejected_count: int,
) -> None

Update bulkhead metrics.

Parameters:

Name Type Description Default
bulkhead_name str

bulkhead name

required
bulkhead_type str

bulkhead type (semaphore, thread_pool)

required
active_count int

current number of active requests

required
max_concurrent int

maximum concurrent capacity

required
waiting_count int

number of waiting requests

required
rejected_count int

total rejected requests

required

bulkhead_operation_span

bulkhead_operation_span(
    bulkhead_name: str, operation: str
) -> Generator[dict[str, Any], None, None]

Create a Span for an operation inside a bulkhead (optional).

Parameters:

Name Type Description Default
bulkhead_name str

Bulkhead name

required
operation str

Operation name (e.g., "db_query", "api_call")

required

Yields:

Name Type Description
span_data dict[str, Any]

Dictionary of data to attach to the Span

Examples:

with bulkhead.acquire():
    with bulkhead_operation_span("database", "user_query") as span_data:
        users = User.objects.all()
        span_data["count"] = len(users)

bulkhead_span

bulkhead_span(
    bulkhead_name: str,
    max_concurrent: int,
    timeout: float | None = None,
) -> Generator[dict[str, Any], None, None]

Create an OpenTelemetry Span for a bulkhead operation (optional).

When OpenTelemetry is disabled, this yields an empty context and the wrapped work runs normally.

Parameters:

Name Type Description Default
bulkhead_name str

Bulkhead name

required
max_concurrent int

Maximum concurrent executions

required
timeout float | None

Timeout setting

None

Yields:

Name Type Description
span_data dict[str, Any]

Dictionary of data to attach to the Span

Examples:

with bulkhead_span("database", 10) as span_data:
    # Perform work
    result = do_work()
    span_data["result_size"] = len(result)
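An optional span helper of this kind is often written so that it degrades to a plain dictionary when OpenTelemetry is not installed. The sketch below shows that pattern under stated assumptions: sketch_span, the tracer name, and the attribute keys are all hypothetical, and only the widely used opentelemetry.trace calls get_tracer, start_as_current_span, and set_attribute are relied on.

```python
from contextlib import contextmanager
from typing import Any, Dict, Iterator

try:
    from opentelemetry import trace  # optional dependency
except ImportError:
    trace = None


@contextmanager
def sketch_span(bulkhead_name: str, max_concurrent: int) -> Iterator[Dict[str, Any]]:
    span_data: Dict[str, Any] = {}
    if trace is None:
        # No tracing available: hand back the dict and do nothing else.
        yield span_data
        return
    tracer = trace.get_tracer("bulkhead.sketch")  # hypothetical tracer name
    with tracer.start_as_current_span(f"bulkhead.{bulkhead_name}") as span:
        span.set_attribute("bulkhead.max_concurrent", max_concurrent)
        try:
            yield span_data
        finally:
            # Copy whatever the caller recorded onto the span before it ends.
            for key, value in span_data.items():
                span.set_attribute(f"bulkhead.{key}", value)
```

Callers use it the same way in both modes, so instrumentation code does not need to check whether tracing is enabled.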

async_bulkhead_policy

async_bulkhead_policy(
    name: str,
    max_concurrent: int | None = None,
    timeout: float | None = None,
) -> AsyncBulkheadPolicy

AsyncBulkheadPolicy factory — BulkheadRegistry singleton integration.

Calls the Registry's get_async() to guarantee a single global AsyncSemaphoreBulkhead instance for the same name.

Parameters:

Name Type Description Default
name str

Domain name (Registry key)

required
max_concurrent int | None

Maximum concurrent execution count (Registry default if None). The synchronous Bulkhead is provisioned first (so the domain is registry-visible to the admin API, metrics, and shutdown iteration) and used as the configuration basis for the asynchronous instance.

None
timeout float | None

Resource acquisition timeout (fail immediately if None)

None

Returns:

Type Description
AsyncBulkheadPolicy

AsyncBulkheadPolicy instance (uses the Registry singleton AsyncSemaphoreBulkhead)

bulkhead_policy

bulkhead_policy(
    name: str,
    max_concurrent: int | None = None,
    timeout: float | None = None,
    bulkhead_type: str = "semaphore",
) -> BulkheadPolicy

BulkheadPolicy factory — BulkheadRegistry singleton integration.

Calls the Registry's get_or_create() to guarantee a single global Bulkhead instance for the same name.

The Registry dependency occurs only in this factory function. The BulkheadPolicy class itself does not know the Registry (test-friendly).

Parameters:

Name Type Description Default
name str

Domain name (Registry key)

required
max_concurrent int | None

Maximum concurrent execution count (Registry default if None)

None
timeout float | None

Resource acquisition timeout (fail immediately if None)

None
bulkhead_type str

"semaphore" or "thread_pool"

'semaphore'

Returns:

Type Description
BulkheadPolicy

BulkheadPolicy instance (uses the Registry singleton Bulkhead)

get_bulkhead_registry

get_bulkhead_registry() -> BulkheadRegistry

Return the BulkheadRegistry singleton.

reset_bulkhead_registry

reset_bulkhead_registry() -> None

Reset the singleton (for testing). Calls close() before clearing.