Audit & Provenance Chain

Intent

Implement end-to-end audit logging and provenance tracking through a chain of responsibility pattern, enabling comprehensive traceability for regulatory compliance.

Forces

  • Auditability & Trust: Healthcare systems must maintain comprehensive audit trails for regulatory compliance.
  • Granular Sharing & Legal Obligations: Healthcare data sharing must respect complex, context-dependent consent and privacy rules.
  • Legacy System Preservation: Healthcare organizations must modernize gradually while preserving existing clinical semantics and workflows.

Structure

The Audit & Provenance Chain pattern implements a chain of responsibility where each handler in the processing chain contributes to the audit trail and provenance record.

[Figure: Audit & Provenance Chain class diagram]

Key Components

  • AuditChain: Chain of responsibility for audit event creation
  • AuditHandler: Base interface for handlers that contribute audit information
  • ProvenanceBuilder: Constructs FHIR Provenance resources for data lineage
  • AuditRepository: Stores and indexes audit events for retrieval
  • AuditEventFactory: Creates properly formatted FHIR AuditEvent resources
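
As a rough illustration of how AuditChain and AuditHandler might fit together, the sketch below links handlers so that each one appends its contribution to a shared trail and then delegates to its successor. The AuditTrail type, the contribute() method, and the two concrete handlers are hypothetical names introduced only for this example; they are not taken from the class diagram.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class AuditTrail:
    """Entries contributed by each handler for one request (hypothetical type)."""
    correlation_id: str
    entries: List[Dict[str, Any]] = field(default_factory=list)


class AuditHandler(ABC):
    """Base handler: record a contribution, then pass the request along."""

    stage: str = "unspecified"

    def __init__(self) -> None:
        self.successor: Optional["AuditHandler"] = None

    @abstractmethod
    def contribute(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Return the audit details this handler is responsible for."""

    def handle(self, request: Dict[str, Any], trail: AuditTrail) -> None:
        trail.entries.append({"stage": self.stage, **self.contribute(request)})
        if self.successor is not None:
            self.successor.handle(request, trail)


class AuthenticationHandler(AuditHandler):
    stage = "authentication"

    def contribute(self, request: Dict[str, Any]) -> Dict[str, Any]:
        return {"user": request.get("user")}


class DataAccessHandler(AuditHandler):
    stage = "data_access"

    def contribute(self, request: Dict[str, Any]) -> Dict[str, Any]:
        return {"resource": request.get("path")}


class AuditChain:
    """Links handlers in order and runs a request through them."""

    def __init__(self, handlers: List[AuditHandler]) -> None:
        for current, following in zip(handlers, handlers[1:]):
            current.successor = following
        self.head = handlers[0] if handlers else None

    def process(self, request: Dict[str, Any], correlation_id: str) -> AuditTrail:
        trail = AuditTrail(correlation_id)
        if self.head is not None:
            self.head.handle(request, trail)
        return trail


chain = AuditChain([AuthenticationHandler(), DataAccessHandler()])
trail = chain.process(
    {"user": "Practitioner/dr-smith", "path": "Patient/12345"}, "corr-1"
)
print([entry["stage"] for entry in trail.entries])
# prints ['authentication', 'data_access']
```

Because each handler only knows its successor, a new audit concern can be added by inserting one more handler into the list without touching the others.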

Behavior

Audit Chain Processing

The following sequence shows how audit events are created as data flows through a processing chain:

[Figure: Audit & Provenance flow sequence diagram]

Chain Processing Steps

  1. Request Receipt
  2. Authentication
  3. Authorization
  4. Data Access
  5. Transformation
  6. Response
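
One way to use the six steps above is as a completeness check: given the stages actually recorded for a request, report which expected stages are missing or out of order. The sketch below assumes each audit entry carries a stage name; the snake_case stage identifiers and the missing_stages function are hypothetical, and a real deployment may treat some steps (such as Transformation) as optional.

```python
from typing import List

# Stage identifiers are illustrative names for the six processing steps.
EXPECTED_STAGES = [
    "request_receipt",
    "authentication",
    "authorization",
    "data_access",
    "transformation",
    "response",
]


def missing_stages(recorded: List[str]) -> List[str]:
    """Return expected stages that do not appear, in order, in `recorded`.

    A stage counts as present only if it occurs after the previously
    matched stage, so out-of-order entries are reported as missing.
    """
    missing = []
    position = 0
    for stage in EXPECTED_STAGES:
        try:
            position = recorded.index(stage, position) + 1
        except ValueError:
            missing.append(stage)
    return missing


recorded = ["request_receipt", "authentication", "data_access", "response"]
print(missing_stages(recorded))
# prints ['authorization', 'transformation']
```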

Implementation Considerations

AuditEvent Resource

The following example FHIR AuditEvent resource follows the IHE ATNA/BALP profiles and includes agent identification, source attribution, and entity references.

{
  "resourceType": "AuditEvent",
  "id": "fhir-access-audit",
  "type": {
    "system": "http://dicom.nema.org/resources/ontology/DCM",
    "code": "110110",
    "display": "Patient Record"
  },
  "subtype": [
    {
      "system": "http://hl7.org/fhir/restful-interaction",
      "code": "read",
      "display": "read"
    }
  ],
  "action": "R",
  "period": {
    "start": "2024-01-15T10:30:00Z",
    "end": "2024-01-15T10:30:01Z"
  },
  "recorded": "2024-01-15T10:30:01Z",
  "outcome": "0",
  "outcomeDesc": "Success",
  "agent": [
    {
      "type": {
        "coding": [
          {
            "system": "http://terminology.hl7.org/CodeSystem/v3-ParticipationType",
            "code": "IRCP",
            "display": "information recipient"
          }
        ]
      },
      "who": {
        "reference": "Practitioner/dr-smith",
        "display": "Dr. Jane Smith"
      },
      "requestor": true,
      "network": {
        "address": "192.168.1.100",
        "type": "2"
      }
    },
    {
      "type": {
        "coding": [
          {
            "system": "http://dicom.nema.org/resources/ontology/DCM",
            "code": "110153",
            "display": "Source Role ID"
          }
        ]
      },
      "who": {
        "identifier": {
          "system": "urn:ietf:rfc:3986",
          "value": "urn:oid:2.16.840.1.113883.3.123"
        },
        "display": "ACME EHR System"
      },
      "requestor": false,
      "network": {
        "address": "ehr.acme-healthcare.org",
        "type": "1"
      }
    }
  ],
  "source": {
    "site": "ACME Healthcare FHIR Server",
    "observer": {
      "reference": "Device/fhir-server-01"
    },
    "type": [
      {
        "system": "http://terminology.hl7.org/CodeSystem/security-source-type",
        "code": "4",
        "display": "Application Server"
      }
    ]
  },
  "entity": [
    {
      "what": {
        "reference": "Patient/12345"
      },
      "type": {
        "system": "http://terminology.hl7.org/CodeSystem/audit-entity-type",
        "code": "1",
        "display": "Person"
      },
      "role": {
        "system": "http://terminology.hl7.org/CodeSystem/object-role",
        "code": "1",
        "display": "Patient"
      }
    },
    {
      "what": {
        "reference": "Observation/lab-result-789"
      },
      "type": {
        "system": "http://terminology.hl7.org/CodeSystem/audit-entity-type",
        "code": "2",
        "display": "System Object"
      },
      "role": {
        "system": "http://terminology.hl7.org/CodeSystem/object-role",
        "code": "4",
        "display": "Domain Resource"
      },
      "securityLabel": [
        {
          "system": "http://terminology.hl7.org/CodeSystem/v3-Confidentiality",
          "code": "R",
          "display": "Restricted"
        }
      ]
    }
  ]
}

Chain Handler Implementation

The handler below manages the audit trail for a request: it opens a chain, records each processing hop between systems, and closes the chain, tying every event together with a chain ID and the caller's correlation ID.

from dataclasses import dataclass, field
from typing import List, Dict, Optional, Any
from datetime import datetime
from enum import Enum
import json
import asyncio

class AuditAction(Enum):
    CREATE = "C"
    READ = "R"
    UPDATE = "U"
    DELETE = "D"
    EXECUTE = "E"

@dataclass
class AuditContext:
    """Context information for audit trail"""
    user_id: str
    user_display: str
    client_ip: str
    client_application: str
    session_id: str
    correlation_id: str
    timestamp: datetime = field(default_factory=datetime.utcnow)

@dataclass
class ProvenanceRecord:
    """Tracks data provenance through the chain"""
    resource_id: str
    resource_type: str
    action: AuditAction
    source_system: str
    target_system: str
    transformation_applied: Optional[str] = None
    parent_provenance_id: Optional[str] = None

class AuditChainHandler:
    """
    Handles audit trail and provenance chain for FHIR requests
    as they flow through multiple systems.
    """

    def __init__(self, audit_repository, provenance_repository):
        self.audit_repo = audit_repository
        self.provenance_repo = provenance_repository
        self.chain_validators = []

    async def start_chain(self, context: AuditContext, 
                         request: Dict[str, Any]) -> str:
        """
        Initialize audit chain for a new request.
        Returns chain_id for correlation.
        """
        chain_id = self._generate_chain_id()

        # Create initial audit event
        audit_event = self._create_audit_event(
            chain_id=chain_id,
            context=context,
            action=self._determine_action(request),
            stage="initiated",
            details={"request_summary": self._summarize_request(request)}
        )

        await self.audit_repo.store(audit_event)

        return chain_id

    async def record_hop(self, chain_id: str, 
                        source_system: str,
                        target_system: str,
                        context: AuditContext,
                        resources: List[Dict],
                        transformation: Optional[str] = None):
        """
        Record a hop in the processing chain as data moves
        between systems.
        """
        # Create provenance records for each resource
        provenance_records = []
        for resource in resources:
            provenance = ProvenanceRecord(
                resource_id=resource.get('id'),
                resource_type=resource.get('resourceType'),
                action=AuditAction.READ,  # Or determine from context
                source_system=source_system,
                target_system=target_system,
                transformation_applied=transformation
            )
            provenance_records.append(provenance)

        # Store provenance
        await self.provenance_repo.store_batch(chain_id, provenance_records)

        # Create audit event for this hop
        audit_event = self._create_audit_event(
            chain_id=chain_id,
            context=context,
            action=AuditAction.READ,
            stage="processing",
            details={
                "source": source_system,
                "target": target_system,
                "resource_count": len(resources),
                "transformation": transformation
            }
        )

        await self.audit_repo.store(audit_event)

    async def complete_chain(self, chain_id: str,
                            context: AuditContext,
                            outcome: str,
                            response_summary: Dict):
        """
        Complete the audit chain when request processing finishes.
        """
        audit_event = self._create_audit_event(
            chain_id=chain_id,
            context=context,
            action=AuditAction.EXECUTE,
            stage="completed",
            details={
                "outcome": outcome,
                "response_summary": response_summary
            }
        )

        await self.audit_repo.store(audit_event)

        # Validate chain integrity
        await self._validate_chain(chain_id)

    async def record_error(self, chain_id: str,
                          context: AuditContext,
                          error: Exception,
                          stage: str):
        """Record an error in the audit chain."""
        audit_event = self._create_audit_event(
            chain_id=chain_id,
            context=context,
            action=AuditAction.EXECUTE,
            stage=f"error_{stage}",
            details={
                "error_type": type(error).__name__,
                "error_message": str(error),
                "outcome": "error"
            }
        )

        await self.audit_repo.store(audit_event)

    async def get_chain_history(self, chain_id: str) -> Dict[str, Any]:
        """Retrieve complete audit trail for a chain."""
        audit_events = await self.audit_repo.find_by_chain(chain_id)
        provenance_records = await self.provenance_repo.find_by_chain(chain_id)

        return {
            "chain_id": chain_id,
            "audit_events": audit_events,
            "provenance": provenance_records,
            "integrity_verified": await self._verify_integrity(chain_id)
        }

    def _create_audit_event(self, chain_id: str,
                           context: AuditContext,
                           action: AuditAction,
                           stage: str,
                           details: Dict) -> Dict:
        """Create FHIR AuditEvent resource."""
        return {
            "resourceType": "AuditEvent",
            "type": {
                "system": "http://dicom.nema.org/resources/ontology/DCM",
                "code": "110110",
                "display": "Patient Record"
            },
            "action": action.value,
            "recorded": datetime.utcnow().isoformat() + "Z",
            "outcome": "0" if "error" not in stage else "8",
            "agent": [
                {
                    "who": {
                        "identifier": {"value": context.user_id},
                        "display": context.user_display
                    },
                    "requestor": True,
                    "network": {
                        "address": context.client_ip,
                        "type": "2"
                    }
                }
            ],
            "extension": [
                {
                    "url": "http://example.org/fhir/audit-chain-id",
                    "valueString": chain_id
                },
                {
                    "url": "http://example.org/fhir/audit-stage",
                    "valueString": stage
                },
                {
                    "url": "http://example.org/fhir/correlation-id",
                    "valueString": context.correlation_id
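                },
                {
                    # Illustrative extension URL: carries the per-stage
                    # details, which would otherwise be silently dropped
                    "url": "http://example.org/fhir/audit-details",
                    "valueString": json.dumps(details, default=str)
                }
            ]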
        }

    def _generate_chain_id(self) -> str:
        """Generate unique chain identifier."""
        import uuid
        return f"chain-{uuid.uuid4()}"

    def _determine_action(self, request: Dict) -> AuditAction:
        """Determine audit action from request."""
        method = request.get('method', 'GET').upper()
        action_map = {
            'GET': AuditAction.READ,
            'POST': AuditAction.CREATE,
            'PUT': AuditAction.UPDATE,
            'PATCH': AuditAction.UPDATE,
            'DELETE': AuditAction.DELETE
        }
        return action_map.get(method, AuditAction.EXECUTE)

    def _summarize_request(self, request: Dict) -> Dict:
        """Create summary of request for audit."""
        return {
            "method": request.get('method'),
            "path": request.get('path'),
            "resource_type": request.get('resource_type')
        }

    async def _validate_chain(self, chain_id: str):
        """Run chain validators."""
        for validator in self.chain_validators:
            await validator.validate(chain_id)

    async def _verify_integrity(self, chain_id: str) -> bool:
        """Verify integrity of audit chain."""
        # Implementation would check for gaps, tampering, etc.
        return True
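
The _verify_integrity method above is left as a placeholder. One common way to detect tampering or gaps is to hash-chain the stored events, so that each entry commits to the one before it. The sketch below is a minimal illustration of that idea under assumed names (append_event, verify_log, and the entry layout are hypothetical); it is not the verification scheme of any particular audit repository.

```python
import hashlib
import json
from typing import Any, Dict, List

GENESIS_HASH = "0" * 64  # Placeholder "previous hash" for the first entry


def _digest(event: Dict[str, Any], previous_hash: str) -> str:
    """Hash the canonical JSON form of an event together with the prior hash."""
    payload = json.dumps(event, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256((previous_hash + payload).encode("utf-8")).hexdigest()


def append_event(log: List[Dict[str, Any]], event: Dict[str, Any]) -> None:
    """Append an event, linking it to the hash of the previous entry."""
    previous_hash = log[-1]["hash"] if log else GENESIS_HASH
    log.append({
        "event": event,
        "previous_hash": previous_hash,
        "hash": _digest(event, previous_hash),
    })


def verify_log(log: List[Dict[str, Any]]) -> bool:
    """Recompute every link; any edit, removal, or reordering breaks the chain."""
    previous_hash = GENESIS_HASH
    for entry in log:
        if entry["previous_hash"] != previous_hash:
            return False
        if entry["hash"] != _digest(entry["event"], previous_hash):
            return False
        previous_hash = entry["hash"]
    return True


log: List[Dict[str, Any]] = []
append_event(log, {"chain_id": "chain-1", "stage": "initiated"})
append_event(log, {"chain_id": "chain-1", "stage": "completed"})
print(verify_log(log))  # prints True

log[0]["event"]["stage"] = "tampered"
print(verify_log(log))  # prints False
```

Note that this only detects modification of stored entries; truncation of the most recent entries would need an externally anchored head hash or similar mechanism.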

Provenance Tracking

The tracker below creates and stores FHIR Provenance resources that record data lineage, derivations, and transformations as resources move through the processing chain.

from dataclasses import dataclass
from typing import List, Dict, Optional, Any
from datetime import datetime
from enum import Enum

class ProvenanceActivity(Enum):
    DERIVATION = "derivation"
    REVISION = "revision"
    QUOTATION = "quotation"
    SOURCE = "source"
    TRANSFORMATION = "transformation"

@dataclass
class ProvenanceAgent:
    """Agent involved in provenance activity"""
    agent_type: str  # 'author', 'performer', 'custodian', 'assembler'
    who_reference: str
    who_display: str
    on_behalf_of: Optional[str] = None

@dataclass
class ProvenanceEntity:
    """Entity referenced in provenance"""
    role: str  # 'derivation', 'revision', 'quotation', 'source'
    reference: str
    display: Optional[str] = None

class ProvenanceTracker:
    """
    Tracks and maintains provenance information for FHIR resources
    as they flow through the system.
    """

    def __init__(self, fhir_client, provenance_store):
        self.fhir_client = fhir_client
        self.provenance_store = provenance_store

    def create_provenance(self, 
                         target_references: List[str],
                         activity: ProvenanceActivity,
                         agents: List[ProvenanceAgent],
                         entities: Optional[List[ProvenanceEntity]] = None,
                         reason: Optional[str] = None,
                         policy: Optional[List[str]] = None) -> Dict:
        """
        Create a FHIR Provenance resource.
        """
        provenance = {
            "resourceType": "Provenance",
            "target": [{"reference": ref} for ref in target_references],
            "recorded": datetime.utcnow().isoformat() + "Z",
            "activity": {
                "coding": [{
                    "system": "http://terminology.hl7.org/CodeSystem/v3-DataOperation",
                    "code": self._activity_to_code(activity),
                    "display": activity.value
                }]
            },
            "agent": [self._agent_to_fhir(agent) for agent in agents]
        }

        if entities:
            provenance["entity"] = [
                self._entity_to_fhir(entity) for entity in entities
            ]

        if reason:
            # Free-text reasons belong in CodeableConcept.text. A coded
            # reason (for example from v3-ActReason) should only be set when
            # the caller supplies a real code, not hard-coded as TREAT.
            provenance["reason"] = [{"text": reason}]

        if policy:
            provenance["policy"] = policy

        return provenance

    async def track_derivation(self,
                              source_resource: Dict,
                              derived_resource: Dict,
                              transformation: str,
                              performer: ProvenanceAgent) -> Dict:
        """
        Track when a resource is derived from another resource.
        Common in data transformations and aggregations.
        """
        source_ref = f"{source_resource['resourceType']}/{source_resource['id']}"
        derived_ref = f"{derived_resource['resourceType']}/{derived_resource['id']}"

        provenance = self.create_provenance(
            target_references=[derived_ref],
            activity=ProvenanceActivity.DERIVATION,
            agents=[performer],
            entities=[
                ProvenanceEntity(
                    role="source",
                    reference=source_ref,
                    display=f"Source {source_resource['resourceType']}"
                )
            ],
            reason=f"Derived via {transformation}"
        )

        # Store provenance
        stored = await self.provenance_store.create(provenance)

        return stored

    async def track_aggregation(self,
                               source_resources: List[Dict],
                               aggregated_resource: Dict,
                               aggregation_method: str,
                               performer: ProvenanceAgent) -> Dict:
        """
        Track when multiple resources are aggregated into one.
        """
        aggregated_ref = f"{aggregated_resource['resourceType']}/{aggregated_resource['id']}"

        entities = [
            ProvenanceEntity(
                role="source",
                reference=f"{r['resourceType']}/{r['id']}",
                display=f"Source {r['resourceType']}"
            )
            for r in source_resources
        ]

        provenance = self.create_provenance(
            target_references=[aggregated_ref],
            activity=ProvenanceActivity.DERIVATION,
            agents=[performer],
            entities=entities,
            reason=f"Aggregation: {aggregation_method}"
        )

        return await self.provenance_store.create(provenance)

    async def track_transmission(self,
                                resources: List[Dict],
                                source_system: str,
                                target_system: str,
                                transmitter: ProvenanceAgent) -> Dict:
        """
        Track when resources are transmitted between systems.
        """
        target_refs = [
            f"{r['resourceType']}/{r['id']}" for r in resources
        ]

        provenance = self.create_provenance(
            target_references=target_refs,
            activity=ProvenanceActivity.SOURCE,
            agents=[
                transmitter,
                ProvenanceAgent(
                    agent_type="custodian",
                    who_reference=f"Organization/{source_system}",
                    who_display=f"Source: {source_system}"
                ),
                ProvenanceAgent(
                    agent_type="custodian", 
                    who_reference=f"Organization/{target_system}",
                    who_display=f"Target: {target_system}"
                )
            ],
            reason="Cross-system data transmission"
        )

        return await self.provenance_store.create(provenance)

    async def get_provenance_chain(self, resource_reference: str) -> List[Dict]:
        """
        Retrieve the complete provenance chain for a resource.
        Follows entity references to build full history.
        """
        chain = []
        visited = set()
        to_process = [resource_reference]

        while to_process:
            ref = to_process.pop(0)
            if ref in visited:
                continue
            visited.add(ref)

            # Find provenance records for this resource
            provenance_records = await self.provenance_store.find_by_target(ref)

            for prov in provenance_records:
                chain.append(prov)

                # Add source entities to process
                for entity in prov.get('entity', []):
                    if entity.get('role') == 'source':
                        source_ref = entity.get('what', {}).get('reference')
                        if source_ref and source_ref not in visited:
                            to_process.append(source_ref)

        # Sort by recorded date
        chain.sort(key=lambda p: p.get('recorded', ''), reverse=True)

        return chain

    def _activity_to_code(self, activity: ProvenanceActivity) -> str:
        """Map activity to a code (illustrative; confirm each code exists in the target code system before use)."""
        mapping = {
            ProvenanceActivity.DERIVATION: "DERIVE",
            ProvenanceActivity.REVISION: "UPDATE",
            ProvenanceActivity.QUOTATION: "COPY",
            ProvenanceActivity.SOURCE: "CREATE",
            ProvenanceActivity.TRANSFORMATION: "TRANSFORM"
        }
        return mapping.get(activity, "CREATE")

    def _agent_to_fhir(self, agent: ProvenanceAgent) -> Dict:
        """Convert ProvenanceAgent to FHIR agent structure."""
        fhir_agent = {
            "type": {
                "coding": [{
                    "system": "http://terminology.hl7.org/CodeSystem/provenance-participant-type",
                    "code": agent.agent_type
                }]
            },
            "who": {
                "reference": agent.who_reference,
                "display": agent.who_display
            }
        }

        if agent.on_behalf_of:
            fhir_agent["onBehalfOf"] = {"reference": agent.on_behalf_of}

        return fhir_agent

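    def _entity_to_fhir(self, entity: ProvenanceEntity) -> Dict:
        """Convert ProvenanceEntity to FHIR entity structure."""
        what = {"reference": entity.reference}
        # FHIR JSON does not allow null property values, so omit display when unset
        if entity.display is not None:
            what["display"] = entity.display
        return {"role": entity.role, "what": what}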

Benefits

  • Comprehensive Tracking: Complete audit trail from request to response
  • Standards Compliance: Implements IHE ATNA/BALP requirements
  • Data Lineage: Provenance resources track data origins and transformations
  • Regulatory Support: Supports HIPAA and GDPR audit requirements
  • Flexible Architecture: Chain pattern allows modular audit handlers

Trade-offs

  • Storage Requirements: Audit logs can grow rapidly
  • Performance Impact: Logging adds overhead to each transaction
  • Complexity: Audit chain configuration and handlers must be managed and kept consistent
  • Retention Policies: Compliance obligations must be balanced against storage costs


Asynchronous Logging

Consider asynchronous audit event storage to minimize impact on transaction latency. Use message queues to decouple audit generation from persistence.
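
The decoupling described above can be sketched with an in-process asyncio.Queue standing in for a message queue: request handlers enqueue audit events and return immediately, while a background worker persists them. AsyncAuditWriter and InMemoryAuditRepository are hypothetical names for this example; a production system would more likely use a durable broker so that events survive a process crash.

```python
import asyncio
from typing import Any, Dict, List, Optional


class InMemoryAuditRepository:
    """Stand-in for a real audit store (hypothetical)."""

    def __init__(self) -> None:
        self.events: List[Dict[str, Any]] = []

    async def store(self, event: Dict[str, Any]) -> None:
        await asyncio.sleep(0)  # Simulate yielding to I/O
        self.events.append(event)


class AsyncAuditWriter:
    """Buffers audit events in a queue and persists them in the background."""

    def __init__(self, repository: InMemoryAuditRepository,
                 max_pending: int = 1000) -> None:
        self._repository = repository
        # A bounded queue applies backpressure instead of growing without limit
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=max_pending)
        self._worker: Optional[asyncio.Task] = None

    def start(self) -> None:
        self._worker = asyncio.create_task(self._drain())

    async def submit(self, event: Dict[str, Any]) -> None:
        """Called on the request path; waits only if the queue is full."""
        await self._queue.put(event)

    async def _drain(self) -> None:
        while True:
            event = await self._queue.get()
            try:
                await self._repository.store(event)
            finally:
                self._queue.task_done()

    async def close(self) -> None:
        """Wait until every queued event is stored, then stop the worker."""
        await self._queue.join()
        if self._worker is not None:
            self._worker.cancel()
            try:
                await self._worker
            except asyncio.CancelledError:
                pass


async def main() -> List[Dict[str, Any]]:
    repository = InMemoryAuditRepository()
    writer = AsyncAuditWriter(repository)
    writer.start()
    for sequence in range(3):
        await writer.submit({"stage": "processing", "sequence": sequence})
    await writer.close()
    return repository.events


stored = asyncio.run(main())
print(len(stored))  # prints 3
```

This sketch does not handle storage failures: if store() raises, the worker stops. Since losing audit events is usually unacceptable, a real implementation would add retries or a dead-letter path.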