baldur.services — Security & Notification
Security-violation detection (service, config, violation/severity enums, the
severity lookup table) and the OSS notification value types that surface
incidents. The delivery service (SecurityNotificationService) ships in PRO —
see its reference.
Violation detection
SecurityViolationService
SecurityViolationService(
config: SecurityConfig | None = None,
repository: SecurityIncidentRepository | None = None,
cache: CacheProviderInterface | None = None,
)
Service for handling security violations.
Security violations are NEVER auto-recovered. They are: 1. Immediately blocked 2. Logged with full forensic context 3. Routed to security team for investigation
Usage
service = SecurityViolationService() result = service.handle_violation( violation_type=ViolationType.SIGNATURE_INVALID, request_info={"ip": "1.2.3.4", "user_agent": "..."}, description="Signature validation failed", )
For testing with mock repository
mock_repo = Mock(spec=SecurityIncidentRepository) service = SecurityViolationService(repository=mock_repo)
Initialize the security violation service.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
config
|
SecurityConfig | None
|
Optional configuration, loads from settings if None |
None
|
repository
|
SecurityIncidentRepository | None
|
Optional repository for DI, uses default adapter if None |
None
|
cache
|
CacheProviderInterface | None
|
Optional cache provider for DI, uses default if None |
None
|
repository
property
repository: SecurityIncidentRepository
Get the repository, creating default adapter if needed.
cache
property
cache: CacheProviderInterface
Get the cache provider, creating default if needed.
handle_violation
handle_violation(
violation_type: str | ViolationType,
request_info: dict[str, Any] | None = None,
user_id: int | None = None,
entity_refs: dict[str, int] | None = None,
description: str = "",
raw_request_data: dict[str, Any] | None = None,
) -> SecurityViolationResult
Handle a security violation.
This method: 1. Creates a SecurityIncident record via repository 2. Takes immediate protective action based on violation type 3. Triggers security team notification 4. Returns result with action taken
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
violation_type
|
str | ViolationType
|
Type of security violation |
required |
request_info
|
dict[str, Any] | None
|
Request info dict with 'ip', 'user_agent' keys |
None
|
user_id
|
int | None
|
Associated user ID (if authenticated) |
None
|
entity_refs
|
dict[str, int] | None
|
Related entity references (e.g., {"order_id": 123}) |
None
|
description
|
str
|
Detailed description of the violation |
''
|
raw_request_data
|
dict[str, Any] | None
|
Sanitized request data for forensics |
None
|
Returns:
| Type | Description |
|---|---|
SecurityViolationResult
|
SecurityViolationResult with incident ID and action taken |
record_violation
record_violation(
violation_type: str,
details: dict[str, Any] | None = None,
request_info: dict[str, Any] | None = None,
user_id: int | None = None,
) -> SecurityViolationResult
Simplified interface for recording a violation.
This is a convenience method that wraps handle_violation for cases like CorruptionShield where simpler parameter passing is needed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
violation_type
|
str
|
Type of violation (e.g., "corruption_injection_attempt") |
required |
details
|
dict[str, Any] | None
|
Violation details dict (becomes description + raw_request_data) |
None
|
request_info
|
dict[str, Any] | None
|
Optional request info with 'ip', 'user_agent' |
None
|
user_id
|
int | None
|
Optional associated user ID |
None
|
Returns:
| Type | Description |
|---|---|
SecurityViolationResult
|
SecurityViolationResult with incident ID and action taken |
is_ip_banned
is_ip_banned(ip_address: str) -> bool
Check if an IP address is banned.
SecurityViolationResult
dataclass
SecurityViolationResult(
success: bool,
incident_id: int | None = None,
action_taken: str = "",
error: str | None = None,
protection_result: ProtectionResult | None = None,
)
Result of security violation handling.
handled
classmethod
handled(
incident_id: int,
action: str,
protection_result: ProtectionResult | None = None,
) -> SecurityViolationResult
Factory for successfully handled violation.
failed
classmethod
failed(error: str) -> SecurityViolationResult
Factory for failed handling.
SecurityConfig
dataclass
SecurityConfig(
rate_limit_window_seconds: int = 60,
rate_limit_max_requests: int = 100,
temporary_ban_hours: int = 1,
permanent_ban_threshold: int = 5,
suspicious_ip_cache_timeout: int = 86400,
injection_ban_hours: int = 24,
failed_login_threshold: int = 5,
suspicious_ip_cache_prefix: str = "security:suspicious_ip:",
banned_ip_cache_prefix: str = "security:banned_ip:",
session_engine: str = "django.contrib.sessions.backends.db",
session_cookie_age: int = 1209600,
)
Configuration for security violation handling.
from_settings
classmethod
from_settings() -> SecurityConfig
Load configuration from settings.
ViolationType
Bases: str, Enum
Types of security violations that never self-heal (domain-neutral).
RECOVERY_LOOP_DETECTED
class-attribute
instance-attribute
RECOVERY_LOOP_DETECTED = 'recovery_loop_detected'
Recovery/adjustment infinite-loop detected - the most severe.
CONFLICTING_ADJUSTMENT
class-attribute
instance-attribute
CONFLICTING_ADJUSTMENT = 'conflicting_adjustment'
Conflicting autonomous adjustment detected (e.g., A→B→A repetition).
HEALING_TIMEOUT
class-attribute
instance-attribute
HEALING_TIMEOUT = 'healing_timeout'
Baldur operation timed out.
FLAPPING_DETECTED
class-attribute
instance-attribute
FLAPPING_DETECTED = 'flapping_detected'
Parameter flapping detected (repeated micro-adjustments).
ANOMALY_STATISTICAL
class-attribute
instance-attribute
ANOMALY_STATISTICAL = 'anomaly_statistical'
L3 statistical anomaly detected (Z-score based).
ANOMALY_BEHAVIORAL
class-attribute
instance-attribute
ANOMALY_BEHAVIORAL = 'anomaly_behavioral'
Behavioral anomaly detected (sequence-pattern deviation).
SCHEMA_VIOLATION
class-attribute
instance-attribute
SCHEMA_VIOLATION = 'schema_violation'
L1 schema violation (missing required field, type mismatch).
BUSINESS_RULE_VIOLATION
class-attribute
instance-attribute
BUSINESS_RULE_VIOLATION = 'business_rule_violation'
L2 business-rule violation.
AUDIT_TAMPERING
class-attribute
instance-attribute
AUDIT_TAMPERING = 'audit_tampering'
Audit-log tampering attempt detected.
HASH_CHAIN_BROKEN
class-attribute
instance-attribute
HASH_CHAIN_BROKEN = 'hash_chain_broken'
ContinuousAuditRecorder hash-chain integrity violation.
WAL_CORRUPTION
class-attribute
instance-attribute
WAL_CORRUPTION = 'wal_corruption'
WAL CRC32 checksum mismatch.
UNAUTHORIZED_OVERRIDE
class-attribute
instance-attribute
UNAUTHORIZED_OVERRIDE = 'unauthorized_override'
Unauthorized configuration-change attempt.
GOVERNANCE_BYPASS_ATTEMPT
class-attribute
instance-attribute
GOVERNANCE_BYPASS_ATTEMPT = 'governance_bypass_attempt'
Kill Switch/Emergency Mode bypass attempt.
PRIVILEGE_ESCALATION
class-attribute
instance-attribute
PRIVILEGE_ESCALATION = 'privilege_escalation'
Privilege-escalation attempt.
Severity
Bases: str, Enum
Severity levels for security incidents.
SEVERITY_BY_VIOLATION_TYPE
module-attribute
SEVERITY_BY_VIOLATION_TYPE: dict[str, Severity] = {
SIGNATURE_INVALID: CRITICAL,
DATA_TAMPERED: CRITICAL,
TOKEN_FORGED: CRITICAL,
REPLAY_ATTACK: CRITICAL,
UNAUTHORIZED_ACCESS: HIGH,
INJECTION_ATTEMPT: HIGH,
RATE_LIMIT_ABUSE: MEDIUM,
SUSPICIOUS_ACTIVITY: MEDIUM,
RECOVERY_LOOP_DETECTED: CRITICAL,
CONFLICTING_ADJUSTMENT: HIGH,
HEALING_TIMEOUT: MEDIUM,
FLAPPING_DETECTED: HIGH,
AUDIT_TAMPERING: CRITICAL,
HASH_CHAIN_BROKEN: CRITICAL,
WAL_CORRUPTION: CRITICAL,
GOVERNANCE_BYPASS_ATTEMPT: CRITICAL,
PRIVILEGE_ESCALATION: CRITICAL,
ANOMALY_STATISTICAL: HIGH,
ANOMALY_BEHAVIORAL: HIGH,
UNAUTHORIZED_OVERRIDE: HIGH,
BUSINESS_RULE_VIOLATION: HIGH,
SCHEMA_VIOLATION: MEDIUM,
}
handle_security_violation
handle_security_violation(
violation_type: str | ViolationType,
request_info: dict[str, Any] | None = None,
user_id: int | None = None,
description: str = "",
**kwargs: Any
) -> SecurityViolationResult
Convenience function to handle a security violation.
This is the main entry point for security violation handling.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
violation_type
|
str | ViolationType
|
Type of security violation |
required |
request_info
|
dict[str, Any] | None
|
Request info dict with 'ip', 'user_agent' keys |
None
|
user_id
|
int | None
|
Associated user ID |
None
|
description
|
str
|
Description of the violation |
''
|
**kwargs
|
Any
|
Additional arguments passed to handle_violation |
{}
|
Returns:
| Type | Description |
|---|---|
SecurityViolationResult
|
SecurityViolationResult |
Notification
The notification value types are OSS; the delivery service that consumes them
(SecurityNotificationService) ships in PRO — see
the PRO notification service reference.
SecurityNotificationResult
dataclass
SecurityNotificationResult(
incident_id: int,
results: list[ChannelDeliveryResult] = list(),
)
Aggregate result of all notification attempts.
all_success
property
all_success: bool
Check if all notifications were successful.
any_success
property
any_success: bool
Check if any notification was successful.
add_result
add_result(result: ChannelDeliveryResult) -> None
Add a notification result.
NotificationConfig
dataclass
NotificationConfig(
slack_webhook_url: str = "",
slack_critical_channel: str = "#critical-alerts",
slack_high_channel: str = "#ops-alerts",
slack_medium_channel: str = "#dev-alerts",
email_critical_recipients: list[str] = list(),
email_high_recipients: list[str] = list(),
sms_critical_recipients: list[str] = list(),
pagerduty_service_key: str = "",
pagerduty_enabled: bool = False,
webhook_urls: list[str] = list(),
webhook_headers: dict[str, str] = dict(),
enabled: bool = True,
dry_run: bool = False,
)
Configuration for security notifications.
from_settings
classmethod
from_settings() -> NotificationConfig
Load configuration from ChannelTargetSettings and NotificationSettings.
NotificationChannel
module-attribute
NotificationChannel = MessageChannel
ChannelDeliveryResult
dataclass
ChannelDeliveryResult(
channel: str,
success: bool,
message: str = "",
error: str | None = None,
)
Result of a notification attempt.
Singleton accessors
One process-wide singleton accessor rounds out this surface. It is built by a generic singleton factory, so it carries no standalone signature page; call it with no arguments to obtain the shared instance:
get_security_violation_service()→ the sharedSecurityViolationService.
The notification service accessor (get_security_notification_service()) ships
in PRO — see the PRO reference.