Overview
Compliance audit logs must answer: who did what to which resource, when, from where, and with what result. They must also be tamper-evident — an auditor must be able to verify logs haven't been modified.
Regulatory retention requirements:
| Standard | Retention |
|---|---|
| HIPAA | 6 years |
| PCI DSS | 12 months online + 12 months archive |
| SOC 2 (typical) | 1 year |
| GDPR | Defined by purpose (often 1-3 years) |
Required Log Fields
typescript
interface AuditLogEntry {
// Core required fields
id: string; // UUID — unique event identifier
timestamp: string; // ISO 8601 UTC — when the event occurred
actor_id: string; // User/service that performed the action
actor_type: 'user' | 'service' | 'admin' | 'system';
action: string; // What happened: "read" | "write" | "delete" | "login" | "export"
resource_type: string; // "patient_record" | "invoice" | "user_account"
resource_id: string; // ID of the affected resource
result: 'success' | 'failure' | 'denied';
// Context fields
ip_address: string; // Source IP
user_agent?: string; // Browser/client identifier
session_id?: string; // Session identifier
// Optional fields
changed_fields?: string[]; // For write operations: which fields changed
old_values?: Record<string, unknown>; // Previous values (be careful with PII)
reason?: string; // Clinical justification (HIPAA), business reason
// Tamper-evidence
prev_hash?: string; // Hash of previous log entry (hash chaining)
hash: string; // SHA-256 of this entry
}
Hash Chain Implementation
Hash chaining makes log tampering detectable: each entry includes a hash of the previous entry. Modifying any entry breaks the chain.
python
import json
import hashlib
import uuid
from datetime import datetime, timezone
from typing import Optional
class AuditLogger:
def __init__(self, db_connection):
self.db = db_connection
self._last_hash: Optional[str] = None
def _compute_hash(self, entry: dict, prev_hash: Optional[str]) -> str:
"""Compute SHA-256 of the log entry for tamper-evidence."""
hashable = {
"id": entry["id"],
"timestamp": entry["timestamp"],
"actor_id": entry["actor_id"],
"action": entry["action"],
"resource_type": entry["resource_type"],
"resource_id": entry["resource_id"],
"result": entry["result"],
"prev_hash": prev_hash or "GENESIS"
}
content = json.dumps(hashable, sort_keys=True)
return hashlib.sha256(content.encode()).hexdigest()
async def log(
self,
actor_id: str,
actor_type: str,
action: str,
resource_type: str,
resource_id: str,
result: str,
ip_address: str,
**kwargs
) -> dict:
"""Create and persist a tamper-evident audit log entry."""
# Get last hash for chaining
prev_hash = self._last_hash or await self._get_last_hash()
entry = {
"id": str(uuid.uuid4()),
"timestamp": datetime.now(timezone.utc).isoformat(),
"actor_id": actor_id,
"actor_type": actor_type,
"action": action,
"resource_type": resource_type,
"resource_id": resource_id,
"result": result,
"ip_address": ip_address,
"prev_hash": prev_hash or "GENESIS",
**kwargs
}
entry["hash"] = self._compute_hash(entry, prev_hash)
self._last_hash = entry["hash"]
# Persist to append-only storage
await self.db.audit_logs.insert_one(entry)
return entry
async def verify_chain(self, limit: int = 1000) -> bool:
"""Verify the hash chain integrity."""
entries = await self.db.audit_logs.find(
sort=[("timestamp", 1)],
limit=limit
)
prev_hash = "GENESIS"
for i, entry in enumerate(entries):
expected_hash = self._compute_hash(entry, prev_hash)
if entry["hash"] != expected_hash:
print(f"❌ Chain broken at entry {i}: id={entry['id']}")
return False
if entry["prev_hash"] != prev_hash:
print(f"❌ prev_hash mismatch at entry {i}")
return False
prev_hash = entry["hash"]
print(f"✅ Chain verified: {len(entries)} entries intact")
return True
async def _get_last_hash(self) -> Optional[str]:
last = await self.db.audit_logs.find_one(sort=[("timestamp", -1)])
return last["hash"] if last else None
Express.js Audit Middleware
javascript
const { v4: uuidv4 } = require('uuid');
const crypto = require('crypto');
class AuditLogger {
constructor(db) {
this.db = db;
}
async log({ actorId, actorType = 'user', action, resourceType, resourceId,
result, ipAddress, sessionId, reason, changedFields }) {
const prevHash = await this.getLastHash();
const entry = {
id: uuidv4(),
timestamp: new Date().toISOString(),
actor_id: actorId,
actor_type: actorType,
action,
resource_type: resourceType,
resource_id: resourceId,
result,
ip_address: ipAddress,
session_id: sessionId,
reason,
changed_fields: changedFields,
prev_hash: prevHash || 'GENESIS',
};
entry.hash = this.computeHash(entry);
await this.db.auditLogs.create(entry);
return entry;
}
computeHash(entry) {
const hashable = JSON.stringify({
id: entry.id,
timestamp: entry.timestamp,
actor_id: entry.actor_id,
action: entry.action,
resource_type: entry.resource_type,
resource_id: entry.resource_id,
result: entry.result,
prev_hash: entry.prev_hash,
});
return crypto.createHash('sha256').update(hashable).digest('hex');
}
async getLastHash() {
const last = await this.db.auditLogs.findOne({ order: [['timestamp', 'DESC']] });
return last?.hash || null;
}
}
// Express middleware — auto-log all requests
const createAuditMiddleware = (auditLogger) => async (req, res, next) => {
const start = Date.now();
// Capture response
const originalSend = res.send;
res.send = function(body) {
res.send = originalSend;
// Log after response sent
setImmediate(async () => {
try {
await auditLogger.log({
actorId: req.user?.id || 'anonymous',
actorType: req.user ? 'user' : 'anonymous',
action: `${req.method.toLowerCase()}_${req.route?.path || req.path}`,
resourceType: req.params.resourceType || 'api_request',
resourceId: req.params.id || req.path,
result: res.statusCode < 400 ? 'success' :
res.statusCode === 403 ? 'denied' : 'failure',
ipAddress: req.ip,
sessionId: req.session?.id,
duration_ms: Date.now() - start,
});
} catch (err) {
console.error('Audit log failed:', err);
// Never let audit logging errors break the main flow
}
});
return originalSend.apply(this, arguments);
};
next();
};
PostgreSQL Append-Only Table
sql
-- Append-only audit log table
-- The CHECK constraint and trigger prevent UPDATE/DELETE
CREATE TABLE audit_logs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
actor_id TEXT NOT NULL,
actor_type TEXT NOT NULL CHECK (actor_type IN ('user', 'service', 'admin', 'system')),
action TEXT NOT NULL,
resource_type TEXT NOT NULL,
resource_id TEXT NOT NULL,
result TEXT NOT NULL CHECK (result IN ('success', 'failure', 'denied')),
ip_address INET,
session_id TEXT,
reason TEXT,
changed_fields TEXT[],
prev_hash TEXT,
hash TEXT NOT NULL UNIQUE
);
-- Create append-only trigger — prevent modification
CREATE OR REPLACE FUNCTION prevent_audit_modification()
RETURNS TRIGGER AS $$
BEGIN
RAISE EXCEPTION 'Audit logs are immutable — modification not allowed';
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER audit_logs_immutable
BEFORE UPDATE OR DELETE ON audit_logs
FOR EACH ROW EXECUTE FUNCTION prevent_audit_modification();
-- Indexes for common queries
CREATE INDEX idx_audit_actor_id ON audit_logs(actor_id);
CREATE INDEX idx_audit_resource ON audit_logs(resource_type, resource_id);
CREATE INDEX idx_audit_timestamp ON audit_logs(timestamp DESC);
CREATE INDEX idx_audit_action ON audit_logs(action);
-- Query: All actions by a user in the last 30 days
SELECT timestamp, action, resource_type, resource_id, result, ip_address
FROM audit_logs
WHERE actor_id = 'user-123'
AND timestamp > NOW() - INTERVAL '30 days'
ORDER BY timestamp DESC;
-- Query: All access to a specific patient record
SELECT timestamp, actor_id, action, result, ip_address, reason
FROM audit_logs
WHERE resource_type = 'patient_record'
AND resource_id = 'patient-456'
ORDER BY timestamp DESC;
-- Query: Failed access attempts (security monitoring)
SELECT actor_id, COUNT(*) as failed_attempts, MAX(timestamp) as last_attempt
FROM audit_logs
WHERE result IN ('failure', 'denied')
AND timestamp > NOW() - INTERVAL '24 hours'
GROUP BY actor_id
HAVING COUNT(*) > 5
ORDER BY failed_attempts DESC;
AWS CloudTrail + Immutable S3
bash
# Create CloudTrail that logs to immutable S3 bucket
aws cloudtrail create-trail \
--name compliance-audit-trail \
--s3-bucket-name my-audit-logs-bucket \
--include-global-service-events \
--is-multi-region-trail \
--enable-log-file-validation # Cryptographic log integrity
# Create S3 bucket with Object Lock (WORM - Write Once Read Many)
aws s3api create-bucket \
--bucket my-audit-logs-bucket \
--object-lock-enabled-for-bucket
# Set default retention: compliance mode (cannot delete even by admin)
aws s3api put-object-lock-configuration \
--bucket my-audit-logs-bucket \
--object-lock-configuration '{
"ObjectLockEnabled": "Enabled",
"Rule": {
"DefaultRetention": {
"Mode": "COMPLIANCE",
"Years": 6
}
}
}'
# Verify log file integrity (CloudTrail)
aws cloudtrail validate-logs \
--trail-arn arn:aws:cloudtrail:us-east-1:123456789:trail/compliance-audit-trail \
--start-time 2024-01-01T00:00:00Z
FastAPI Audit Middleware (Python)
python
from fastapi import FastAPI, Request
import time
app = FastAPI()
@app.middleware("http")
async def audit_middleware(request: Request, call_next):
start_time = time.time()
# Extract context before processing
actor_id = None
if hasattr(request.state, 'user'):
actor_id = request.state.user.id
response = await call_next(request)
duration_ms = int((time.time() - start_time) * 1000)
# Determine result from status code
if response.status_code < 400:
result = "success"
elif response.status_code == 403:
result = "denied"
else:
result = "failure"
# Log asynchronously to avoid blocking response
import asyncio
asyncio.create_task(audit_logger.log(
actor_id=actor_id or "anonymous",
actor_type="user" if actor_id else "anonymous",
action=f"{request.method.lower()}:{request.url.path}",
resource_type="api_endpoint",
resource_id=str(request.url.path),
result=result,
ip_address=request.client.host,
duration_ms=duration_ms,
http_status=response.status_code,
))
return response
Compliance Checklist
- All required fields logged (actor, action, resource, timestamp, IP, result)
- Tamper-evidence implemented (hash chaining or digital signatures)
- Append-only storage (trigger prevents UPDATE/DELETE, or S3 Object Lock)
- Log aggregation centralized (avoid local file logs that can be deleted)
- Retention policy matches regulation (HIPAA: 6yr, PCI: 1yr+1yr archive)
- Alerts for anomalies (failed logins, access spikes, unusual hours)
- Logs reviewed periodically (monthly for SOC 2, daily for high-security)
- Log access itself is audited (who viewed audit logs)
- Backup of audit logs separate from primary storage
- WORM storage for regulatory compliance archives