Population Export Pipeline

Intent

Orchestrate large-scale data extraction using FHIR Bulk Data APIs with proper authentication and transformation, enabling research and analytics use cases.

Forces

  • Network Latency & Reliability: Healthcare networks often span institutions with varying connectivity quality.

Structure

The Population Export Pipeline pattern orchestrates FHIR Bulk Data export operations with authentication, transformation, and monitoring capabilities.

Population Export Pipeline Architecture

Key Components

  • ExportOrchestrator: Coordinates the overall export workflow
  • BulkDataClient: Interacts with FHIR Bulk Data API endpoints
  • BackendAuthenticator: Handles SMART Backend Services authentication
  • TransformationPipeline: Applies transformations to exported data
  • ProgressMonitor: Tracks export job status and progress
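The BackendAuthenticator's SMART Backend Services flow centers on a signed JWT client assertion exchanged for an access token via the client_credentials grant. A minimal sketch of the two payloads involved (function names are illustrative; actually signing the assertion, e.g. with RS384, requires a JWT library such as PyJWT and is omitted here):

```python
import time
import uuid

def build_client_assertion_claims(client_id: str, token_url: str,
                                  lifetime_seconds: int = 300) -> dict:
    """Claims for a SMART Backend Services client assertion JWT.

    Per the spec, iss and sub are both the client_id, aud is the token
    endpoint URL, and jti must be unique per request.
    """
    now = int(time.time())
    return {
        'iss': client_id,
        'sub': client_id,
        'aud': token_url,
        'exp': now + lifetime_seconds,
        'jti': str(uuid.uuid4()),
    }

def build_token_request(client_assertion: str,
                        scope: str = 'system/*.read') -> dict:
    """Form-encoded body for the client_credentials token request."""
    return {
        'grant_type': 'client_credentials',
        'scope': scope,
        'client_assertion_type':
            'urn:ietf:params:oauth:client-assertion-type:jwt-bearer',
        'client_assertion': client_assertion,
    }
```

The returned access token is what the BulkExportClient below passes as its Bearer token.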

Behavior

Bulk Export Workflow

The following sequence shows the complete bulk data export process:

Population Export Sequence

Export Steps

  1. Authenticate
  2. Initiate Export
  3. Monitor Progress
  4. Download Data
  5. Transform Data
  6. Load Destination
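The six steps can be sketched as a single coroutine, with each step injected as a callable so the control flow can be exercised without a live FHIR server (all names here are illustrative, not part of any pattern API):

```python
import asyncio
from typing import Awaitable, Callable, List

async def run_export_pipeline(
    initiate: Callable[[], Awaitable[str]],
    poll_until_done: Callable[[str], Awaitable[List[dict]]],
    download: Callable[[str], Awaitable[bytes]],
    transform: Callable[[bytes], bytes],
    load: Callable[[bytes], None],
) -> int:
    """Steps 1-6 as one coroutine: authenticate/kick off, poll, then
    download, transform, and load each output file. Returns file count."""
    status_url = await initiate()                     # 1-2. authenticate + initiate
    output_files = await poll_until_done(status_url)  # 3. monitor until complete
    count = 0
    for output in output_files:                       # 4-6. download, transform, load
        load(transform(await download(output['url'])))
        count += 1
    return count
```

In practice `initiate` and `poll_until_done` would wrap the BulkExportClient and ExportMonitor shown below.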

Implementation Considerations

Export Request

Client for initiating FHIR Bulk Data export operations, supporting system-, group-, and patient-level exports with filtering parameters.

Export Request
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum

class ExportType(Enum):
    SYSTEM = "system"      # $export on system level
    GROUP = "group"        # $export on Group resource
    PATIENT = "patient"    # $export on Patient resource

class OutputFormat(Enum):
    NDJSON = "application/fhir+ndjson"
    PARQUET = "application/x-parquet"

@dataclass
class ExportRequest:
    """Configuration for a FHIR Bulk Data Export request"""
    export_type: ExportType
    output_format: OutputFormat = OutputFormat.NDJSON
    since: Optional[datetime] = None
    resource_types: List[str] = field(default_factory=list)
    type_filters: Dict[str, str] = field(default_factory=dict)
    group_id: Optional[str] = None
    patient_id: Optional[str] = None

class BulkExportClient:
    """
    Client for initiating and managing FHIR Bulk Data Export operations.
    Implements the FHIR Bulk Data Access (Flat FHIR) specification.
    """

    def __init__(self, fhir_base_url: str, auth_token: str):
        self.base_url = fhir_base_url.rstrip('/')
        self.auth_token = auth_token
        self.session = None  # Would be aiohttp.ClientSession in real impl

    async def initiate_export(self, request: ExportRequest) -> str:
        """
        Initiate a bulk export operation.
        Returns the Content-Location URL for polling status.
        """
        # Build export URL based on type
        url = self._build_export_url(request)

        # Build query parameters
        params = self._build_export_params(request)

        # Make the kick-off request
        headers = {
            'Authorization': f'Bearer {self.auth_token}',
            'Accept': 'application/fhir+json',
            'Prefer': 'respond-async'
        }

        # In real implementation:
        # response = await self.session.get(url, params=params, headers=headers)
        # 
        # if response.status == 202:
        #     return response.headers['Content-Location']
        # else:
        #     raise ExportError(f"Export initiation failed: {response.status}")

        # Placeholder return
        return f"{self.base_url}/$export-poll-status/job-12345"

    def _build_export_url(self, request: ExportRequest) -> str:
        """Build the export endpoint URL based on export type."""
        if request.export_type == ExportType.SYSTEM:
            return f"{self.base_url}/$export"

        elif request.export_type == ExportType.GROUP:
            if not request.group_id:
                raise ValueError("group_id required for Group export")
            return f"{self.base_url}/Group/{request.group_id}/$export"

        elif request.export_type == ExportType.PATIENT:
            if request.patient_id:
                return f"{self.base_url}/Patient/{request.patient_id}/$export"
            else:
                return f"{self.base_url}/Patient/$export"

        raise ValueError(f"Unknown export type: {request.export_type}")

    def _build_export_params(self, request: ExportRequest) -> Dict[str, str]:
        """Build query parameters for export request."""
        params = {}

        # Output format
        params['_outputFormat'] = request.output_format.value

        # Since parameter for incremental exports; a FHIR instant requires a
        # timezone, so this assumes a naive UTC datetime (e.g. datetime.utcnow())
        if request.since:
            params['_since'] = request.since.strftime('%Y-%m-%dT%H:%M:%SZ')

        # Resource types to include
        if request.resource_types:
            params['_type'] = ','.join(request.resource_types)

        # Type filters (e.g., Patient?gender=female)
        if request.type_filters:
            type_filters = []
            for resource_type, filter_expr in request.type_filters.items():
                type_filters.append(f"{resource_type}?{filter_expr}")
            params['_typeFilter'] = ','.join(type_filters)

        return params

    async def create_system_export(self,
                                   resource_types: Optional[List[str]] = None,
                                   since: Optional[datetime] = None) -> str:
        """
        Convenience method for system-level export.
        """
        request = ExportRequest(
            export_type=ExportType.SYSTEM,
            resource_types=resource_types or [],
            since=since
        )
        return await self.initiate_export(request)

    async def create_group_export(self,
                                  group_id: str,
                                  resource_types: Optional[List[str]] = None,
                                  since: Optional[datetime] = None) -> str:
        """
        Convenience method for group-level export.
        Exports data for all patients in the specified Group.
        """
        request = ExportRequest(
            export_type=ExportType.GROUP,
            group_id=group_id,
            resource_types=resource_types or [],
            since=since
        )
        return await self.initiate_export(request)

    async def create_patient_export(self,
                                    patient_id: Optional[str] = None,
                                    resource_types: Optional[List[str]] = None,
                                    since: Optional[datetime] = None) -> str:
        """
        Convenience method for patient-level export.
        If patient_id is None, exports all patients.
        """
        request = ExportRequest(
            export_type=ExportType.PATIENT,
            patient_id=patient_id,
            resource_types=resource_types or [],
            since=since
        )
        return await self.initiate_export(request)


# Example usage
async def example_exports():
    client = BulkExportClient(
        fhir_base_url='https://fhir.example.org/r4',
        auth_token='eyJhbGciOiJSUzI1NiIs...'
    )

    # System-level export of all data
    status_url = await client.create_system_export()
    print(f"System export started: {status_url}")

    # Export specific resource types since last week
    from datetime import timedelta
    last_week = datetime.utcnow() - timedelta(days=7)

    status_url = await client.create_system_export(
        resource_types=['Patient', 'Observation', 'Condition'],
        since=last_week
    )
    print(f"Incremental export started: {status_url}")

    # Export for a specific cohort (Group)
    status_url = await client.create_group_export(
        group_id='diabetes-cohort-2024',
        resource_types=['Patient', 'Observation', 'MedicationRequest']
    )
    print(f"Cohort export started: {status_url}")

Progress Monitoring

Monitors long-running export jobs with polling, exponential backoff, progress callbacks, and timeout handling.

Progress Monitoring
from typing import Dict, List, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import asyncio
import time

class ExportStatus(Enum):
    PENDING = "pending"
    IN_PROGRESS = "in-progress"
    COMPLETED = "completed"
    ERROR = "error"
    CANCELLED = "cancelled"

@dataclass
class ExportProgress:
    """Progress information for a bulk export job"""
    status: ExportStatus
    progress_percent: float = 0.0
    resources_exported: int = 0
    estimated_completion: Optional[datetime] = None
    current_resource_type: Optional[str] = None
    output_files: List[Dict] = field(default_factory=list)
    error_message: Optional[str] = None
    transaction_time: Optional[datetime] = None

class ExportMonitor:
    """
    Monitors FHIR Bulk Data Export jobs with polling and callbacks.
    """

    def __init__(self, 
                 fhir_client,
                 poll_interval: float = 5.0,
                 max_poll_interval: float = 60.0,
                 backoff_factor: float = 1.5):
        self.fhir_client = fhir_client
        self.poll_interval = poll_interval
        self.max_poll_interval = max_poll_interval
        self.backoff_factor = backoff_factor
        self.active_jobs = {}

    async def monitor_export(self,
                            status_url: str,
                            on_progress: Optional[Callable[[ExportProgress], None]] = None,
                            on_complete: Optional[Callable[[ExportProgress], None]] = None,
                            on_error: Optional[Callable[[ExportProgress], None]] = None,
                            timeout: float = 3600) -> ExportProgress:
        """
        Monitor an export job until completion or timeout.
        """
        start_time = time.time()
        current_interval = self.poll_interval

        while True:
            # Check timeout
            elapsed = time.time() - start_time
            if elapsed > timeout:
                raise TimeoutError(f"Export timed out after {timeout} seconds")

            # Poll status
            progress = await self._poll_status(status_url)

            # Callback for progress updates
            if on_progress:
                on_progress(progress)

            # Check completion states
            if progress.status == ExportStatus.COMPLETED:
                if on_complete:
                    on_complete(progress)
                return progress

            elif progress.status == ExportStatus.ERROR:
                if on_error:
                    on_error(progress)
                raise ExportError(progress.error_message or "Export failed")

            elif progress.status == ExportStatus.CANCELLED:
                raise ExportCancelled("Export was cancelled")

            # Wait before next poll (with exponential backoff)
            await asyncio.sleep(current_interval)
            current_interval = min(
                current_interval * self.backoff_factor,
                self.max_poll_interval
            )

    async def _poll_status(self, status_url: str) -> ExportProgress:
        """
        Poll the export status endpoint.
        """
        headers = {
            'Accept': 'application/json'
        }

        # In real implementation:
        # response = await self.fhir_client.get(status_url, headers=headers)
        #
        # if response.status == 202:
        #     # Still in progress
        #     progress_header = response.headers.get('X-Progress', '0%')
        #     return ExportProgress(
        #         status=ExportStatus.IN_PROGRESS,
        #         progress_percent=self._parse_progress(progress_header)
        #     )
        # elif response.status == 200:
        #     # Completed
        #     result = await response.json()
        #     return self._parse_completion(result)
        # else:
        #     # Error
        #     return ExportProgress(
        #         status=ExportStatus.ERROR,
        #         error_message=f"Status check failed: {response.status}"
        #     )

        # Placeholder for demo
        return ExportProgress(
            status=ExportStatus.IN_PROGRESS,
            progress_percent=50.0,
            resources_exported=1000
        )

    def _parse_progress(self, progress_header: str) -> float:
        """Parse X-Progress header value."""
        try:
            return float(progress_header.rstrip('%'))
        except ValueError:
            return 0.0

    def _parse_completion(self, result: Dict) -> ExportProgress:
        """Parse completed export response."""
        output_files = []

        for output in result.get('output', []):
            output_files.append({
                'type': output.get('type'),
                'url': output.get('url'),
                'count': output.get('count')
            })

        return ExportProgress(
            status=ExportStatus.COMPLETED,
            progress_percent=100.0,
            output_files=output_files,
            transaction_time=datetime.fromisoformat(
                result.get('transactionTime', '').replace('Z', '+00:00')
            ) if result.get('transactionTime') else None
        )

    async def cancel_export(self, status_url: str) -> bool:
        """
        Request cancellation of an export job.
        """
        # In real implementation:
        # response = await self.fhir_client.delete(status_url)
        # return response.status == 202

        return True

    def create_progress_callback(self, 
                                callback: Callable[[float, str], None]
                                ) -> Callable[[ExportProgress], None]:
        """
        Create a simple progress callback wrapper.
        """
        def wrapper(progress: ExportProgress):
            message = f"Status: {progress.status.value}"
            if progress.current_resource_type:
                message += f" ({progress.current_resource_type})"
            if progress.resources_exported:
                message += f" - {progress.resources_exported} resources"

            callback(progress.progress_percent, message)

        return wrapper


class ExportError(Exception):
    """Error during bulk export"""
    pass

class ExportCancelled(Exception):
    """Export was cancelled"""
    pass


# Example usage with progress bar
async def run_monitored_export():
    from tqdm import tqdm

    # Assume we have fhir_client and status_url
    fhir_client = None
    status_url = "https://fhir.example.org/$export-poll/job-123"

    monitor = ExportMonitor(fhir_client)

    # Create progress bar
    pbar = tqdm(total=100, desc="Exporting", unit="%")
    last_progress = 0

    def update_progress(progress: ExportProgress):
        nonlocal last_progress
        delta = progress.progress_percent - last_progress
        pbar.update(delta)
        last_progress = progress.progress_percent
        pbar.set_postfix({
            'status': progress.status.value,
            'resources': progress.resources_exported
        })

    try:
        result = await monitor.monitor_export(
            status_url,
            on_progress=update_progress,
            timeout=7200  # 2 hour timeout
        )

        pbar.close()

        print(f"\nExport completed!")
        print(f"Output files: {len(result.output_files)}")
        for f in result.output_files:
            print(f"  - {f['type']}: {f['url']} ({f.get('count', '?')} resources)")

    except ExportError as e:
        pbar.close()
        print(f"\nExport failed: {e}")

    except TimeoutError:
        pbar.close()
        print("\nExport timed out")

NDJSON Processing

Processes NDJSON (Newline Delimited JSON) export files with streaming, filtering, transformation, and memory-efficient batch operations.

NDJSON Processing
from typing import Dict, List, Iterator, Any, Optional, Callable
from dataclasses import dataclass
import json
import gzip
from pathlib import Path
import asyncio
from concurrent.futures import ThreadPoolExecutor

@dataclass
class NDJSONFile:
    """Metadata about an NDJSON file from bulk export"""
    resource_type: str
    url: str
    count: Optional[int] = None

class NDJSONProcessor:
    """
    Processes NDJSON (Newline Delimited JSON) files from FHIR Bulk Export.
    Supports streaming, filtering, and transformation.
    """

    def __init__(self, max_workers: int = 4):
        self.max_workers = max_workers
        # Reserved for parallel download/processing in a full implementation
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    def stream_resources(self, file_path: str) -> Iterator[Dict]:
        """
        Stream resources from an NDJSON file.
        Memory efficient - processes one resource at a time.
        """
        opener = gzip.open if file_path.endswith('.gz') else open

        with opener(file_path, 'rt', encoding='utf-8') as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue

                try:
                    yield json.loads(line)
                except json.JSONDecodeError as e:
                    # Log error but continue processing
                    print(f"JSON parse error at line {line_num}: {e}")
                    continue

    def filter_resources(self, 
                        file_path: str,
                        predicate: Callable[[Dict], bool]) -> Iterator[Dict]:
        """
        Stream and filter resources based on predicate function.
        """
        for resource in self.stream_resources(file_path):
            if predicate(resource):
                yield resource

    def transform_resources(self,
                           file_path: str,
                           transformer: Callable[[Dict], Dict]) -> Iterator[Dict]:
        """
        Stream and transform resources.
        """
        for resource in self.stream_resources(file_path):
            yield transformer(resource)

    async def process_export_files(self,
                                   files: List[NDJSONFile],
                                   processor: Callable[[Dict], None],
                                   download_dir: str = './downloads') -> Dict:
        """
        Process multiple NDJSON files from a bulk export.
        Downloads and processes files in parallel.
        """
        stats = {
            'files_processed': 0,
            'resources_processed': 0,
            'errors': []
        }

        # Download files (in real impl, would use aiohttp)
        local_files = await self._download_files(files, download_dir)

        # Process each file
        for file_info, local_path in zip(files, local_files):
            try:
                count = 0
                for resource in self.stream_resources(local_path):
                    processor(resource)
                    count += 1

                stats['files_processed'] += 1
                stats['resources_processed'] += count

            except Exception as e:
                stats['errors'].append({
                    'file': file_info.url,
                    'error': str(e)
                })

        return stats

    async def _download_files(self, 
                             files: List[NDJSONFile],
                             download_dir: str) -> List[str]:
        """Download NDJSON files from URLs."""
        Path(download_dir).mkdir(parents=True, exist_ok=True)

        local_paths = []
        for file_info in files:
            # In real implementation, would download from URL
            # Here we just generate expected local path
            filename = f"{file_info.resource_type}.ndjson"
            local_path = str(Path(download_dir) / filename)
            local_paths.append(local_path)

        return local_paths

    def write_ndjson(self,
                    output_path: str,
                    resources: Iterator[Dict],
                    compress: bool = True):
        """
        Write resources to NDJSON file.
        """
        if compress and not output_path.endswith('.gz'):
            output_path += '.gz'

        opener = gzip.open if output_path.endswith('.gz') else open

        with opener(output_path, 'wt', encoding='utf-8') as f:
            for resource in resources:
                f.write(json.dumps(resource, separators=(',', ':')))
                f.write('\n')

    def merge_files(self,
                   input_files: List[str],
                   output_file: str,
                   transformer: Optional[Callable[[Dict], Dict]] = None):
        """
        Merge multiple NDJSON files into one.
        Optionally transform resources during merge.
        """
        def resource_generator():
            for file_path in input_files:
                for resource in self.stream_resources(file_path):
                    if transformer:
                        yield transformer(resource)
                    else:
                        yield resource

        self.write_ndjson(output_file, resource_generator())

    def split_by_type(self,
                     input_file: str,
                     output_dir: str) -> Dict[str, str]:
        """
        Split NDJSON file by resource type.
        Returns mapping of resource type to output file.
        """
        Path(output_dir).mkdir(parents=True, exist_ok=True)

        # Group resources by type
        type_files = {}
        type_writers = {}

        try:
            for resource in self.stream_resources(input_file):
                resource_type = resource.get('resourceType', 'Unknown')

                if resource_type not in type_writers:
                    output_path = str(Path(output_dir) / f"{resource_type}.ndjson")
                    type_files[resource_type] = output_path
                    type_writers[resource_type] = open(output_path, 'w', encoding='utf-8')

                type_writers[resource_type].write(
                    json.dumps(resource, separators=(',', ':'))
                )
                type_writers[resource_type].write('\n')

        finally:
            for writer in type_writers.values():
                writer.close()

        return type_files

    def count_resources(self, file_path: str) -> Dict[str, int]:
        """
        Count resources by type in NDJSON file.
        """
        counts = {}

        for resource in self.stream_resources(file_path):
            resource_type = resource.get('resourceType', 'Unknown')
            counts[resource_type] = counts.get(resource_type, 0) + 1

        return counts

    def validate_resources(self,
                          file_path: str,
                          validator: Callable[[Dict], List[str]]) -> List[Dict]:
        """
        Validate resources and return list of issues.
        """
        issues = []

        for line_num, resource in enumerate(self.stream_resources(file_path), 1):
            resource_issues = validator(resource)
            if resource_issues:
                issues.append({
                    'line': line_num,
                    'resource_type': resource.get('resourceType'),
                    'resource_id': resource.get('id'),
                    'issues': resource_issues
                })

        return issues


# Example usage
if __name__ == "__main__":
    processor = NDJSONProcessor()

    # Stream and filter patients
    for patient in processor.filter_resources(
        'patients.ndjson',
        lambda p: p.get('gender') == 'female'
    ):
        print(f"Patient: {patient.get('id')}")

    # Count resources
    counts = processor.count_resources('export.ndjson')
    print(f"Resource counts: {counts}")

Benefits

  • Scalable Extraction: Handle population-level data volumes
  • Standards-Based: Built on FHIR Bulk Data specification
  • Secure: Uses SMART Backend Services authentication
  • Flexible: Support for filtering, transformation, and multiple destinations
  • Observable: Progress tracking and monitoring capabilities

Trade-offs

  • Complexity: Multiple steps and components to orchestrate
  • Latency: Bulk exports can take hours or days
  • Resource Intensive: Significant storage and processing requirements
  • Coordination: Managing long-running jobs across system restarts

Incremental Exports

Use the _since parameter for incremental exports to reduce data volume and processing time. Track the last export timestamp (the transactionTime of the completed response) for each export job.