Population Export Pipeline
Intent
Orchestrate large-scale data extraction using FHIR Bulk Data APIs with proper authentication and transformation, enabling research and analytics use cases.
Forces
- Network Latency & Reliability: Healthcare networks often span institutions with varying connectivity quality.
Structure
The Population Export Pipeline pattern orchestrates FHIR Bulk Data export operations with authentication, transformation, and monitoring capabilities.
Key Components
ExportOrchestrator
Coordinates the overall export workflow
BulkDataClient
Interacts with FHIR Bulk Data API endpoints
BackendAuthenticator
Handles SMART Backend Services authentication
TransformationPipeline
Applies transformations to exported data
ProgressMonitor
Tracks export job status and progress
Behavior
Bulk Export Workflow
The following sequence shows the complete bulk data export process:
Export Steps
- Authenticate
- Initiate Export
- Monitor Progress
- Download Data
- Transform Data
- Load Destination
Implementation Considerations
Export Request
Client for initiating FHIR Bulk Data export operations, supporting system-, group-, and patient-level exports with filtering parameters.
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
class ExportType(Enum):
    """Scope of a FHIR Bulk Data $export operation."""
    SYSTEM = "system"    # $export on system level (whole server)
    GROUP = "group"      # $export on a Group resource (a defined cohort)
    PATIENT = "patient"  # $export on Patient resource(s)
class OutputFormat(Enum):
    """MIME type requested via the _outputFormat export parameter."""
    NDJSON = "application/fhir+ndjson"  # newline-delimited JSON (default in ExportRequest)
    PARQUET = "application/x-parquet"   # columnar format; server support varies
@dataclass
class ExportRequest:
    """Configuration for a FHIR Bulk Data Export request."""
    export_type: ExportType  # system, group, or patient scope
    output_format: OutputFormat = OutputFormat.NDJSON  # _outputFormat value
    since: Optional[datetime] = None  # _since: only resources modified after this instant
    resource_types: List[str] = field(default_factory=list)  # _type: resource types to include; empty = all
    type_filters: Dict[str, str] = field(default_factory=dict)  # _typeFilter: resource type -> search expression
    group_id: Optional[str] = None  # required when export_type == GROUP
    patient_id: Optional[str] = None  # optional for PATIENT scope (single-patient export)
class BulkExportClient:
    """
    Client for initiating and managing FHIR Bulk Data Export operations.

    Implements the kick-off portion of the FHIR Bulk Data Access
    (Flat FHIR) specification: builds the $export URL and query
    parameters and issues the asynchronous kick-off request.
    """

    def __init__(self, fhir_base_url: str, auth_token: str):
        """
        Args:
            fhir_base_url: Base URL of the FHIR server (trailing slash allowed).
            auth_token: Bearer token (e.g. from SMART Backend Services auth).
        """
        self.base_url = fhir_base_url.rstrip('/')
        self.auth_token = auth_token
        self.session = None  # Would be aiohttp.ClientSession in real impl

    async def initiate_export(self, request: "ExportRequest") -> str:
        """
        Initiate a bulk export operation.

        Returns:
            The Content-Location URL for polling export status.
        """
        # Build export URL based on type
        url = self._build_export_url(request)
        # Build query parameters
        params = self._build_export_params(request)
        # 'Prefer: respond-async' requests asynchronous processing as
        # required by the Bulk Data kick-off specification.
        headers = {
            'Authorization': f'Bearer {self.auth_token}',
            'Accept': 'application/fhir+json',
            'Prefer': 'respond-async'
        }
        # In real implementation:
        # response = await self.session.get(url, params=params, headers=headers)
        #
        # if response.status == 202:
        #     return response.headers['Content-Location']
        # else:
        #     raise ExportError(f"Export initiation failed: {response.status}")
        # Placeholder return
        return f"{self.base_url}/$export-poll-status/job-12345"

    def _build_export_url(self, request: "ExportRequest") -> str:
        """Build the export endpoint URL based on export type.

        Raises:
            ValueError: if group_id is missing for a GROUP export, or the
                export type is unrecognized.
        """
        if request.export_type == ExportType.SYSTEM:
            return f"{self.base_url}/$export"
        elif request.export_type == ExportType.GROUP:
            if not request.group_id:
                raise ValueError("group_id required for Group export")
            return f"{self.base_url}/Group/{request.group_id}/$export"
        elif request.export_type == ExportType.PATIENT:
            if request.patient_id:
                return f"{self.base_url}/Patient/{request.patient_id}/$export"
            else:
                return f"{self.base_url}/Patient/$export"
        raise ValueError(f"Unknown export type: {request.export_type}")

    def _build_export_params(self, request: "ExportRequest") -> Dict[str, str]:
        """Build query parameters for the export kick-off request."""
        params: Dict[str, str] = {}
        # Output format
        params['_outputFormat'] = request.output_format.value
        # _since for incremental exports. A FHIR instant must carry a
        # timezone. Naive datetimes are treated as UTC ('Z' appended);
        # aware datetimes already embed their offset in isoformat().
        # (Previously 'Z' was appended unconditionally, yielding invalid
        # values such as '...+00:00Z' for timezone-aware datetimes.)
        if request.since:
            if request.since.tzinfo is None:
                params['_since'] = request.since.isoformat() + 'Z'
            else:
                params['_since'] = request.since.isoformat()
        # Resource types to include
        if request.resource_types:
            params['_type'] = ','.join(request.resource_types)
        # Type filters (e.g. Patient?gender=female)
        if request.type_filters:
            params['_typeFilter'] = ','.join(
                f"{resource_type}?{filter_expr}"
                for resource_type, filter_expr in request.type_filters.items()
            )
        return params

    async def create_system_export(self,
                                   resource_types: Optional[List[str]] = None,
                                   since: Optional[datetime] = None) -> str:
        """
        Convenience method for system-level export.

        Returns the status-polling URL.
        """
        request = ExportRequest(
            export_type=ExportType.SYSTEM,
            resource_types=resource_types or [],
            since=since
        )
        return await self.initiate_export(request)

    async def create_group_export(self,
                                  group_id: str,
                                  resource_types: Optional[List[str]] = None,
                                  since: Optional[datetime] = None) -> str:
        """
        Convenience method for group-level export.

        Exports data for all patients in the specified Group.
        """
        request = ExportRequest(
            export_type=ExportType.GROUP,
            group_id=group_id,
            resource_types=resource_types or [],
            since=since
        )
        return await self.initiate_export(request)

    async def create_patient_export(self,
                                    patient_id: Optional[str] = None,
                                    resource_types: Optional[List[str]] = None,
                                    since: Optional[datetime] = None) -> str:
        """
        Convenience method for patient-level export.

        If patient_id is None, exports all patients.
        """
        request = ExportRequest(
            export_type=ExportType.PATIENT,
            patient_id=patient_id,
            resource_types=resource_types or [],
            since=since
        )
        return await self.initiate_export(request)
# Example usage
async def example_exports():
    """Demonstrate system-level, incremental, and group (cohort) exports."""
    client = BulkExportClient(
        fhir_base_url='https://fhir.example.org/r4',
        auth_token='eyJhbGciOiJSUzI1NiIs...'
    )
    # System-level export of all data
    status_url = await client.create_system_export()
    print(f"System export started: {status_url}")
    # Export specific resource types since last week.
    # datetime.utcnow() is deprecated since Python 3.12; derive the same
    # naive-UTC value from an aware "now" instead.
    from datetime import timedelta, timezone
    last_week = datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(days=7)
    status_url = await client.create_system_export(
        resource_types=['Patient', 'Observation', 'Condition'],
        since=last_week
    )
    print(f"Incremental export started: {status_url}")
    # Export for a specific cohort (Group)
    status_url = await client.create_group_export(
        group_id='diabetes-cohort-2024',
        resource_types=['Patient', 'Observation', 'MedicationRequest']
    )
    print(f"Cohort export started: {status_url}")
Progress Monitoring
Monitors long-running export jobs with polling, exponential backoff, progress callbacks, and timeout handling.
from typing import Dict, List, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import asyncio
import time
class ExportStatus(Enum):
    """Lifecycle states of a bulk export job as seen by the monitor."""
    PENDING = "pending"          # job accepted but not yet observed in progress
    IN_PROGRESS = "in-progress"  # status endpoint returned 202 (still running)
    COMPLETED = "completed"      # status endpoint returned 200 with a manifest
    ERROR = "error"              # job failed; see ExportProgress.error_message
    CANCELLED = "cancelled"      # job was cancelled
@dataclass
class ExportProgress:
    """Progress information for a bulk export job."""
    status: ExportStatus  # current job state
    progress_percent: float = 0.0  # 0-100; parsed from the X-Progress header
    resources_exported: int = 0  # resources exported so far, if reported
    estimated_completion: Optional[datetime] = None  # server estimate, if available
    current_resource_type: Optional[str] = None  # resource type currently being exported
    output_files: List[Dict] = field(default_factory=list)  # manifest 'output' entries (type/url/count)
    error_message: Optional[str] = None  # populated when status == ERROR
    transaction_time: Optional[datetime] = None  # manifest transactionTime on completion
class ExportMonitor:
    """
    Monitors FHIR Bulk Data Export jobs with polling and callbacks.

    Polls the status URL returned by the export kick-off request,
    backing off exponentially between polls, and invokes user-supplied
    callbacks on progress updates, completion, and error.
    """

    def __init__(self,
                 fhir_client,
                 poll_interval: float = 5.0,
                 max_poll_interval: float = 60.0,
                 backoff_factor: float = 1.5):
        """
        Args:
            fhir_client: HTTP client used to poll the status endpoint.
            poll_interval: Initial delay between polls, in seconds.
            max_poll_interval: Upper bound for the backed-off delay.
            backoff_factor: Multiplier applied to the delay after each poll.
        """
        self.fhir_client = fhir_client
        self.poll_interval = poll_interval
        self.max_poll_interval = max_poll_interval
        self.backoff_factor = backoff_factor
        self.active_jobs = {}  # reserved for tracking multiple concurrent jobs

    async def monitor_export(self,
                             status_url: str,
                             on_progress: Optional[Callable[["ExportProgress"], None]] = None,
                             on_complete: Optional[Callable[["ExportProgress"], None]] = None,
                             on_error: Optional[Callable[["ExportProgress"], None]] = None,
                             timeout: float = 3600) -> "ExportProgress":
        """
        Monitor an export job until completion or timeout.

        Args:
            status_url: Content-Location URL returned by the kick-off request.
            on_progress: Called after every poll with the latest progress.
            on_complete: Called once when the job completes.
            on_error: Called once if the job reports an error.
            timeout: Maximum total time to wait, in seconds.

        Raises:
            TimeoutError: if the job does not finish within `timeout` seconds.
            ExportError: if the job reports an error status.
            ExportCancelled: if the job was cancelled.
        """
        start_time = time.time()
        current_interval = self.poll_interval
        while True:
            # Check timeout
            elapsed = time.time() - start_time
            if elapsed > timeout:
                raise TimeoutError(f"Export timed out after {timeout} seconds")
            # Poll status
            progress = await self._poll_status(status_url)
            # Callback for progress updates
            if on_progress:
                on_progress(progress)
            # Check completion states
            if progress.status == ExportStatus.COMPLETED:
                if on_complete:
                    on_complete(progress)
                return progress
            elif progress.status == ExportStatus.ERROR:
                if on_error:
                    on_error(progress)
                raise ExportError(progress.error_message or "Export failed")
            elif progress.status == ExportStatus.CANCELLED:
                raise ExportCancelled("Export was cancelled")
            # Wait before the next poll (exponential backoff). The sleep
            # is capped at the time remaining so a long backoff interval
            # cannot overshoot the caller's timeout by up to
            # max_poll_interval, as the original code could.
            remaining = timeout - (time.time() - start_time)
            await asyncio.sleep(min(current_interval, max(remaining, 0.0)))
            current_interval = min(
                current_interval * self.backoff_factor,
                self.max_poll_interval
            )

    async def _poll_status(self, status_url: str) -> "ExportProgress":
        """
        Poll the export status endpoint once.

        Per the commented real implementation: HTTP 202 means still in
        progress (optional X-Progress header), 200 means complete with
        the manifest in the body; anything else is treated as an error.
        """
        headers = {
            'Accept': 'application/json'
        }
        # In real implementation:
        # response = await self.fhir_client.get(status_url, headers=headers)
        #
        # if response.status == 202:
        #     # Still in progress
        #     progress_header = response.headers.get('X-Progress', '0%')
        #     return ExportProgress(
        #         status=ExportStatus.IN_PROGRESS,
        #         progress_percent=self._parse_progress(progress_header)
        #     )
        # elif response.status == 200:
        #     # Completed
        #     result = await response.json()
        #     return self._parse_completion(result)
        # else:
        #     # Error
        #     return ExportProgress(
        #         status=ExportStatus.ERROR,
        #         error_message=f"Status check failed: {response.status}"
        #     )
        # Placeholder for demo
        return ExportProgress(
            status=ExportStatus.IN_PROGRESS,
            progress_percent=50.0,
            resources_exported=1000
        )

    def _parse_progress(self, progress_header: str) -> float:
        """Parse an X-Progress header value like '42%'; 0.0 if unparseable."""
        try:
            return float(progress_header.rstrip('%'))
        except ValueError:
            # X-Progress content is server-defined free text; tolerate it.
            return 0.0

    def _parse_completion(self, result: Dict) -> "ExportProgress":
        """Parse a completed export manifest into an ExportProgress."""
        output_files = []
        for output in result.get('output', []):
            output_files.append({
                'type': output.get('type'),
                'url': output.get('url'),
                'count': output.get('count')
            })
        return ExportProgress(
            status=ExportStatus.COMPLETED,
            progress_percent=100.0,
            output_files=output_files,
            # transactionTime is a FHIR instant; normalize a trailing 'Z'
            # for datetime.fromisoformat (pre-3.11 does not accept 'Z').
            transaction_time=datetime.fromisoformat(
                result.get('transactionTime', '').replace('Z', '+00:00')
            ) if result.get('transactionTime') else None
        )

    async def cancel_export(self, status_url: str) -> bool:
        """
        Request cancellation of an export job (DELETE on the status URL).

        Returns True if the server accepted the cancellation.
        """
        # In real implementation:
        # response = await self.fhir_client.delete(status_url)
        # return response.status == 202
        return True

    def create_progress_callback(self,
                                 callback: Callable[[float, str], None]
                                 ) -> Callable[["ExportProgress"], None]:
        """
        Wrap a simple (percent, message) callback as a progress callback.
        """
        def wrapper(progress: "ExportProgress"):
            message = f"Status: {progress.status.value}"
            if progress.current_resource_type:
                message += f" ({progress.current_resource_type})"
            if progress.resources_exported:
                message += f" - {progress.resources_exported} resources"
            callback(progress.progress_percent, message)
        return wrapper
class ExportError(Exception):
    """Raised when a bulk export job reports an error status."""
    pass
class ExportCancelled(Exception):
    """Raised when a bulk export job reports a cancelled status."""
    pass
# Example usage with progress bar
async def run_monitored_export():
    """Example: monitor an export job while driving a tqdm progress bar."""
    from tqdm import tqdm

    # Assume we have fhir_client and status_url
    fhir_client = None
    status_url = "https://fhir.example.org/$export-poll/job-123"

    monitor = ExportMonitor(fhir_client)

    # Progress bar advanced from the monitor's progress callbacks.
    progress_bar = tqdm(total=100, desc="Exporting", unit="%")
    previous_pct = [0]  # one-element cell instead of `nonlocal`

    def on_progress(update: ExportProgress):
        # tqdm.update takes a delta, so advance by the change since
        # the previous callback.
        progress_bar.update(update.progress_percent - previous_pct[0])
        previous_pct[0] = update.progress_percent
        progress_bar.set_postfix({
            'status': update.status.value,
            'resources': update.resources_exported
        })

    try:
        outcome = await monitor.monitor_export(
            status_url,
            on_progress=on_progress,
            timeout=7200  # 2 hour timeout
        )
        progress_bar.close()
        print(f"\nExport completed!")
        print(f"Output files: {len(outcome.output_files)}")
        for item in outcome.output_files:
            print(f" - {item['type']}: {item['url']} ({item.get('count', '?')} resources)")
    except ExportError as e:
        progress_bar.close()
        print(f"\nExport failed: {e}")
    except TimeoutError:
        progress_bar.close()
        print("\nExport timed out")
NDJSON Processing
Processes NDJSON (Newline Delimited JSON) export files with streaming, filtering, transformation, and memory-efficient batch operations.
from typing import Dict, List, Iterator, Any, Optional, Callable
from dataclasses import dataclass
import json
import gzip
from pathlib import Path
import asyncio
from concurrent.futures import ThreadPoolExecutor
@dataclass
class NDJSONFile:
    """Metadata about an NDJSON file listed in a bulk export manifest."""
    resource_type: str  # FHIR resource type contained in the file
    url: str  # download URL from the export manifest 'output' entry
    count: Optional[int] = None  # resource count, if the server reported one
class NDJSONProcessor:
    """
    Processes NDJSON (Newline Delimited JSON) files from FHIR Bulk Export.

    Supports memory-efficient streaming, filtering, transformation,
    merging, splitting, counting, and validation. Files ending in '.gz'
    are transparently read/written through gzip.
    """

    def __init__(self, max_workers: int = 4):
        """
        Args:
            max_workers: Size of the thread pool reserved for parallel
                downloads (unused by the placeholder _download_files).
        """
        self.max_workers = max_workers
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    def stream_resources(self, file_path: str) -> Iterator[Dict]:
        """
        Stream resources from an NDJSON file, one at a time.

        Memory efficient: blank lines are skipped, and lines that fail
        to parse are reported and skipped so one bad record does not
        abort the whole file.
        """
        # Pick a gzip or plain opener; both read text as UTF-8.
        opener = gzip.open if file_path.endswith('.gz') else open
        with opener(file_path, 'rt', encoding='utf-8') as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue
                try:
                    yield json.loads(line)
                except json.JSONDecodeError as e:
                    # Log error but continue processing
                    print(f"JSON parse error at line {line_num}: {e}")
                    continue

    def filter_resources(self,
                         file_path: str,
                         predicate: Callable[[Dict], bool]) -> Iterator[Dict]:
        """
        Stream resources, yielding only those for which predicate(resource) is truthy.
        """
        for resource in self.stream_resources(file_path):
            if predicate(resource):
                yield resource

    def transform_resources(self,
                            file_path: str,
                            transformer: Callable[[Dict], Dict]) -> Iterator[Dict]:
        """
        Stream resources, yielding transformer(resource) for each one.
        """
        for resource in self.stream_resources(file_path):
            yield transformer(resource)

    async def process_export_files(self,
                                   files: "List[NDJSONFile]",
                                   processor: Callable[[Dict], None],
                                   download_dir: str = './downloads') -> Dict:
        """
        Download and process multiple NDJSON files from a bulk export.

        Returns:
            Stats dict with 'files_processed', 'resources_processed',
            and 'errors' (one entry per file that failed).
        """
        stats = {
            'files_processed': 0,
            'resources_processed': 0,
            'errors': []
        }
        # Download files (in real impl, would use aiohttp)
        local_files = await self._download_files(files, download_dir)
        # Process each file; a failure in one file is recorded and does
        # not stop the remaining files.
        for file_info, local_path in zip(files, local_files):
            try:
                count = 0
                for resource in self.stream_resources(local_path):
                    processor(resource)
                    count += 1
                stats['files_processed'] += 1
                stats['resources_processed'] += count
            except Exception as e:
                stats['errors'].append({
                    'file': file_info.url,
                    'error': str(e)
                })
        return stats

    async def _download_files(self,
                              files: "List[NDJSONFile]",
                              download_dir: str) -> List[str]:
        """Download NDJSON files from URLs (placeholder: paths only)."""
        Path(download_dir).mkdir(parents=True, exist_ok=True)
        local_paths = []
        for file_info in files:
            # In real implementation, would download from URL.
            # Here we just generate the expected local path.
            filename = f"{file_info.resource_type}.ndjson"
            local_paths.append(str(Path(download_dir) / filename))
        return local_paths

    def write_ndjson(self,
                     output_path: str,
                     resources: Iterator[Dict],
                     compress: bool = True):
        """
        Write resources to an NDJSON file, gzip-compressed by default.

        A '.gz' suffix is appended when compressing if not already present.
        """
        if compress and not output_path.endswith('.gz'):
            output_path += '.gz'
        opener = gzip.open if output_path.endswith('.gz') else open
        with opener(output_path, 'wt', encoding='utf-8') as f:
            for resource in resources:
                # Compact separators keep files small; one JSON doc per line.
                f.write(json.dumps(resource, separators=(',', ':')))
                f.write('\n')

    def merge_files(self,
                    input_files: List[str],
                    output_file: str,
                    transformer: Optional[Callable[[Dict], Dict]] = None):
        """
        Merge multiple NDJSON files into one, optionally transforming
        each resource during the merge.
        """
        def resource_generator():
            for file_path in input_files:
                for resource in self.stream_resources(file_path):
                    yield transformer(resource) if transformer else resource
        self.write_ndjson(output_file, resource_generator())

    def split_by_type(self,
                      input_file: str,
                      output_dir: str) -> Dict[str, str]:
        """
        Split an NDJSON file into one file per resource type.

        Resources with a missing or empty resourceType are written to
        'Unknown.ndjson', consistent with count_resources (the original
        used the literal string 'None' as the filename/key for them).

        Returns:
            Mapping of resource type to output file path.
        """
        Path(output_dir).mkdir(parents=True, exist_ok=True)
        type_files: Dict[str, str] = {}
        type_writers: Dict[str, Any] = {}
        try:
            for resource in self.stream_resources(input_file):
                # 'or' also catches an explicit null/empty resourceType,
                # not just a missing key.
                resource_type = resource.get('resourceType') or 'Unknown'
                if resource_type not in type_writers:
                    output_path = str(Path(output_dir) / f"{resource_type}.ndjson")
                    type_files[resource_type] = output_path
                    type_writers[resource_type] = open(output_path, 'w', encoding='utf-8')
                writer = type_writers[resource_type]
                writer.write(json.dumps(resource, separators=(',', ':')))
                writer.write('\n')
        finally:
            # Always close writers, even if parsing/IO fails mid-file.
            for writer in type_writers.values():
                writer.close()
        return type_files

    def count_resources(self, file_path: str) -> Dict[str, int]:
        """
        Count resources by resourceType ('Unknown' when absent).
        """
        counts: Dict[str, int] = {}
        for resource in self.stream_resources(file_path):
            resource_type = resource.get('resourceType', 'Unknown')
            counts[resource_type] = counts.get(resource_type, 0) + 1
        return counts

    def validate_resources(self,
                           file_path: str,
                           validator: Callable[[Dict], List[str]]) -> List[Dict]:
        """
        Validate resources and return a list of issue records.

        Note: 'line' is the 1-based index among successfully parsed
        resources, not the raw file line number (blank and unparseable
        lines are skipped by stream_resources).
        """
        issues = []
        for line_num, resource in enumerate(self.stream_resources(file_path), 1):
            resource_issues = validator(resource)
            if resource_issues:
                issues.append({
                    'line': line_num,
                    'resource_type': resource.get('resourceType'),
                    'resource_id': resource.get('id'),
                    'issues': resource_issues
                })
        return issues
# Example usage
if __name__ == "__main__":
    ndjson = NDJSONProcessor()

    # Stream and filter patients
    def is_female(record):
        return record.get('gender') == 'female'

    for record in ndjson.filter_resources('patients.ndjson', is_female):
        print(f"Patient: {record.get('id')}")

    # Count resources
    tally = ndjson.count_resources('export.ndjson')
    print(f"Resource counts: {tally}")
Related Patterns
- Asynchronous Invocation: handles the long-running nature of bulk export jobs
- Security Strategy: Backend Services strategy provides system-level authentication for bulk access
- De-Identification Adapter: Exported data may be de-identified for research use cases
- Audit & Provenance Chain: All bulk export operations are comprehensively audited
Benefits
- Scalable Extraction: Handle population-level data volumes
- Standards-Based: Built on FHIR Bulk Data specification
- Secure: Uses SMART Backend Services authentication
- Flexible: Support for filtering, transformation, and multiple destinations
- Observable: Progress tracking and monitoring capabilities
Trade-offs
- Complexity: Multiple steps and components to orchestrate
- Latency: Bulk exports can take hours or days
- Resource Intensive: Significant storage and processing requirements
- Coordination: Managing long-running jobs across system restarts
References
- Bulk Data Access IG - Population-level data export specification
- SMART Backend Services - System-to-system authentication for bulk operations
- NDJSON Format - Newline-delimited JSON for streaming
Incremental Exports
Use _since parameter for incremental exports to reduce data volume and processing time. Track the last export timestamp for each export job.