Event Observer
Intent
Enable real-time synchronization of clinical context across applications through FHIR Subscriptions and FHIRcast, so that multiple healthcare applications can coordinate their views and workflows.
Forces
- Interactive Viewing & Context Sync: Clinical workflows require real-time coordination between multiple applications.
- Metadata vs Payload (Imaging): Clinical data (FHIR) and imaging data (DICOM) live in separate systems with different access patterns.
Structure
The Event Observer pattern implements real-time context synchronization using FHIR Subscriptions and FHIRcast protocols to enable loosely coupled application coordination.
Key Components
- Event Hub (FHIRcast Hub): Central coordination point for context events and application subscriptions
- Subscription Engine: Manages FHIR Subscription resources and event filtering
- Context Filter: Filters events based on subscription criteria and user permissions
- Event Transformer: Transforms events between different formats (FHIRcast, FHIR Subscription, custom)
- Delivery Service: Routes events to subscribed applications via various transport mechanisms
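The way these components hand an event to one another can be sketched as a small in-memory pipeline. This is an illustrative sketch only: the names `Event`, `Subscriber`, `context_filter`, `transform`, `deliver`, and `dispatch`, and the `wire_format` field, are assumptions made for the example and are not part of FHIRcast or of the implementation shown later.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class Event:
    event_type: str
    context: Dict[str, Any]


@dataclass
class Subscriber:
    app_id: str
    event_types: List[str]
    wire_format: str = "fhircast"  # hypothetical format name
    inbox: List[Dict[str, Any]] = field(default_factory=list)


def context_filter(event: Event, sub: Subscriber) -> bool:
    """Context Filter: keep only events the subscriber asked for."""
    return event.event_type in sub.event_types


def transform(event: Event, sub: Subscriber, topic: str) -> Dict[str, Any]:
    """Event Transformer: shape the event for the subscriber's format."""
    if sub.wire_format == "fhircast":
        return {
            "event": {
                "hub.topic": topic,
                "hub.event": event.event_type,
                "context": event.context,
            }
        }
    # Any other format falls back to a simple custom envelope.
    return {"type": event.event_type, "data": event.context}


def deliver(sub: Subscriber, message: Dict[str, Any]) -> None:
    """Delivery Service: an in-memory inbox stands in for a real transport."""
    sub.inbox.append(message)


def dispatch(event: Event, topic: str, subscribers: List[Subscriber]) -> int:
    """Event Hub: filter, transform, deliver; returns the delivery count."""
    delivered = 0
    for sub in subscribers:
        if context_filter(event, sub):
            deliver(sub, transform(event, sub, topic))
            delivered += 1
    return delivered
```

Keeping filtering, transformation, and delivery as separate steps means a new transport or message format can be added without touching the subscription logic.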
Behavior
Multi-Application Context Sync
[Diagrams: context synchronization across applications]
Implementation Considerations
FHIRcast Hub Implementation
The hub below coordinates FHIRcast context synchronization: it manages subscriptions, distributes events, and tracks WebSocket connections across applications.
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
import asyncio
import json
import uuid

import aiohttp
import websockets


class DeliveryError(Exception):
    """Raised when an event cannot be delivered to a subscriber"""


@dataclass
class ContextEvent:
    id: str
    timestamp: datetime
    event_type: str  # patient-open, patient-close, study-open, etc.
    context: Dict[str, Any]
    source_application: str


@dataclass
class Subscription:
    id: str
    application_id: str
    event_types: List[str]
    delivery_endpoint: str
    delivery_method: str  # 'websocket', 'webhook', 'sse'
    filters: Optional[Dict[str, Any]] = None
    active: bool = True


class FHIRcastHub:
    def __init__(self):
        self.subscriptions: Dict[str, Subscription] = {}
        self.active_sessions: Dict[str, Dict] = {}  # session_id -> context
        self.websocket_connections: Dict[str, websockets.WebSocketServerProtocol] = {}

    async def subscribe(self, subscription: Subscription) -> str:
        """Register application for context events"""
        self.subscriptions[subscription.id] = subscription
        # If websocket subscription, wait for connection
        if subscription.delivery_method == 'websocket':
            await self._wait_for_websocket_connection(subscription.application_id)
        return subscription.id

    async def unsubscribe(self, subscription_id: str):
        """Remove subscription"""
        if subscription_id in self.subscriptions:
            subscription = self.subscriptions[subscription_id]
            # Close websocket connection if applicable
            if subscription.delivery_method == 'websocket':
                await self._close_websocket_connection(subscription.application_id)
            del self.subscriptions[subscription_id]

    async def publish_event(self, event: ContextEvent, session_id: str):
        """Publish context event to subscribers"""
        # Update session context
        if session_id not in self.active_sessions:
            self.active_sessions[session_id] = {}
        self.active_sessions[session_id].update(event.context)
        # Find matching subscriptions
        matching_subscriptions = self._find_matching_subscriptions(event, session_id)
        # Deliver to subscribers concurrently
        delivery_tasks = []
        for subscription in matching_subscriptions:
            task = asyncio.create_task(
                self._deliver_event(subscription, event, session_id)
            )
            delivery_tasks.append(task)
        # Wait for all deliveries; _deliver_event handles per-delivery failures
        await asyncio.gather(*delivery_tasks, return_exceptions=True)

    def _find_matching_subscriptions(self, event: ContextEvent, session_id: str) -> List[Subscription]:
        """Find subscriptions that should receive this event"""
        matching = []
        for subscription in self.subscriptions.values():
            # Check if subscription is active
            if not subscription.active:
                continue
            # Check event type filter
            if event.event_type not in subscription.event_types:
                continue
            # Don't send event back to source application
            if subscription.application_id == event.source_application:
                continue
            # Apply additional filters
            if subscription.filters and not self._event_matches_filters(event, subscription.filters):
                continue
            matching.append(subscription)
        return matching

    def _event_matches_filters(self, event: ContextEvent, filters: Dict[str, Any]) -> bool:
        """Assumed semantics: every filter key must equal the matching context value"""
        return all(event.context.get(key) == value for key, value in filters.items())

    async def _wait_for_websocket_connection(self, application_id: str):
        """Wait until the application's WebSocket is registered"""
        pass  # Assumed helper: depends on how the server accepts connections

    async def _close_websocket_connection(self, application_id: str):
        """Close and forget the application's WebSocket, if any"""
        ws_connection = self.websocket_connections.pop(application_id, None)
        if ws_connection:
            await ws_connection.close()

    async def _deliver_event(self, subscription: Subscription, event: ContextEvent, session_id: str):
        """Deliver event using appropriate delivery method"""
        try:
            if subscription.delivery_method == 'websocket':
                await self._deliver_via_websocket(subscription, event, session_id)
            elif subscription.delivery_method == 'webhook':
                await self._deliver_via_webhook(subscription, event, session_id)
            elif subscription.delivery_method == 'sse':
                await self._deliver_via_sse(subscription, event, session_id)
        except Exception as e:
            print(f"Failed to deliver event to {subscription.application_id}: {e}")

    async def _deliver_via_websocket(self, subscription: Subscription,
                                     event: ContextEvent, session_id: str):
        """Deliver event via WebSocket"""
        ws_connection = self.websocket_connections.get(subscription.application_id)
        if not ws_connection:
            return
        event_message = {
            'id': event.id,
            'timestamp': event.timestamp.isoformat(),
            'event': {
                'hub.topic': session_id,
                'hub.event': event.event_type,
                'context': event.context
            }
        }
        await ws_connection.send(json.dumps(event_message))

    async def _deliver_via_webhook(self, subscription: Subscription,
                                   event: ContextEvent, session_id: str):
        """Deliver event via HTTP webhook"""
        payload = {
            'timestamp': event.timestamp.isoformat(),
            'id': event.id,
            'event': {
                'hub.topic': session_id,
                'hub.event': event.event_type,
                'context': event.context
            }
        }
        async with aiohttp.ClientSession() as session:
            try:
                async with session.post(
                    subscription.delivery_endpoint,
                    json=payload,
                    timeout=aiohttp.ClientTimeout(total=10)
                ) as response:
                    if response.status not in [200, 202, 204]:
                        raise DeliveryError(f"Webhook returned {response.status}")
            except asyncio.TimeoutError:
                raise DeliveryError("Webhook delivery timeout")

    async def _deliver_via_sse(self, subscription: Subscription,
                               event: ContextEvent, session_id: str):
        """Deliver event via Server-Sent Events"""
        raise NotImplementedError("SSE delivery is not shown in this example")


class FHIRSubscriptionHandler:
    def __init__(self, fhircast_hub: FHIRcastHub):
        self.fhircast_hub = fhircast_hub
        self.active_subscriptions: Dict[str, Dict] = {}

    def _generate_event_id(self) -> str:
        """Generate a unique event id"""
        return str(uuid.uuid4())

    async def handle_subscription_notification(self, subscription_id: str, bundle: Dict):
        """Process FHIR Subscription notification"""
        # Extract subscription info
        subscription_info = self.active_subscriptions.get(subscription_id)
        if not subscription_info:
            return
        # Process each entry in the notification bundle
        for entry in bundle.get('entry', []):
            resource = entry.get('resource', {})
            # Convert to context event
            context_event = await self._resource_to_context_event(
                resource, subscription_info
            )
            if context_event:
                # Publish via FHIRcast
                await self.fhircast_hub.publish_event(
                    context_event,
                    subscription_info['session_id']
                )

    async def _resource_to_context_event(self, resource: Dict, subscription_info: Dict) -> Optional[ContextEvent]:
        """Convert FHIR resource to FHIRcast context event"""
        resource_type = resource.get('resourceType')
        if resource_type == 'Observation':
            # New observation -> potentially update patient context
            patient_ref = resource.get('subject', {}).get('reference')
            if patient_ref:
                return ContextEvent(
                    id=self._generate_event_id(),
                    timestamp=datetime.now(timezone.utc),
                    event_type='observation-update',
                    context={
                        'patient': [{'reference': patient_ref}],
                        'observation': [{'reference': f"{resource_type}/{resource['id']}"}]
                    },
                    source_application='fhir-subscription-engine'
                )
        elif resource_type == 'ImagingStudy':
            # New imaging study available
            patient_ref = resource.get('subject', {}).get('reference')
            if patient_ref:
                return ContextEvent(
                    id=self._generate_event_id(),
                    timestamp=datetime.now(timezone.utc),
                    event_type='study-open',
                    context={
                        'patient': [{'reference': patient_ref}],
                        'study': [{'reference': f"{resource_type}/{resource['id']}"}]
                    },
                    source_application='fhir-subscription-engine'
                )
        return None


class ContextualApplication:
    """Base class for context-aware applications"""

    def __init__(self, application_id: str, fhircast_hub: FHIRcastHub):
        self.application_id = application_id
        self.fhircast_hub = fhircast_hub
        self.current_context = {}

    def _generate_event_id(self) -> str:
        """Generate a unique event id"""
        return str(uuid.uuid4())

    async def subscribe_to_context(self, event_types: List[str], session_id: str):
        """Subscribe to context events"""
        subscription = Subscription(
            id=f"{self.application_id}-{session_id}",
            application_id=self.application_id,
            event_types=event_types,
            delivery_endpoint="",  # Will use websocket
            delivery_method='websocket'
        )
        await self.fhircast_hub.subscribe(subscription)

    async def publish_context_change(self, event_type: str, context: Dict, session_id: str):
        """Publish context change event"""
        event = ContextEvent(
            id=self._generate_event_id(),
            timestamp=datetime.now(timezone.utc),
            event_type=event_type,
            context=context,
            source_application=self.application_id
        )
        await self.fhircast_hub.publish_event(event, session_id)

    async def handle_context_event(self, event: ContextEvent):
        """Handle incoming context event"""
        # Update local context
        self.current_context.update(event.context)
        # Delegate to specific handler
        if event.event_type == 'patient-open':
            await self._handle_patient_open(event.context)
        elif event.event_type == 'study-open':
            await self._handle_study_open(event.context)
        elif event.event_type == 'patient-close':
            await self._handle_patient_close(event.context)

    async def _handle_patient_open(self, context: Dict):
        """Handle patient open event"""
        pass  # Override in subclass

    async def _handle_study_open(self, context: Dict):
        """Handle study open event"""
        pass  # Override in subclass

    async def _handle_patient_close(self, context: Dict):
        """Handle patient close event"""
        pass  # Override in subclass


# Example: Image Viewer that responds to context
class ContextualImageViewer(ContextualApplication):
    async def _handle_patient_open(self, context: Dict):
        """Load imaging studies when patient context opens"""
        patient_refs = context.get('patient', [])
        if patient_refs:
            patient_id = patient_refs[0]['reference'].split('/')[-1]
            await self._load_patient_studies(patient_id)

    async def _handle_study_open(self, context: Dict):
        """Display specific study when study context opens"""
        study_refs = context.get('study', [])
        if study_refs:
            study_id = study_refs[0]['reference'].split('/')[-1]
            await self._display_study(study_id)

    async def _load_patient_studies(self, patient_id: str):
        """Load all imaging studies for patient"""
        # Implementation specific to viewer
        pass

    async def _display_study(self, study_id: str):
        """Display specific imaging study"""
        # Implementation specific to viewer
        pass
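On the wire, an application typically asks a FHIRcast hub for a subscription by posting form-encoded `hub.*` parameters. The sketch below builds such a request body. The parameter names follow the FHIRcast specification as commonly published, but the helper name `build_subscription_request` and the default lease of 7200 seconds are assumptions for illustration; check the exact fields against the FHIRcast version you target.

```python
from typing import List
from urllib.parse import parse_qs, urlencode


def build_subscription_request(topic: str, events: List[str],
                               lease_seconds: int = 7200) -> str:
    """Return a form-encoded FHIRcast subscription request body."""
    params = {
        "hub.channel.type": "websocket",
        "hub.mode": "subscribe",
        "hub.topic": topic,                  # the shared session identifier
        "hub.events": ",".join(events),      # comma-separated event names
        "hub.lease_seconds": str(lease_seconds),
    }
    return urlencode(params)


if __name__ == "__main__":
    body = build_subscription_request("session-1", ["patient-open", "study-open"])
    # Decode again to inspect what the hub would receive.
    print(parse_qs(body))
```

In the hub above, the decoded parameters would map onto a `Subscription`: `hub.topic` plays the role of `session_id` and `hub.events` becomes `event_types`.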
Integration with Imaging Workflows
The coordinator below ties FHIRcast events into imaging workflows so that patient and study selection stays synchronized across EMR and PACS viewers.
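# Builds on FHIRcastHub and ContextEvent from the hub implementation above
from datetime import datetime, timezone
from typing import Dict, List
import uuid


class ImagingWorkflowCoordinator:
    def __init__(self, fhircast_hub: FHIRcastHub, imaging_bridge):
        self.fhircast_hub = fhircast_hub
        self.imaging_bridge = imaging_bridge

    def _generate_event_id(self) -> str:
        """Generate a unique event id"""
        return str(uuid.uuid4())

    def _find_active_patient_sessions(self, patient_ref: str) -> List[str]:
        """Return ids of sessions whose current context includes this patient"""
        return [
            session_id
            for session_id, context in self.fhircast_hub.active_sessions.items()
            if any(p.get('reference') == patient_ref for p in context.get('patient', []))
        ]

    async def handle_study_available_event(self, imaging_study: Dict):
        """Handle new imaging study becoming available"""
        # Extract context information
        patient_ref = imaging_study.get('subject', {}).get('reference')
        study_uid = None
        # Find Study Instance UID from identifiers (DCM code 110180)
        for identifier in imaging_study.get('identifier', []):
            # Guard against a missing or empty coding list
            codings = identifier.get('type', {}).get('coding') or [{}]
            if codings[0].get('code') == '110180':
                study_uid = identifier.get('value')
                break
        if patient_ref and study_uid:
            # Publish imaging-study-available event
            context_event = ContextEvent(
                id=self._generate_event_id(),
                timestamp=datetime.now(timezone.utc),
                event_type='imaging-study-available',
                context={
                    'patient': [{'reference': patient_ref}],
                    'study': [{'reference': f"ImagingStudy/{imaging_study['id']}"}],
                    'studyUID': study_uid
                },
                source_application='imaging-bridge'
            )
            # Publish to all sessions where this patient is active
            active_sessions = self._find_active_patient_sessions(patient_ref)
            for session_id in active_sessions:
                await self.fhircast_hub.publish_event(context_event, session_id)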
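Not every server tags the Study Instance UID identifier with a type coding. In FHIR R4, an ImagingStudy usually carries it as an identifier with system `urn:dicom:uid` and a value prefixed with `urn:oid:`. The following sketch shows a lookup based on that convention; the helper name `extract_study_uid` is hypothetical, and the prefix stripping assumes the consumer wants the bare DICOM UID.

```python
from typing import Any, Dict, Optional

DICOM_UID_SYSTEM = "urn:dicom:uid"
OID_PREFIX = "urn:oid:"


def extract_study_uid(imaging_study: Dict[str, Any]) -> Optional[str]:
    """Return the bare Study Instance UID, or None if it is absent."""
    for identifier in imaging_study.get("identifier", []):
        if identifier.get("system") != DICOM_UID_SYSTEM:
            continue
        value = identifier.get("value") or ""
        if value.startswith(OID_PREFIX):
            value = value[len(OID_PREFIX):]
        # An empty value is treated the same as a missing identifier.
        return value or None
    return None


if __name__ == "__main__":
    study = {
        "resourceType": "ImagingStudy",
        "identifier": [
            {"system": "urn:dicom:uid", "value": "urn:oid:1.2.840.113619.2.1"}
        ],
    }
    print(extract_study_uid(study))
```

A coordinator could try the type-coding lookup first and fall back to this system-based lookup when no coded identifier is found.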
Related Patterns
- IID Facade: Initiates viewer sessions that Event Observer keeps synchronized
- Imaging Bridge: Provides study context that triggers synchronization events
- Audit & Provenance Chain: Logs context changes and synchronization events for audit
- Security Strategy: Requires authenticated and authorized sessions for event subscriptions
Benefits
- Real-time Synchronization: Immediate context updates across applications
- Loose Coupling: Applications don't need direct knowledge of each other
- Workflow Efficiency: Reduces manual context entry and switching overhead
- Standards Compliance: Built on FHIRcast and FHIR Subscription standards
- Scalability: Event-driven architecture supports many concurrent applications
Trade-offs
- Network Overhead: Real-time events generate additional network traffic
- Complexity: Event-driven architecture is more complex than direct integration
- Reliability: Network issues can disrupt context synchronization
- Security: Context events may contain sensitive information requiring protection
- Event Ordering: Race conditions possible with rapid context changes
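One common way to soften the reliability trade-off is to retry failed deliveries with exponential backoff before giving up. The sketch below is a minimal illustration under stated assumptions: the function name `deliver_with_retry`, the default of three attempts, and the choice to retry only on `ConnectionError` and `asyncio.TimeoutError` are not taken from the implementation above.

```python
import asyncio
from typing import Awaitable, Callable


async def deliver_with_retry(send: Callable[[], Awaitable[None]],
                             attempts: int = 3,
                             base_delay: float = 0.5) -> bool:
    """Call send() until it succeeds or attempts run out.

    Returns True on success and False once every attempt has failed.
    """
    for attempt in range(attempts):
        try:
            await send()
            return True
        except (ConnectionError, asyncio.TimeoutError):
            # Back off 1x, 2x, 4x ... base_delay; skip the sleep after the last try.
            if attempt < attempts - 1:
                await asyncio.sleep(base_delay * 2 ** attempt)
    return False
```

Retries can make the ordering trade-off worse, because a delayed retry may arrive after a newer event. Receivers can compare event timestamps and ignore anything older than the context they already hold.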
References
- FHIRcast - Real-time application synchronization
- FHIR Subscriptions - Event notification mechanism
- Subscriptions R5 Backport - R5 subscriptions in R4
Event Filtering
Implement intelligent filtering to only send relevant events to each application. Consider user permissions and application capabilities when filtering.
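A filter of this kind might combine the application's subscribed event types with a check on which resource types the current user may see in that application. The sketch below assumes the context shape used elsewhere in this pattern (keys mapping to lists of `{'reference': 'Type/id'}` entries). `AppProfile`, `allowed_resource_types`, and `should_deliver` are hypothetical names, and the permission model is deliberately simplified.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Set


@dataclass
class AppProfile:
    app_id: str
    event_types: Set[str]
    # Hypothetical permission model: resource types this app's user may see.
    allowed_resource_types: Set[str] = field(default_factory=set)


def should_deliver(event_type: str, context: Dict[str, Any], app: AppProfile) -> bool:
    """Deliver only subscribed events whose references are all permitted."""
    if event_type not in app.event_types:
        return False
    for refs in context.values():
        if not isinstance(refs, list):
            continue  # plain values such as a study UID carry no reference
        for ref in refs:
            resource_type = ref["reference"].split("/")[0]
            if resource_type not in app.allowed_resource_types:
                return False
    return True
```

Rejecting the whole event when any reference is not permitted is the conservative choice. Stripping the disallowed entries and delivering the rest is an alternative, provided the receiving application can work with partial context.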
Context Conflicts
Handle scenarios where multiple users or sessions have conflicting context changes. Consider session isolation and conflict resolution strategies.
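One possible strategy keeps a separate context per session and rejects updates that were computed against a stale version, an optimistic concurrency check. The sketch below illustrates that idea only; `SessionContextStore` and its version counter are assumptions for the example and not part of FHIRcast or of the hub shown earlier.

```python
from typing import Any, Dict, Tuple


class SessionContextStore:
    """Per-session context with an optimistic version check."""

    def __init__(self) -> None:
        # session_id -> (version, context); sessions never share state
        self._sessions: Dict[str, Tuple[int, Dict[str, Any]]] = {}

    def get(self, session_id: str) -> Tuple[int, Dict[str, Any]]:
        """Return (version, context); unknown sessions start at version 0."""
        return self._sessions.get(session_id, (0, {}))

    def update(self, session_id: str, expected_version: int,
               changes: Dict[str, Any]) -> bool:
        """Apply changes only if the caller saw the latest version."""
        version, context = self.get(session_id)
        if expected_version != version:
            return False  # conflict: caller must re-read and retry
        self._sessions[session_id] = (version + 1, {**context, **changes})
        return True
```

When two applications in the same session both read version 0 and then try to switch patients, the first update wins and the second is rejected, so the losing application can reload the context instead of silently overwriting it.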