# Audit Pipeline

Operator guide. For retention policies, observability metrics, and event correlation, see Audit & Observability.
Kubernaut's audit pipeline provides a complete record of every action taken during remediation -- from signal ingestion to effectiveness assessment, including human approval decisions. Every service includes a buffered audit store that batches events in memory and ships them to DataStorage for durable persistence.
## Architecture

```mermaid
graph LR
    subgraph Service["Each Kubernaut Service"]
        H[Handler / Reconciler] -->|StoreAudit| B[Buffered Store]
        B -->|batch POST| DS
    end
    DS[DataStorage<br/>REST API] --> PG[(PostgreSQL<br/>audit_events)]
    DS -.->|"single-event paths only"| RD[(Valkey<br/>DLQ)]
```
## Design Principles

- Fire-and-forget -- Audit failures never block remediation (DD-AUDIT-002)
- Buffered batching -- Events are queued in memory and sent in configurable batches
- Graceful shutdown -- Buffers flush on pod termination (DD-007, ADR-032)
- Per-service isolation -- Each service has its own audit client with service-specific event types
- Hash chain integrity -- Events are chained via `SHA256(previous_hash + event_json)` for tamper detection
## Buffered Audit Store

Every Go service instantiates a `BufferedAuditStore` from `pkg/audit/store.go`:

### Interface

| Method | Behavior |
|---|---|
| `StoreAudit(ctx, event)` | Non-blocking enqueue. If the buffer is full, the event is dropped rather than blocking the caller |
| `Flush(ctx)` | Blocks until all buffered events are written |
| `Close()` | Flushes remaining events and stops the background worker |
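The non-blocking enqueue can be sketched with a bounded channel and a `select` that falls through to a `default` branch. This is a minimal illustration of the documented drop-on-full behavior, not the actual `pkg/audit/store.go` implementation; the type names, the `bool` return, and the `dropped` counter are assumptions.

```go
package main

import "fmt"

// AuditEvent is a stand-in for the real event type.
type AuditEvent struct {
	EventType string
}

// BufferedAuditStore sketches the buffer as a bounded channel.
type BufferedAuditStore struct {
	buffer  chan AuditEvent
	dropped int // in the real store this feeds audit_events_dropped_total
}

func NewBufferedAuditStore(bufferSize int) *BufferedAuditStore {
	return &BufferedAuditStore{buffer: make(chan AuditEvent, bufferSize)}
}

// StoreAudit never blocks the caller: when the buffer is full,
// the event is counted as dropped and discarded.
func (s *BufferedAuditStore) StoreAudit(e AuditEvent) bool {
	select {
	case s.buffer <- e:
		return true
	default:
		s.dropped++
		return false
	}
}

func main() {
	s := NewBufferedAuditStore(2)
	for i := 0; i < 3; i++ {
		s.StoreAudit(AuditEvent{EventType: "gateway.signal.received"})
	}
	fmt.Println(len(s.buffer), s.dropped) // buffer holds 2, third event dropped
}
```

The `default` branch is what makes the call fire-and-forget: a slow DataStorage can never stall a reconciler.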
### Flush Triggers

Events are flushed in four scenarios:

- Batch full -- When the in-memory batch reaches `BatchSize`, it is sent immediately
- Timer -- Every `FlushInterval`, the current batch is sent regardless of size
- Explicit flush -- `Flush()` drains the buffer and sends all remaining events
- Shutdown -- `Close()` performs a final flush before stopping
### Retry Logic

- Max retries: 3 per batch (3 total attempts, 2 backoff waits)
- Backoff: 1s, 4s (quadratic: `n²` seconds). A third backoff (9s) would only occur if `MaxRetries` is increased beyond 3.
- 4xx errors: No retry (permanent failure)
- 5xx / network errors: Retry with backoff
### Configuration

| Parameter | Default | Recommendation |
|---|---|---|
| `BufferSize` | 10,000 | 10k--50k (DD-AUDIT-004) |
| `BatchSize` | 1,000 (library default; Gateway chart overrides to 100) | 100--1,000 |
| `FlushInterval` | 1 second | 1s |
| `MaxRetries` | 3 | 3 |
### Observability

The buffered store exposes one registered Prometheus metric:

| Metric | Type | Labels | Description |
|---|---|---|---|
| `audit_events_dropped_total` | CounterVec | `service` | Events dropped due to a full buffer |

Additionally, internal atomic counters (`buffered_count`, `written_count`, `failed_batch_count`) are tracked for debug logging but are not registered as Prometheus metrics.
## Event Flow

- A handler or reconciler calls `auditStore.StoreAudit(ctx, event)` with a structured `AuditEvent`
- The buffered store enqueues the event into an in-memory channel (non-blocking)
- A background goroutine batches events and sends them via `POST /api/v1/audit/events/batch` to DataStorage
- DataStorage validates, converts, and inserts the batch into the `audit_events` PostgreSQL table within a transaction
- On PostgreSQL failure, the batch endpoint returns HTTP 500 -- the caller's retry logic handles re-delivery. The batch handler does not use the Valkey DLQ (this is a known gap; see GAP-10 in the DataStorage source)
- On shutdown, `auditStore.Close()` flushes remaining events
## Event Structure

Every audit event includes:

| Field | Type | Description |
|---|---|---|
| `event_id` | UUID | Unique identifier (auto-generated) |
| `event_version` | string | Schema version (default: `1.0`) |
| `event_timestamp` | timestamptz | When the event occurred |
| `event_type` | string | Hierarchical type (e.g., `gateway.signal.received`) |
| `event_category` | string | Category (e.g., `signal`, `remediation`) |
| `event_action` | string | Action (e.g., `received`, `completed`) |
| `event_outcome` | string | `success`, `failure`, `pending` |
| `actor_type` | string | `service` or `human` |
| `actor_id` | string | Service name or operator identity |
| `resource_type` | string | Target resource type |
| `resource_id` | string | Target resource identifier |
| `correlation_id` | string | Links all events for one remediation |
| `namespace` | string | Kubernetes namespace |
| `event_data` | JSONB | Service-specific payload |
| `event_hash` | string | SHA256 hash chain for integrity |
| `previous_event_hash` | string | Previous event's hash |
| `retention_days` | int | Default: 2555 (7 years) |
| `is_sensitive` | bool | PII flag |
## Hash Chain (Tamper Detection)

Events form a hash chain for integrity verification: each event's `event_hash` is computed as `SHA256(previous_hash + event_json)`, so modifying any stored event breaks every subsequent link.

Fields excluded from the hash: `event_hash`, `previous_event_hash`, `event_date`, and the legal-hold fields. This enables after-the-fact detection of any audit event modification.
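The chaining rule can be sketched directly from the formula. This is a minimal illustration, assuming an empty string as the genesis `previous_hash` and taking the canonical event JSON as given (the real implementation's field exclusion and genesis convention may differ):

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// chainHash implements event_hash = SHA256(previous_hash + event_json),
// where event_json already excludes event_hash, previous_event_hash,
// event_date, and the legal-hold fields.
func chainHash(previousHash string, eventJSON []byte) string {
	h := sha256.New()
	h.Write([]byte(previousHash))
	h.Write(eventJSON)
	return hex.EncodeToString(h.Sum(nil))
}

// verifyChain recomputes every link; any modified or reordered event
// changes a hash and breaks verification from that point forward.
func verifyChain(events [][]byte, hashes []string) bool {
	prev := ""
	for i, e := range events {
		if chainHash(prev, e) != hashes[i] {
			return false
		}
		prev = hashes[i]
	}
	return true
}

func main() {
	events := [][]byte{
		[]byte(`{"event_type":"gateway.signal.received"}`),
		[]byte(`{"event_type":"orchestrator.lifecycle.created"}`),
	}
	var hashes []string
	prev := ""
	for _, e := range events {
		prev = chainHash(prev, e)
		hashes = append(hashes, prev)
	}
	fmt.Println(verifyChain(events, hashes)) // true

	events[0] = []byte(`{"event_type":"tampered"}`) // rewrite history
	fmt.Println(verifyChain(events, hashes))        // false
}
```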
## Correlation

All audit events for a single remediation share a `correlation_id` set to the RemediationRequest name (DD-AUDIT-CORRELATION-002). This enables:

- Timeline reconstruction -- Query all events for one remediation in chronological order
- CRD reconstruction -- Rebuild the full RemediationRequest from audit data (see Data Persistence: Reconstruction)
- Cross-service tracing -- Follow a remediation across all services
## Emitting Services

All 9 Go binaries (`cmd/*/main.go`: Gateway, Signal Processing, AI Analysis, Remediation Orchestrator, Workflow Execution, Effectiveness Monitor, Notification, Auth Webhook, DataStorage) and the HolmesGPT API (Python) emit audit events:

| Service | Event Prefix | Key Events |
|---|---|---|
| Gateway | `gateway.*` | `signal.received`, `signal.deduplicated`, `crd.created`, `crd.failed` |
| Signal Processing | `signalprocessing.*` | `enrichment.completed`, `classification.completed`, `phase.transition` |
| AI Analysis | `aianalysis.*` | `investigation.submitted`, `analysis.completed`, `rego.evaluation`, `approval.decision` |
| HolmesGPT API | `aiagent.*` | `enrichment.completed`, `enrichment.failed` |
| Remediation Orchestrator | `orchestrator.*` | `lifecycle.created`, `phase.transition`, `child.created`, `timeout` |
| Workflow Execution | `workflowexecution.*` | `selection.completed`, `execution.started`, `execution.completed`, `block.cleared` |
| Notification | `notification.*` | `message.sent`, `message.failed`, `message.acknowledged`, `message.escalated` |
| Effectiveness Monitor | `effectiveness.*` | `health.assessed`, `hash.computed`, `alert.assessed`, `metrics.assessed`, `assessment.completed` |
| Auth Webhook | `webhook.*`, `remediationworkflow.*`, `actiontype.*` | `remediationapprovalrequest.decided`, `remediationrequest.timeout_modified`, `notification.cancelled`, `remediationworkflow.admitted.create`, `remediationworkflow.admitted.delete`, `remediationworkflow.admitted.denied`, `actiontype.admitted.create`, `actiontype.admitted.update`, `actiontype.admitted.delete`, `actiontype.denied.create`, `actiontype.denied.update`, `actiontype.denied.delete` |
| DataStorage | `datastorage.*`, `workflow.catalog.*` | `workflow.created`, `workflow.updated`, `actiontype.created`, `actiontype.updated`, `actiontype.disabled`, `actiontype.reenabled`, `actiontype.disable_denied`, `workflow.catalog.actions_listed`, `workflow.catalog.workflows_listed`, `workflow.catalog.workflow_retrieved`, `workflow.catalog.selection_validated` |
## Operator Attribution

The admission webhook captures human identity for all operator-driven actions:

| Action | Event Type | What's Recorded |
|---|---|---|
| Approve/reject remediation | `webhook.remediationapprovalrequest.decided` | Actor identity, decision, reason |
| Clear execution block | `workflowexecution.block.cleared` | Actor identity, execution ref |
| Modify timeout | `webhook.remediationrequest.timeout_modified` | Actor identity, old/new values |
| Cancel notification | `webhook.notification.cancelled` | Actor identity, notification ref |
| Register workflow (CRD CREATE) | `remediationworkflow.admitted.create` | Actor identity, workflow name, version |
| Delete workflow (CRD DELETE) | `remediationworkflow.admitted.delete` | Actor identity, workflow name |
| Register action type (CRD CREATE) | `actiontype.admitted.create` | Actor identity, action type name |
| Update action type (CRD UPDATE) | `actiontype.admitted.update` | Actor identity, action type, changed fields |
| Delete action type (CRD DELETE) | `actiontype.admitted.delete` | Actor identity, action type name |

This ensures every human action has a recorded identity, timestamp, and context -- critical for SOC2 Type II readiness.
## Dead Letter Queue

DataStorage uses Valkey streams as a dead letter queue for single-event write paths (e.g., notification audit events). The batch endpoint (`POST /api/v1/audit/events/batch`) does not use the DLQ -- it returns HTTP 500 on PostgreSQL failure and relies on the caller's retry logic (see GAP-10 in the source).

| Stream | Purpose |
|---|---|
| `audit:dlq:events` | Generic audit event batches |
| `audit:dlq:notifications` | Notification-specific audit events |
| `audit:dead-letter:{type}` | Events that exceeded retry attempts |

The DLQ uses Valkey consumer groups (`XREADGROUP`) for reliable delivery and `XACK` for acknowledgment. Maximum stream length is 10,000 entries.
## Next Steps

- Data Persistence -- PostgreSQL schema, partitioning, and reconstruction
- Audit & Observability -- User guide for audit features
- System Overview -- How audit fits into the overall architecture