Event Observer

Intent

Enable real-time synchronization of clinical context across applications through FHIR Subscriptions and FHIRcast, so that multiple healthcare applications can coordinate their views and workflows.

Forces

  • Interactive Viewing & Context Sync: Clinical workflows require real-time coordination between multiple applications.
  • Metadata vs Payload (Imaging): Clinical data (FHIR) and imaging data (DICOM) live in separate systems with different access patterns.

Structure

The Event Observer pattern implements real-time context synchronization using FHIR Subscriptions and FHIRcast protocols to enable loosely coupled application coordination.

Figure: Event Observer Architecture

Key Components

Event Hub (FHIRcast Hub)

Central coordination point for context events and application subscriptions

Subscription Engine

Manages FHIR Subscription resources and event filtering

Context Filter

Filters events based on subscription criteria and user permissions

Event Transformer

Transforms events between different formats (FHIRcast, FHIR Subscription, custom)

Delivery Service

Routes events to subscribed applications via various transport mechanisms
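
To illustrate what the Subscription Engine manages, the sketch below builds a FHIR R4 Subscription resource that uses the rest-hook channel. The function name, the search criteria, and the endpoint URL are assumptions made for this example and are not prescribed by the pattern.

```python
import json
from typing import Any, Dict


def build_rest_hook_subscription(criteria: str, endpoint: str, reason: str) -> Dict[str, Any]:
    """Build a FHIR R4 Subscription resource for the rest-hook channel."""
    return {
        "resourceType": "Subscription",
        # Clients request a subscription; the server marks it 'active' once accepted.
        "status": "requested",
        "reason": reason,
        # Search-style expression selecting the resources that trigger notifications.
        "criteria": criteria,
        "channel": {
            "type": "rest-hook",
            "endpoint": endpoint,
            "payload": "application/fhir+json",
        },
    }


subscription = build_rest_hook_subscription(
    criteria="ImagingStudy?status=available",
    endpoint="https://hub.example.org/fhir-notifications",  # hypothetical URL
    reason="Notify the event hub when an imaging study becomes available",
)
print(json.dumps(subscription, indent=2))
```

The resulting document would be POSTed to the FHIR server's Subscription endpoint; how the server delivers notifications afterwards depends on the server and FHIR version in use.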

Behavior

Multi-Application Context Sync

The following diagram shows context synchronization across applications:

Figure: Multi-Application Context Sync

Implementation Considerations

FHIRcast Hub Implementation

A central hub for FHIRcast context synchronization that manages subscriptions, event distribution, and WebSocket connections across applications.

from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from datetime import datetime
import asyncio
import json
import websockets
import aiohttp

class DeliveryError(Exception):
    """Raised when an event cannot be delivered to a subscriber."""

@dataclass
class ContextEvent:
    id: str
    timestamp: datetime
    # Simplified event names (patient-open, patient-close, study-open, etc.);
    # the FHIRcast specification itself uses names such as Patient-open and ImagingStudy-open.
    event_type: str
    context: Dict[str, Any]
    source_application: str

@dataclass
class Subscription:
    id: str
    application_id: str
    event_types: List[str]
    delivery_endpoint: str
    delivery_method: str  # 'websocket', 'webhook', 'sse'
    filters: Optional[Dict[str, Any]] = None
    active: bool = True

class FHIRcastHub:
    def __init__(self):
        self.subscriptions: Dict[str, Subscription] = {}
        self.active_sessions: Dict[str, Dict] = {}  # session_id -> context
        self.websocket_connections: Dict[str, websockets.WebSocketServerProtocol] = {}

    async def subscribe(self, subscription: Subscription) -> str:
        """Register application for context events"""

        self.subscriptions[subscription.id] = subscription

        # If websocket subscription, wait for connection
        if subscription.delivery_method == 'websocket':
            await self._wait_for_websocket_connection(subscription.application_id)

        return subscription.id

    async def unsubscribe(self, subscription_id: str):
        """Remove subscription"""

        if subscription_id in self.subscriptions:
            subscription = self.subscriptions[subscription_id]

            # Close websocket connection if applicable
            if subscription.delivery_method == 'websocket':
                await self._close_websocket_connection(subscription.application_id)

            del self.subscriptions[subscription_id]

    async def publish_event(self, event: ContextEvent, session_id: str):
        """Publish context event to subscribers"""

        # Update session context
        if session_id not in self.active_sessions:
            self.active_sessions[session_id] = {}

        self.active_sessions[session_id].update(event.context)

        # Find matching subscriptions
        matching_subscriptions = self._find_matching_subscriptions(event, session_id)

        # Deliver to subscribers
        delivery_tasks = []
        for subscription in matching_subscriptions:
            task = asyncio.create_task(
                self._deliver_event(subscription, event, session_id)
            )
            delivery_tasks.append(task)

        # Wait for all deliveries; exceptions are collected rather than raised (no timeout is applied here)
        await asyncio.gather(*delivery_tasks, return_exceptions=True)

    def _find_matching_subscriptions(self, event: ContextEvent, session_id: str) -> List[Subscription]:
        """Find subscriptions that should receive this event"""

        matching = []

        for subscription in self.subscriptions.values():
            # Check if subscription is active
            if not subscription.active:
                continue

            # Check event type filter
            if event.event_type not in subscription.event_types:
                continue

            # Don't send event back to source application
            if subscription.application_id == event.source_application:
                continue

            # Apply additional filters
            if subscription.filters and not self._event_matches_filters(event, subscription.filters):
                continue

            matching.append(subscription)

        return matching

    async def _deliver_event(self, subscription: Subscription, event: ContextEvent, session_id: str):
        """Deliver event using appropriate delivery method"""

        try:
            if subscription.delivery_method == 'websocket':
                await self._deliver_via_websocket(subscription, event, session_id)
            elif subscription.delivery_method == 'webhook':
                await self._deliver_via_webhook(subscription, event, session_id)
            elif subscription.delivery_method == 'sse':
                await self._deliver_via_sse(subscription, event, session_id)

        except Exception as e:
            print(f"Failed to deliver event to {subscription.application_id}: {e}")

    async def _deliver_via_websocket(self, subscription: Subscription, 
                                   event: ContextEvent, session_id: str):
        """Deliver event via WebSocket"""

        ws_connection = self.websocket_connections.get(subscription.application_id)
        if not ws_connection:
            return

        event_message = {
            'id': event.id,
            'timestamp': event.timestamp.isoformat(),
            'event': {
                'hub.topic': session_id,
                'hub.event': event.event_type,
                'context': event.context
            }
        }

        await ws_connection.send(json.dumps(event_message))

    async def _deliver_via_webhook(self, subscription: Subscription, 
                                 event: ContextEvent, session_id: str):
        """Deliver event via HTTP webhook"""

        payload = {
            'timestamp': event.timestamp.isoformat(),
            'id': event.id,
            'event': {
                'hub.topic': session_id,
                'hub.event': event.event_type,
                'context': event.context
            }
        }

        async with aiohttp.ClientSession() as session:
            try:
                async with session.post(
                    subscription.delivery_endpoint,
                    json=payload,
                    timeout=aiohttp.ClientTimeout(total=10)
                ) as response:
                    if response.status not in [200, 202, 204]:
                        raise DeliveryError(f"Webhook returned {response.status}")
            except asyncio.TimeoutError:
                raise DeliveryError("Webhook delivery timeout")

class FHIRSubscriptionHandler:
    def __init__(self, fhircast_hub: FHIRcastHub):
        self.fhircast_hub = fhircast_hub
        self.active_subscriptions: Dict[str, Dict] = {}

    async def handle_subscription_notification(self, subscription_id: str, bundle: Dict):
        """Process FHIR Subscription notification"""

        # Extract subscription info
        subscription_info = self.active_subscriptions.get(subscription_id)
        if not subscription_info:
            return

        # Process each entry in the notification bundle
        for entry in bundle.get('entry', []):
            resource = entry.get('resource', {})

            # Convert to context event
            context_event = await self._resource_to_context_event(
                resource, subscription_info
            )

            if context_event:
                # Publish via FHIRcast
                await self.fhircast_hub.publish_event(
                    context_event, 
                    subscription_info['session_id']
                )

    async def _resource_to_context_event(self, resource: Dict, subscription_info: Dict) -> Optional[ContextEvent]:
        """Convert FHIR resource to FHIRcast context event"""

        resource_type = resource.get('resourceType')

        if resource_type == 'Observation':
            # New observation -> potentially update patient context
            patient_ref = resource.get('subject', {}).get('reference')
            if patient_ref:
                return ContextEvent(
                    id=self._generate_event_id(),
                    timestamp=datetime.utcnow(),
                    event_type='observation-update',
                    context={
                        'patient': [{'reference': patient_ref}],
                        'observation': [{'reference': f"{resource_type}/{resource['id']}"}]
                    },
                    source_application='fhir-subscription-engine'
                )

        elif resource_type == 'ImagingStudy':
            # New imaging study available
            patient_ref = resource.get('subject', {}).get('reference')
            if patient_ref:
                return ContextEvent(
                    id=self._generate_event_id(),
                    timestamp=datetime.utcnow(),
                    event_type='study-open',
                    context={
                        'patient': [{'reference': patient_ref}],
                        'study': [{'reference': f"{resource_type}/{resource['id']}"}]
                    },
                    source_application='fhir-subscription-engine'
                )

        return None

class ContextualApplication:
    """Base class for context-aware applications"""

    def __init__(self, application_id: str, fhircast_hub: FHIRcastHub):
        self.application_id = application_id
        self.fhircast_hub = fhircast_hub
        self.current_context = {}

    async def subscribe_to_context(self, event_types: List[str], session_id: str):
        """Subscribe to context events"""

        subscription = Subscription(
            id=f"{self.application_id}-{session_id}",
            application_id=self.application_id,
            event_types=event_types,
            delivery_endpoint="",  # Will use websocket
            delivery_method='websocket'
        )

        await self.fhircast_hub.subscribe(subscription)

    async def publish_context_change(self, event_type: str, context: Dict, session_id: str):
        """Publish context change event"""

        event = ContextEvent(
            id=self._generate_event_id(),
            timestamp=datetime.utcnow(),
            event_type=event_type,
            context=context,
            source_application=self.application_id
        )

        await self.fhircast_hub.publish_event(event, session_id)

    async def handle_context_event(self, event: ContextEvent):
        """Handle incoming context event"""

        # Update local context
        self.current_context.update(event.context)

        # Delegate to specific handler
        if event.event_type == 'patient-open':
            await self._handle_patient_open(event.context)
        elif event.event_type == 'study-open':
            await self._handle_study_open(event.context)
        elif event.event_type == 'patient-close':
            await self._handle_patient_close(event.context)

    async def _handle_patient_open(self, context: Dict):
        """Handle patient open event"""
        pass  # Override in subclass

    async def _handle_study_open(self, context: Dict):
        """Handle study open event"""
        pass  # Override in subclass

    async def _handle_patient_close(self, context: Dict):
        """Handle patient close event"""
        pass  # Override in subclass

# Example: Image Viewer that responds to context
class ContextualImageViewer(ContextualApplication):
    async def _handle_patient_open(self, context: Dict):
        """Load imaging studies when patient context opens"""

        patient_refs = context.get('patient', [])
        if patient_refs:
            patient_id = patient_refs[0]['reference'].split('/')[-1]
            await self._load_patient_studies(patient_id)

    async def _handle_study_open(self, context: Dict):
        """Display specific study when study context opens"""

        study_refs = context.get('study', [])
        if study_refs:
            study_id = study_refs[0]['reference'].split('/')[-1]
            await self._display_study(study_id)

    async def _load_patient_studies(self, patient_id: str):
        """Load all imaging studies for patient"""
        # Implementation specific to viewer
        pass

    async def _display_study(self, study_id: str):
        """Display specific imaging study"""
        # Implementation specific to viewer
        pass
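
The listing above leaves several helpers undefined, so it cannot be run as-is. As a rough, self-contained illustration of the same observer flow, the sketch below uses a hypothetical `InMemoryHub` that delivers events by awaiting callbacks directly instead of using WebSockets or webhooks. All names in it are assumptions made for this example.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, Tuple

Handler = Callable[[str, dict], Awaitable[None]]


class InMemoryHub:
    """Toy stand-in for the hub: delivers events by awaiting callbacks."""

    def __init__(self) -> None:
        # application_id -> (subscribed event types, handler)
        self._subscribers: Dict[str, Tuple[List[str], Handler]] = {}

    def subscribe(self, application_id: str, event_types: List[str], handler: Handler) -> None:
        self._subscribers[application_id] = (event_types, handler)

    async def publish(self, source_application: str, event_type: str, context: dict) -> None:
        # Skip the publishing application and anyone not subscribed to this event type.
        deliveries = [
            handler(event_type, context)
            for app_id, (event_types, handler) in self._subscribers.items()
            if app_id != source_application and event_type in event_types
        ]
        await asyncio.gather(*deliveries)


async def demo() -> Dict[str, list]:
    hub = InMemoryHub()
    received: Dict[str, list] = {"emr": [], "viewer": []}

    def recorder(name: str) -> Handler:
        async def handle(event_type: str, context: dict) -> None:
            received[name].append((event_type, context["patient"][0]["reference"]))
        return handle

    hub.subscribe("emr", ["patient-open"], recorder("emr"))
    hub.subscribe("viewer", ["patient-open", "study-open"], recorder("viewer"))

    # The EMR opens a patient; only the viewer should be notified.
    await hub.publish("emr", "patient-open", {"patient": [{"reference": "Patient/123"}]})
    return received


result = asyncio.run(demo())
print(result)  # {'emr': [], 'viewer': [('patient-open', 'Patient/123')]}
```

The EMR does not receive its own event, which mirrors the source-application check in `_find_matching_subscriptions`.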

Integration with Imaging Workflows

Integrates FHIRcast events with imaging workflows so that patient and study selection stays synchronized across EMR and PACS viewers.

class ImagingWorkflowCoordinator:
    def __init__(self, fhircast_hub: FHIRcastHub, imaging_bridge):
        self.fhircast_hub = fhircast_hub
        self.imaging_bridge = imaging_bridge

    async def handle_study_available_event(self, imaging_study: Dict):
        """Handle new imaging study becoming available"""

        # Extract context information
        patient_ref = imaging_study.get('subject', {}).get('reference')
        study_uid = None

        # Find the Study Instance UID among the identifiers (DCM code 110180)
        for identifier in imaging_study.get('identifier', []):
            # Guard against a missing or empty coding list before indexing
            codings = identifier.get('type', {}).get('coding') or [{}]
            if codings[0].get('code') == '110180':
                study_uid = identifier.get('value')
                break

        if patient_ref and study_uid:
            # Publish imaging-study-available event
            context_event = ContextEvent(
                id=self._generate_event_id(),
                timestamp=datetime.utcnow(),
                event_type='imaging-study-available',
                context={
                    'patient': [{'reference': patient_ref}],
                    'study': [{'reference': f"ImagingStudy/{imaging_study['id']}"}],
                    'studyUID': study_uid
                },
                source_application='imaging-bridge'
            )

            # Publish to all sessions where this patient is active
            active_sessions = self._find_active_patient_sessions(patient_ref)
            for session_id in active_sessions:
                await self.fhircast_hub.publish_event(context_event, session_id)
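
The coordinator calls `_find_active_patient_sessions` without defining it. One possible shape for that lookup is sketched below as a standalone function. It assumes the session contexts look like the hub's `active_sessions` mapping, where each context stores patients as a list of `{'reference': ...}` entries; the function signature is an assumption for this example.

```python
from typing import Dict, List


def find_active_patient_sessions(active_sessions: Dict[str, dict], patient_ref: str) -> List[str]:
    """Return the ids of sessions whose current context references the patient."""
    matches = []
    for session_id, context in active_sessions.items():
        refs = [entry.get("reference") for entry in context.get("patient", [])]
        if patient_ref in refs:
            matches.append(session_id)
    return matches


sessions = {
    "session-a": {"patient": [{"reference": "Patient/123"}]},
    "session-b": {"patient": [{"reference": "Patient/456"}]},
    "session-c": {},  # no patient in context yet
}
print(find_active_patient_sessions(sessions, "Patient/123"))  # ['session-a']
```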

Related Patterns

  • IID Facade: IID Facade initiates viewer sessions that Event Observer keeps synchronized
  • Imaging Bridge: Imaging Bridge provides study context that triggers synchronization events
  • Audit & Provenance Chain: Context changes and synchronization events are logged for audit
  • Security Strategy: Event subscriptions require authenticated and authorized sessions
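
On the Security Strategy point, the FHIRcast webhook channel inherits a shared-secret check from WebSub: the subscriber supplies a `hub.secret`, and the hub sends an `X-Hub-Signature` header containing an HMAC of the notification body. A minimal sketch of that check follows, assuming SHA-256 and hypothetical helper names; it does not replace session authentication or authorization.

```python
import hashlib
import hmac


def sign_payload(secret: str, body: bytes) -> str:
    """Compute the signature header value for a notification body."""
    digest = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
    return f"sha256={digest}"


def verify_signature(secret: str, body: bytes, header_value: str) -> bool:
    """Check a received signature header using a constant-time comparison."""
    expected = sign_payload(secret, body)
    return hmac.compare_digest(expected.encode("utf-8"), header_value.encode("utf-8"))


body = b'{"id": "evt-1", "event": {"hub.event": "patient-open"}}'
signature = sign_payload("example-secret", body)  # hypothetical secret

print(verify_signature("example-secret", body, signature))  # True
print(verify_signature("wrong-secret", body, signature))    # False
```

The signature must be computed over the raw request bytes, before any JSON parsing, because re-serializing the payload can change whitespace or key order.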

Benefits

  • Real-time Synchronization: Immediate context updates across applications
  • Loose Coupling: Applications don't need direct knowledge of each other
  • Workflow Efficiency: Reduces manual context entry and switching overhead
  • Standards Compliance: Built on FHIRcast and FHIR Subscription standards
  • Scalability: Event-driven architecture supports many concurrent applications

Trade-offs

  • Network Overhead: Real-time events generate additional network traffic
  • Complexity: Event-driven architecture is more complex than direct integration
  • Reliability: Network issues can disrupt context synchronization
  • Security: Context events may contain sensitive information requiring protection
  • Event Ordering: Race conditions possible with rapid context changes
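
One common mitigation for the reliability trade-off is to retry failed deliveries with exponential backoff. The sketch below is one way this could look; `deliver_with_retry`, its parameters, and the injectable `sleep` argument are assumptions made for this example rather than part of the hub shown earlier.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple


async def deliver_with_retry(
    send: Callable[[], Awaitable[None]],
    max_attempts: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> int:
    """Call send() until it succeeds; return the number of attempts used."""
    for attempt in range(1, max_attempts + 1):
        try:
            await send()
            return attempt
        except Exception:
            if attempt == max_attempts:
                raise  # give up and surface the last error
            # Delay doubles after each failure: base, 2*base, 4*base, ...
            await sleep(base_delay * 2 ** (attempt - 1))
    raise ValueError("max_attempts must be at least 1")


async def demo() -> Tuple[int, List[float]]:
    calls = {"count": 0}
    delays: List[float] = []

    async def flaky_send() -> None:
        calls["count"] += 1
        if calls["count"] < 3:
            raise ConnectionError("simulated network failure")

    async def fake_sleep(seconds: float) -> None:
        delays.append(seconds)  # record instead of actually waiting

    attempts = await deliver_with_retry(flaky_send, sleep=fake_sleep)
    return attempts, delays


attempts, delays = asyncio.run(demo())
print(attempts, delays)  # 3 [0.5, 1.0]
```

Retries can make event ordering worse, since a retried event may arrive after a newer one, so receivers may still need to discard stale context.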

Event Filtering

Implement intelligent filtering to only send relevant events to each application. Consider user permissions and application capabilities when filtering.
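
The hub code calls `_event_matches_filters` without defining it. The sketch below shows one possible filtering approach, combining a simple key/value match with a permission check. The filter semantics, the scope strings, and both function names are assumptions made for this example.

```python
from typing import Any, Dict, Set


def event_matches_filters(context: Dict[str, Any], filters: Dict[str, Any]) -> bool:
    """True when every filter key exists in the context with an allowed value.

    A filter value may be a single value or a collection of allowed values.
    """
    for key, allowed in filters.items():
        if key not in context:
            return False
        allowed_values = list(allowed) if isinstance(allowed, (list, set, tuple)) else [allowed]
        if context[key] not in allowed_values:
            return False
    return True


def is_permitted(user_scopes: Set[str], event_type: str, required_scopes: Dict[str, str]) -> bool:
    """Deny by default: an event type with no configured scope is never delivered."""
    required = required_scopes.get(event_type)
    return required is not None and required in user_scopes


filters = {"modality": ["CT", "MR"]}
required_scopes = {"study-open": "imaging.read"}  # hypothetical scope name

ct_context = {"modality": "CT", "patient": [{"reference": "Patient/123"}]}
us_context = {"modality": "US", "patient": [{"reference": "Patient/123"}]}

print(event_matches_filters(ct_context, filters))                     # True
print(event_matches_filters(us_context, filters))                     # False
print(is_permitted({"imaging.read"}, "study-open", required_scopes))  # True
print(is_permitted({"imaging.read"}, "patient-open", required_scopes))  # False
```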

Context Conflicts

Handle scenarios where multiple users or sessions have conflicting context changes. Consider session isolation and conflict resolution strategies.
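
One way to combine session isolation with conflict detection is to keep a separate, versioned context per session and reject updates based on a stale version. The sketch below uses optimistic versioning; `SessionContextStore` and `VersionConflict` are hypothetical names, and other resolution strategies (for example last-writer-wins) may suit some workflows better.

```python
from typing import Any, Dict, Tuple


class VersionConflict(Exception):
    """Raised when an update is based on an outdated context version."""


class SessionContextStore:
    """Keeps one versioned context per session so sessions cannot affect each other."""

    def __init__(self) -> None:
        # session_id -> (version, context)
        self._sessions: Dict[str, Tuple[int, Dict[str, Any]]] = {}

    def get(self, session_id: str) -> Tuple[int, Dict[str, Any]]:
        version, context = self._sessions.get(session_id, (0, {}))
        return version, dict(context)  # copy so callers cannot mutate stored state

    def update(self, session_id: str, expected_version: int, changes: Dict[str, Any]) -> int:
        version, context = self._sessions.get(session_id, (0, {}))
        if expected_version != version:
            raise VersionConflict(
                f"session {session_id}: expected version {expected_version}, current is {version}"
            )
        new_version = version + 1
        self._sessions[session_id] = (new_version, {**context, **changes})
        return new_version


store = SessionContextStore()
v1 = store.update("session-a", 0, {"patient": "Patient/123"})
store.update("session-b", 0, {"patient": "Patient/456"})  # isolated from session-a

try:
    # A second writer still holding version 0 is rejected instead of silently overwriting.
    store.update("session-a", 0, {"patient": "Patient/789"})
    conflict_detected = False
except VersionConflict:
    conflict_detected = True

print(v1, conflict_detected, store.get("session-a"))
```

On a conflict, the rejected application would typically re-read the current context and decide whether its change still applies.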