Audit & Provenance Chain
Intent
Implement end-to-end audit logging and provenance tracking through a chain of responsibility pattern, enabling comprehensive traceability for regulatory compliance.
Forces
- Auditability & Trust: Healthcare systems must maintain comprehensive audit trails for regulatory compliance.
- Granular Sharing & Legal Obligations: Healthcare data sharing must respect complex, context-dependent consent and privacy rules.
- Legacy System Preservation: Healthcare organizations must modernize gradually while preserving existing clinical semantics and workflows.
Structure
The Audit & Provenance Chain pattern implements a chain of responsibility where each handler in the processing chain contributes to the audit trail and provenance record.
Key Components
AuditChain
Chain of responsibility for audit event creation
AuditHandler
Base interface for handlers that contribute audit information
ProvenanceBuilder
Constructs FHIR Provenance resources for data lineage
AuditRepository
Stores and indexes audit events for retrieval
AuditEventFactory
Creates properly formatted FHIR AuditEvent resources
Behavior
Audit Chain Processing
The following sequence shows how audit events are created as data flows through a processing chain:
Chain Processing Steps
- Request Receipt
- Authentication
- Authorization
- Data Access
- Transformation
- Response
Implementation Considerations
AuditEvent Resource
Example FHIR AuditEvent resource following IHE ATNA/BALP profiles with agent identification, source attribution, and entity references.
{
"resourceType": "AuditEvent",
"id": "fhir-access-audit",
"type": {
"system": "http://dicom.nema.org/resources/ontology/DCM",
"code": "110110",
"display": "Patient Record"
},
"subtype": [
{
"system": "http://hl7.org/fhir/restful-interaction",
"code": "read",
"display": "read"
}
],
"action": "R",
"period": {
"start": "2024-01-15T10:30:00Z",
"end": "2024-01-15T10:30:01Z"
},
"recorded": "2024-01-15T10:30:01Z",
"outcome": "0",
"outcomeDesc": "Success",
"agent": [
{
"type": {
"coding": [
{
"system": "http://terminology.hl7.org/CodeSystem/v3-ParticipationType",
"code": "IRCP",
"display": "information recipient"
}
]
},
"who": {
"reference": "Practitioner/dr-smith",
"display": "Dr. Jane Smith"
},
"requestor": true,
"network": {
"address": "192.168.1.100",
"type": "2"
}
},
{
"type": {
"coding": [
{
"system": "http://dicom.nema.org/resources/ontology/DCM",
"code": "110153",
"display": "Source Role ID"
}
]
},
"who": {
"identifier": {
"system": "urn:ietf:rfc:3986",
"value": "urn:oid:2.16.840.1.113883.3.123"
},
"display": "ACME EHR System"
},
"requestor": false,
"network": {
"address": "ehr.acme-healthcare.org",
"type": "1"
}
}
],
"source": {
"site": "ACME Healthcare FHIR Server",
"observer": {
"reference": "Device/fhir-server-01"
},
"type": [
{
"system": "http://terminology.hl7.org/CodeSystem/security-source-type",
"code": "4",
"display": "Application Server"
}
]
},
"entity": [
{
"what": {
"reference": "Patient/12345"
},
"type": {
"system": "http://terminology.hl7.org/CodeSystem/audit-entity-type",
"code": "1",
"display": "Person"
},
"role": {
"system": "http://terminology.hl7.org/CodeSystem/object-role",
"code": "1",
"display": "Patient"
}
},
{
"what": {
"reference": "Observation/lab-result-789"
},
"type": {
"system": "http://terminology.hl7.org/CodeSystem/audit-entity-type",
"code": "2",
"display": "System Object"
},
"role": {
"system": "http://terminology.hl7.org/CodeSystem/object-role",
"code": "4",
"display": "Domain Resource"
},
"securityLabel": [
{
"system": "http://terminology.hl7.org/CodeSystem/v3-Confidentiality",
"code": "R",
"display": "Restricted"
}
]
}
]
}
Chain Handler Implementation
Implements chain of responsibility for audit trail management, tracking requests through multiple processing hops with correlation IDs.
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Any
from datetime import datetime
from enum import Enum
import json
import asyncio
class AuditAction(Enum):
CREATE = "C"
READ = "R"
UPDATE = "U"
DELETE = "D"
EXECUTE = "E"
@dataclass
class AuditContext:
"""Context information for audit trail"""
user_id: str
user_display: str
client_ip: str
client_application: str
session_id: str
correlation_id: str
timestamp: datetime = field(default_factory=datetime.utcnow)
@dataclass
class ProvenanceRecord:
"""Tracks data provenance through the chain"""
resource_id: str
resource_type: str
action: AuditAction
source_system: str
target_system: str
transformation_applied: Optional[str] = None
parent_provenance_id: Optional[str] = None
class AuditChainHandler:
"""
Handles audit trail and provenance chain for FHIR requests
as they flow through multiple systems.
"""
def __init__(self, audit_repository, provenance_repository):
self.audit_repo = audit_repository
self.provenance_repo = provenance_repository
self.chain_validators = []
async def start_chain(self, context: AuditContext,
request: Dict[str, Any]) -> str:
"""
Initialize audit chain for a new request.
Returns chain_id for correlation.
"""
chain_id = self._generate_chain_id()
# Create initial audit event
audit_event = self._create_audit_event(
chain_id=chain_id,
context=context,
action=self._determine_action(request),
stage="initiated",
details={"request_summary": self._summarize_request(request)}
)
await self.audit_repo.store(audit_event)
return chain_id
async def record_hop(self, chain_id: str,
source_system: str,
target_system: str,
context: AuditContext,
resources: List[Dict],
transformation: Optional[str] = None):
"""
Record a hop in the processing chain as data moves
between systems.
"""
# Create provenance records for each resource
provenance_records = []
for resource in resources:
provenance = ProvenanceRecord(
resource_id=resource.get('id'),
resource_type=resource.get('resourceType'),
action=AuditAction.READ, # Or determine from context
source_system=source_system,
target_system=target_system,
transformation_applied=transformation
)
provenance_records.append(provenance)
# Store provenance
await self.provenance_repo.store_batch(chain_id, provenance_records)
# Create audit event for this hop
audit_event = self._create_audit_event(
chain_id=chain_id,
context=context,
action=AuditAction.READ,
stage="processing",
details={
"source": source_system,
"target": target_system,
"resource_count": len(resources),
"transformation": transformation
}
)
await self.audit_repo.store(audit_event)
async def complete_chain(self, chain_id: str,
context: AuditContext,
outcome: str,
response_summary: Dict):
"""
Complete the audit chain when request processing finishes.
"""
audit_event = self._create_audit_event(
chain_id=chain_id,
context=context,
action=AuditAction.EXECUTE,
stage="completed",
details={
"outcome": outcome,
"response_summary": response_summary
}
)
await self.audit_repo.store(audit_event)
# Validate chain integrity
await self._validate_chain(chain_id)
async def record_error(self, chain_id: str,
context: AuditContext,
error: Exception,
stage: str):
"""Record an error in the audit chain."""
audit_event = self._create_audit_event(
chain_id=chain_id,
context=context,
action=AuditAction.EXECUTE,
stage=f"error_{stage}",
details={
"error_type": type(error).__name__,
"error_message": str(error),
"outcome": "error"
}
)
await self.audit_repo.store(audit_event)
async def get_chain_history(self, chain_id: str) -> List[Dict]:
"""Retrieve complete audit trail for a chain."""
audit_events = await self.audit_repo.find_by_chain(chain_id)
provenance_records = await self.provenance_repo.find_by_chain(chain_id)
return {
"chain_id": chain_id,
"audit_events": audit_events,
"provenance": provenance_records,
"integrity_verified": await self._verify_integrity(chain_id)
}
def _create_audit_event(self, chain_id: str,
context: AuditContext,
action: AuditAction,
stage: str,
details: Dict) -> Dict:
"""Create FHIR AuditEvent resource."""
return {
"resourceType": "AuditEvent",
"type": {
"system": "http://dicom.nema.org/resources/ontology/DCM",
"code": "110110",
"display": "Patient Record"
},
"action": action.value,
"recorded": datetime.utcnow().isoformat() + "Z",
"outcome": "0" if "error" not in stage else "8",
"agent": [
{
"who": {
"identifier": {"value": context.user_id},
"display": context.user_display
},
"requestor": True,
"network": {
"address": context.client_ip,
"type": "2"
}
}
],
"extension": [
{
"url": "http://example.org/fhir/audit-chain-id",
"valueString": chain_id
},
{
"url": "http://example.org/fhir/audit-stage",
"valueString": stage
},
{
"url": "http://example.org/fhir/correlation-id",
"valueString": context.correlation_id
}
]
}
def _generate_chain_id(self) -> str:
"""Generate unique chain identifier."""
import uuid
return f"chain-{uuid.uuid4()}"
def _determine_action(self, request: Dict) -> AuditAction:
"""Determine audit action from request."""
method = request.get('method', 'GET').upper()
action_map = {
'GET': AuditAction.READ,
'POST': AuditAction.CREATE,
'PUT': AuditAction.UPDATE,
'PATCH': AuditAction.UPDATE,
'DELETE': AuditAction.DELETE
}
return action_map.get(method, AuditAction.EXECUTE)
def _summarize_request(self, request: Dict) -> Dict:
"""Create summary of request for audit."""
return {
"method": request.get('method'),
"path": request.get('path'),
"resource_type": request.get('resource_type')
}
async def _validate_chain(self, chain_id: str):
"""Run chain validators."""
for validator in self.chain_validators:
await validator.validate(chain_id)
async def _verify_integrity(self, chain_id: str) -> bool:
"""Verify integrity of audit chain."""
# Implementation would check for gaps, tampering, etc.
return True
Provenance Tracking
Creates and manages FHIR Provenance resources to track data lineage, derivations, and transformations through the processing chain.
from dataclasses import dataclass
from typing import List, Dict, Optional, Any
from datetime import datetime
from enum import Enum
class ProvenanceActivity(Enum):
DERIVATION = "derivation"
REVISION = "revision"
QUOTATION = "quotation"
SOURCE = "source"
TRANSFORMATION = "transformation"
@dataclass
class ProvenanceAgent:
"""Agent involved in provenance activity"""
agent_type: str # 'author', 'performer', 'custodian', 'assembler'
who_reference: str
who_display: str
on_behalf_of: Optional[str] = None
@dataclass
class ProvenanceEntity:
"""Entity referenced in provenance"""
role: str # 'derivation', 'revision', 'quotation', 'source'
reference: str
display: Optional[str] = None
class ProvenanceTracker:
"""
Tracks and maintains provenance information for FHIR resources
as they flow through the system.
"""
def __init__(self, fhir_client, provenance_store):
self.fhir_client = fhir_client
self.provenance_store = provenance_store
def create_provenance(self,
target_references: List[str],
activity: ProvenanceActivity,
agents: List[ProvenanceAgent],
entities: List[ProvenanceEntity] = None,
reason: str = None,
policy: List[str] = None) -> Dict:
"""
Create a FHIR Provenance resource.
"""
provenance = {
"resourceType": "Provenance",
"target": [{"reference": ref} for ref in target_references],
"recorded": datetime.utcnow().isoformat() + "Z",
"activity": {
"coding": [{
"system": "http://terminology.hl7.org/CodeSystem/v3-DataOperation",
"code": self._activity_to_code(activity),
"display": activity.value
}]
},
"agent": [self._agent_to_fhir(agent) for agent in agents]
}
if entities:
provenance["entity"] = [
self._entity_to_fhir(entity) for entity in entities
]
if reason:
provenance["reason"] = [{
"coding": [{
"system": "http://terminology.hl7.org/CodeSystem/v3-ActReason",
"code": "TREAT",
"display": reason
}]
}]
if policy:
provenance["policy"] = policy
return provenance
async def track_derivation(self,
source_resource: Dict,
derived_resource: Dict,
transformation: str,
performer: ProvenanceAgent) -> Dict:
"""
Track when a resource is derived from another resource.
Common in data transformations and aggregations.
"""
source_ref = f"{source_resource['resourceType']}/{source_resource['id']}"
derived_ref = f"{derived_resource['resourceType']}/{derived_resource['id']}"
provenance = self.create_provenance(
target_references=[derived_ref],
activity=ProvenanceActivity.DERIVATION,
agents=[performer],
entities=[
ProvenanceEntity(
role="source",
reference=source_ref,
display=f"Source {source_resource['resourceType']}"
)
],
reason=f"Derived via {transformation}"
)
# Store provenance
stored = await self.provenance_store.create(provenance)
return stored
async def track_aggregation(self,
source_resources: List[Dict],
aggregated_resource: Dict,
aggregation_method: str,
performer: ProvenanceAgent) -> Dict:
"""
Track when multiple resources are aggregated into one.
"""
aggregated_ref = f"{aggregated_resource['resourceType']}/{aggregated_resource['id']}"
entities = [
ProvenanceEntity(
role="source",
reference=f"{r['resourceType']}/{r['id']}",
display=f"Source {r['resourceType']}"
)
for r in source_resources
]
provenance = self.create_provenance(
target_references=[aggregated_ref],
activity=ProvenanceActivity.DERIVATION,
agents=[performer],
entities=entities,
reason=f"Aggregation: {aggregation_method}"
)
return await self.provenance_store.create(provenance)
async def track_transmission(self,
resources: List[Dict],
source_system: str,
target_system: str,
transmitter: ProvenanceAgent) -> Dict:
"""
Track when resources are transmitted between systems.
"""
target_refs = [
f"{r['resourceType']}/{r['id']}" for r in resources
]
provenance = self.create_provenance(
target_references=target_refs,
activity=ProvenanceActivity.SOURCE,
agents=[
transmitter,
ProvenanceAgent(
agent_type="custodian",
who_reference=f"Organization/{source_system}",
who_display=f"Source: {source_system}"
),
ProvenanceAgent(
agent_type="custodian",
who_reference=f"Organization/{target_system}",
who_display=f"Target: {target_system}"
)
],
reason="Cross-system data transmission"
)
return await self.provenance_store.create(provenance)
async def get_provenance_chain(self, resource_reference: str) -> List[Dict]:
"""
Retrieve the complete provenance chain for a resource.
Follows entity references to build full history.
"""
chain = []
visited = set()
to_process = [resource_reference]
while to_process:
ref = to_process.pop(0)
if ref in visited:
continue
visited.add(ref)
# Find provenance records for this resource
provenance_records = await self.provenance_store.find_by_target(ref)
for prov in provenance_records:
chain.append(prov)
# Add source entities to process
for entity in prov.get('entity', []):
if entity.get('role') == 'source':
source_ref = entity.get('what', {}).get('reference')
if source_ref and source_ref not in visited:
to_process.append(source_ref)
# Sort by recorded date
chain.sort(key=lambda p: p.get('recorded', ''), reverse=True)
return chain
def _activity_to_code(self, activity: ProvenanceActivity) -> str:
"""Map activity to FHIR code."""
mapping = {
ProvenanceActivity.DERIVATION: "DERIVE",
ProvenanceActivity.REVISION: "UPDATE",
ProvenanceActivity.QUOTATION: "COPY",
ProvenanceActivity.SOURCE: "CREATE",
ProvenanceActivity.TRANSFORMATION: "TRANSFORM"
}
return mapping.get(activity, "CREATE")
def _agent_to_fhir(self, agent: ProvenanceAgent) -> Dict:
"""Convert ProvenanceAgent to FHIR agent structure."""
fhir_agent = {
"type": {
"coding": [{
"system": "http://terminology.hl7.org/CodeSystem/provenance-participant-type",
"code": agent.agent_type
}]
},
"who": {
"reference": agent.who_reference,
"display": agent.who_display
}
}
if agent.on_behalf_of:
fhir_agent["onBehalfOf"] = {"reference": agent.on_behalf_of}
return fhir_agent
def _entity_to_fhir(self, entity: ProvenanceEntity) -> Dict:
"""Convert ProvenanceEntity to FHIR entity structure."""
return {
"role": entity.role,
"what": {
"reference": entity.reference,
"display": entity.display
}
}
Related Patterns
- Broker: Broker triggers audit events for all routed requests
- Security Strategy: Security context from authentication is captured in audit records
- Privacy Enforcement: Privacy decisions and consent evaluations are audited
- Population Export Pipeline: Bulk export operations generate comprehensive audit trails
Benefits
- Comprehensive Tracking: Complete audit trail from request to response
- Standards Compliance: Implements IHE ATNA/BALP requirements
- Data Lineage: Provenance resources track data origins and transformations
- Regulatory Support: Supports HIPAA, GDPR audit requirements
- Flexible Architecture: Chain pattern allows modular audit handlers
Trade-offs
- Storage Requirements: Audit logs can grow rapidly
- Performance Impact: Logging adds overhead to each transaction
- Complexity: Managing audit chain configuration and handlers
- Retention Policies: Balancing compliance with storage costs
References
- FHIR AuditEvent - Audit event resource
- FHIR Provenance - Data lineage tracking
- IHE ATNA - Audit Trail and Node Authentication
- IHE BALP - Basic Audit Log Patterns
Asynchronous Logging
Consider asynchronous audit event storage to minimize impact on transaction latency. Use message queues to decouple audit generation from persistence.