Skip to content

Asynchronous Invocation

Intent

Handle long-running FHIR operations through asynchronous job management and polling, enabling non-blocking execution of time-consuming processes.

Forces

  • Network Latency & Reliability: Healthcare networks often span institutions with varying connectivity quality.

Structure

The Asynchronous Invocation pattern implements job-based processing where long-running operations are submitted as jobs and clients poll for completion status.

Asynchronous Invocation Class Diagram

Key Components

AsyncInvoker

Main interface for submitting and monitoring asynchronous operations

Job

Represents a long-running operation with status tracking and progress information

JobQueue

Manages queuing and prioritization of pending jobs

JobExecutor

Executes jobs and updates progress information

JobStore

Persists job state and provides job lookup capabilities

Behavior

FHIR Bulk Export Example

The following diagram shows the asynchronous flow for bulk data export:

Async Invocation Flow

Implementation Considerations

Job Management

Core job management system with status tracking, progress updates, and lifecycle management for long-running asynchronous operations.

Job Management
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, Any, Optional, List
from datetime import datetime, timedelta
import asyncio
import json
import uuid

# Lifecycle states a Job moves through, from submission to one of the three
# terminal states (COMPLETED, FAILED, CANCELLED). The values are the
# wire-format strings exposed by the REST API.
JobStatus = Enum(
    "JobStatus",
    {
        "SUBMITTED": "submitted",
        "QUEUED": "queued",
        "IN_PROGRESS": "in_progress",
        "COMPLETED": "completed",
        "FAILED": "failed",
        "CANCELLED": "cancelled",
    },
)

@dataclass
class ProgressInfo:
    """Snapshot of a job's execution progress, returned to polling clients."""
    current_step: str  # human-readable name of the step currently running
    total_steps: int
    completed_steps: int
    percentage: float  # 0.0-100.0; set to 100.0 on completion
    estimated_remaining: Optional[timedelta] = None  # extrapolated from elapsed time vs. percentage
    details: Dict[str, Any] = field(default_factory=dict)  # free-form extras (e.g. retry metadata)

@dataclass
class Job:
    """A long-running asynchronous operation tracked through its lifecycle."""
    id: str  # UUID string assigned at submission
    operation_type: str  # e.g. "bulk_export", "validation", "batch_operation"
    parameters: Dict[str, Any]  # operation-specific inputs (schema depends on operation_type)
    submitter_id: str  # used for cancel authorization
    status: JobStatus
    submitted_at: datetime
    started_at: Optional[datetime] = None  # set when a worker picks the job up
    completed_at: Optional[datetime] = None  # also set on failure and cancellation
    progress: ProgressInfo = field(default_factory=lambda: ProgressInfo("", 1, 0, 0.0))
    result: Optional[Dict[str, Any]] = None  # populated only on COMPLETED
    error_details: Optional[str] = None  # populated only on FAILED
    estimated_completion: Optional[datetime] = None  # best-effort estimate, not a guarantee
    priority: int = 5  # 1=highest, 10=lowest

class AsyncInvoker:
    """Coordinates submission, execution, and monitoring of async jobs.

    Jobs are persisted via ``job_store``, scheduled via ``job_queue``, and
    executed by a pool of background worker tasks (one per executor slot).
    """

    # Rough per-operation durations used only to populate
    # Job.estimated_completion for client display. Unknown operation types
    # get no estimate (None).
    _TYPICAL_DURATIONS = {
        "bulk_export": timedelta(minutes=30),
        "validation": timedelta(minutes=5),
        "batch_operation": timedelta(minutes=10),
    }

    def __init__(self, job_store, job_queue, executor_pool):
        self.job_store = job_store          # persistence: save() / find_by_id()
        self.job_queue = job_queue          # scheduling: enqueue() / dequeue()
        self.executor_pool = executor_pool  # provides .size (worker count)
        self.running = False
        # Keep strong references to worker tasks: the event loop holds only
        # weak references, so otherwise the tasks could be garbage-collected
        # mid-run.
        self._worker_tasks: List[asyncio.Task] = []

    async def start(self):
        """Start the async job processing system."""
        self.running = True
        # Start one job processor task per executor slot.
        for _ in range(self.executor_pool.size):
            self._worker_tasks.append(asyncio.create_task(self._process_jobs()))

    async def stop(self):
        """Ask worker loops to exit after finishing their current job."""
        self.running = False

    async def submit_job(self, operation_type: str, parameters: Dict[str, Any],
                        submitter_id: str, priority: int = 5) -> str:
        """Submit a new asynchronous job and return its id.

        The job is persisted before it is queued, so a status poll issued
        immediately after submission always finds it.
        """
        job = Job(
            id=str(uuid.uuid4()),
            operation_type=operation_type,
            parameters=parameters,
            submitter_id=submitter_id,
            status=JobStatus.SUBMITTED,
            # NOTE(review): naive UTC timestamps throughout; datetime.utcnow()
            # is deprecated in 3.12+ — consider datetime.now(timezone.utc)
            # across the module in one pass.
            submitted_at=datetime.utcnow(),
            priority=priority
        )

        # Estimate completion time based on operation type.
        job.estimated_completion = self._estimate_completion(job)

        # Persist first, then queue for processing.
        await self.job_store.save(job)
        await self.job_queue.enqueue(job)

        return job.id

    def _estimate_completion(self, job: Job) -> Optional[datetime]:
        """Best-effort completion estimate from the typical-duration table.

        Fix: the original called this method without ever defining it, so
        every submit_job() raised AttributeError.
        """
        duration = self._TYPICAL_DURATIONS.get(job.operation_type)
        return job.submitted_at + duration if duration else None

    async def get_job_status(self, job_id: str) -> Optional[Job]:
        """Get current job status and progress (None for an unknown id)."""
        return await self.job_store.find_by_id(job_id)

    async def cancel_job(self, job_id: str, requester_id: str) -> bool:
        """Cancel a pending or running job.

        Returns True if the job transitioned to CANCELLED, False if it does
        not exist or is already in a terminal state.

        Raises:
            UnauthorizedError: if the requester did not submit the job.
                NOTE(review): UnauthorizedError is assumed to be defined
                elsewhere in the project — confirm.
        """
        job = await self.job_store.find_by_id(job_id)
        if not job:
            return False

        # Only the original submitter may cancel.
        if job.submitter_id != requester_id:
            raise UnauthorizedError("Cannot cancel job submitted by another user")

        # Only non-terminal jobs can be cancelled.
        if job.status in [JobStatus.SUBMITTED, JobStatus.QUEUED, JobStatus.IN_PROGRESS]:
            job.status = JobStatus.CANCELLED
            job.completed_at = datetime.utcnow()
            await self.job_store.save(job)
            return True

        return False

    async def _process_jobs(self):
        """Background job processing loop (one per worker task)."""
        while self.running:
            # Fix: initialize before the try so the error handler below is
            # safe even if dequeue() itself raises (previously `job` could be
            # unbound, turning any queue error into an UnboundLocalError).
            job = None
            try:
                # Get next job from queue (None on timeout).
                job = await self.job_queue.dequeue()
                if not job:
                    await asyncio.sleep(1)
                    continue

                # Mark as running before execution so pollers see progress.
                job.status = JobStatus.IN_PROGRESS
                job.started_at = datetime.utcnow()
                await self.job_store.save(job)

                await self._execute_job(job)

            except Exception as e:
                if job:
                    await self._handle_job_error(job, e)

    async def _handle_job_error(self, job: Job, error: Exception):
        """Record an unexpected processing failure against the job.

        Fix: referenced by _process_jobs but never defined in the original.
        """
        job.status = JobStatus.FAILED
        job.error_details = str(error)
        job.completed_at = datetime.utcnow()
        await self.job_store.save(job)

    async def _execute_job(self, job: Job):
        """Dispatch on operation type, then persist the terminal state."""
        try:
            if job.operation_type == "bulk_export":
                result = await self._execute_bulk_export(job)
            elif job.operation_type == "validation":
                # NOTE(review): _execute_validation / _execute_batch_operation
                # are not defined in this class — presumably supplied by a
                # subclass or mixin; confirm. An AttributeError here is caught
                # below and marks the job FAILED.
                result = await self._execute_validation(job)
            elif job.operation_type == "batch_operation":
                result = await self._execute_batch_operation(job)
            else:
                raise ValueError(f"Unknown operation type: {job.operation_type}")

            # Job completed successfully.
            job.status = JobStatus.COMPLETED
            job.result = result
            job.completed_at = datetime.utcnow()
            job.progress.percentage = 100.0
            job.progress.current_step = "Completed"

        except Exception as e:
            # Any failure — including unknown types — lands here.
            job.status = JobStatus.FAILED
            job.error_details = str(e)
            job.completed_at = datetime.utcnow()

        finally:
            # Persist the outcome regardless of success or failure.
            await self.job_store.save(job)

    async def _execute_bulk_export(self, job: Job) -> Dict[str, Any]:
        """Execute a FHIR bulk export operation.

        NOTE(review): the helpers called here (_get_system_token,
        _start_fhir_export, _monitor_fhir_export, _process_export_files) are
        not defined in this class — assumed provided elsewhere; confirm.
        Requires job.parameters['fhir_server'] and ['export_type'].
        """
        await self._update_progress(job, "Authenticating", 0, 10.0)
        auth_token = await self._get_system_token(job.parameters['fhir_server'])

        await self._update_progress(job, "Starting export", 1, 20.0)
        export_url = await self._start_fhir_export(
            job.parameters['fhir_server'],
            job.parameters['export_type'],
            job.parameters.get('resource_types', []),
            auth_token
        )

        await self._update_progress(job, "Monitoring export", 2, 30.0)
        # Poll the FHIR server until the export manifest is available.
        manifest = await self._monitor_fhir_export(job, export_url, auth_token)

        await self._update_progress(job, "Processing data", 8, 90.0)
        processed_urls = await self._process_export_files(job, manifest, auth_token)

        return {
            "export_manifest": manifest,
            "processed_files": processed_urls,
            "completion_time": datetime.utcnow().isoformat()
        }

    async def _update_progress(self, job: Job, step: str, completed: int, percentage: float):
        """Update and persist job progress information.

        Also extrapolates remaining time linearly from elapsed time and the
        reported percentage.
        """
        job.progress.current_step = step
        job.progress.completed_steps = completed
        job.progress.percentage = percentage

        if job.started_at and percentage > 0:
            elapsed = datetime.utcnow() - job.started_at
            total_estimated = elapsed * (100.0 / percentage)
            job.progress.estimated_remaining = total_estimated - elapsed

        await self.job_store.save(job)

class JobQueue:
    """Priority FIFO of pending jobs.

    Ordering key is (priority, submission timestamp): lower priority number
    runs first, ties broken by earliest submission.
    """

    def __init__(self):
        self.pending_jobs = asyncio.PriorityQueue()
        # Fix: the original created job_lookup but never populated it, and
        # _get_job_by_id was an unimplemented stub, so dequeue() always
        # returned None and no job could ever be delivered to a worker.
        # Maps job id -> Job; only the (score, id) pair travels through the
        # priority queue itself.
        self.job_lookup: Dict[str, Job] = {}

    async def enqueue(self, job: Job):
        """Add job to queue with priority ordering."""
        # Mark as QUEUED and register the lookup entry *before* publishing,
        # so a racing dequeue never sees a half-registered job.
        job.status = JobStatus.QUEUED
        self.job_lookup[job.id] = job

        # Priority first, then submission time, for stable FIFO within a
        # priority level.
        priority_score = (job.priority, job.submitted_at.timestamp())
        await self.pending_jobs.put((priority_score, job.id))

    async def dequeue(self) -> Optional[Job]:
        """Get the next job, or None if the queue is idle for 1 second."""
        try:
            _score, job_id = await asyncio.wait_for(
                self.pending_jobs.get(), timeout=1.0
            )
        except asyncio.TimeoutError:
            return None
        return await self._get_job_by_id(job_id)

    async def _get_job_by_id(self, job_id: str) -> Optional[Job]:
        """Resolve a dequeued id to its Job, removing the lookup entry."""
        return self.job_lookup.pop(job_id, None)

class JobStore:
    """In-memory job persistence, keyed by job id.

    A stand-in for a durable store; state is lost on process restart.
    """

    def __init__(self):
        self.jobs: Dict[str, Job] = {}

    def _matching(self, predicate) -> List[Job]:
        """All stored jobs satisfying *predicate* (shared by the finders)."""
        return [candidate for candidate in self.jobs.values() if predicate(candidate)]

    async def save(self, job: Job):
        """Persist (insert or overwrite) the job's current state."""
        self.jobs[job.id] = job

    async def find_by_id(self, job_id: str) -> Optional[Job]:
        """Return the job with the given id, or None if unknown."""
        return self.jobs.get(job_id)

    async def find_by_status(self, status: JobStatus) -> List[Job]:
        """Return every job currently in the given status."""
        return self._matching(lambda j: j.status == status)

    async def find_by_submitter(self, submitter_id: str) -> List[Job]:
        """Return every job submitted by the given user."""
        return self._matching(lambda j: j.submitter_id == submitter_id)

REST API Implementation

RESTful endpoints for job submission, status polling, and result retrieval following FHIR async patterns with proper HTTP status codes.

REST API Implementation
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import Optional

app = FastAPI()

class JobSubmission(BaseModel):
    """Request body for POST /jobs."""
    operation_type: str  # must be a type AsyncInvoker knows how to execute
    parameters: Dict[str, Any]  # operation-specific inputs, passed through verbatim
    priority: Optional[int] = 5  # 1=highest, 10=lowest

class JobResponse(BaseModel):
    """Response body for a successful job submission (202 Accepted)."""
    job_id: str
    status: str  # JobStatus value string, e.g. "queued"
    submitted_at: str  # ISO-8601 timestamp
    estimated_completion: Optional[str] = None  # ISO-8601, absent if no estimate

@app.post("/jobs", status_code=202)
async def submit_job(job_request: JobSubmission, submitter_id: str):
    """Submit an asynchronous job.

    Returns 202 Accepted with a Content-Location header pointing at the
    status-polling endpoint and a Retry-After hint, per the FHIR async
    request pattern.
    """
    # Local import keeps this edit self-contained; fastapi is already a
    # dependency of this module.
    from fastapi.responses import JSONResponse

    job_id = await async_invoker.submit_job(
        operation_type=job_request.operation_type,
        parameters=job_request.parameters,
        submitter_id=submitter_id,
        priority=job_request.priority
    )

    job = await async_invoker.get_job_status(job_id)

    body = JobResponse(
        job_id=job.id,
        status=job.status.value,
        submitted_at=job.submitted_at.isoformat(),
        estimated_completion=job.estimated_completion.isoformat() if job.estimated_completion else None
    )

    # Fix: the original returned a (JobResponse, headers) tuple, which FastAPI
    # serializes as a two-element JSON array and the headers were never sent.
    # A Response object is required to attach custom headers.
    # NOTE(review): .dict() is pydantic v1; use .model_dump() on pydantic v2.
    return JSONResponse(
        status_code=202,
        content=body.dict(),
        headers={
            "Content-Location": f"/jobs/{job_id}",
            "Retry-After": "30"
        },
    )

@app.get("/jobs/{job_id}")
async def get_job_status(job_id: str):
    """Get job status and progress.

    COMPLETED -> 200 with the result payload; FAILED -> 200 with error
    details; otherwise 202 Accepted with progress info, per the FHIR async
    polling pattern. 404 for an unknown job id.
    """
    # Local import keeps this edit self-contained; fastapi is already a
    # dependency of this module.
    from fastapi.responses import JSONResponse

    job = await async_invoker.get_job_status(job_id)

    if not job:
        raise HTTPException(status_code=404, detail="Job not found")

    if job.status == JobStatus.COMPLETED:
        return {
            "job_id": job.id,
            "status": job.status.value,
            "result": job.result,
            "completed_at": job.completed_at.isoformat()
        }
    elif job.status == JobStatus.FAILED:
        return {
            "job_id": job.id,
            "status": job.status.value,
            "error": job.error_details,
            "failed_at": job.completed_at.isoformat()
        }
    else:
        # Still in progress: report step/percentage so clients can back off.
        response = {
            "job_id": job.id,
            "status": job.status.value,
            "progress": {
                "current_step": job.progress.current_step,
                "percentage": job.progress.percentage,
                "estimated_remaining": job.progress.estimated_remaining.total_seconds() if job.progress.estimated_remaining else None
            }
        }

        # Fix: the original `return response, 202` does not set the status
        # code in FastAPI — it serializes the tuple as a JSON array with 200.
        # An explicit JSONResponse is required for the 202.
        return JSONResponse(status_code=202, content=response)

@app.delete("/jobs/{job_id}")
async def cancel_job(job_id: str, requester_id: str):
    """Cancel running job"""
    # Delegate to the invoker; False means unknown id or terminal state.
    cancelled = await async_invoker.cancel_job(job_id, requester_id)

    if not cancelled:
        raise HTTPException(status_code=400, detail="Cannot cancel job")
    return {"message": "Job cancelled successfully"}

Error Handling and Retry Logic

Implements exponential backoff, retry policies, and error categorization for resilient handling of transient and permanent failures.

Error Handling and Retry Logic
class RetryableJobExecutor:
    """Runs jobs with bounded retries and exponential backoff.

    Only RetryableError triggers a retry; NonRetryableError fails the job
    immediately. NOTE(review): RetryableError, NonRetryableError,
    JobExecutionError, and _execute_job are assumed to be defined elsewhere
    in the project — confirm.
    """

    def __init__(self, max_retries=3, base_delay=60, job_store=None):
        self.max_retries = max_retries  # retry attempts after the first try
        self.base_delay = base_delay    # seconds; doubled on each attempt
        # Fix: the original referenced self.job_store in
        # _update_job_retry_info without ever setting it, so every retry
        # raised AttributeError. Optional (default None) for back-compat.
        self.job_store = job_store

    async def execute_with_retry(self, job: Job):
        """Execute *job*, retrying transient failures with exponential backoff.

        Raises:
            JobExecutionError: when retries are exhausted or the failure is
                non-retryable.
        """
        for attempt in range(self.max_retries + 1):
            try:
                return await self._execute_job(job)

            except RetryableError as e:
                if attempt >= self.max_retries:
                    # Max retries exceeded.
                    raise JobExecutionError(f"Job failed after {self.max_retries} retries: {e}")
                # Exponential backoff: base_delay, 2x, 4x, ...
                delay = self.base_delay * (2 ** attempt)
                await self._update_job_retry_info(job, attempt + 1, delay)
                await asyncio.sleep(delay)

            except NonRetryableError as e:
                # Don't retry for certain types of errors.
                raise JobExecutionError(f"Job failed with non-retryable error: {e}")

    async def _update_job_retry_info(self, job: Job, attempt: int, delay: int):
        """Annotate the job's progress with retry metadata and persist it."""
        job.progress.details['retry_attempt'] = attempt
        job.progress.details['next_retry_in'] = delay
        job.progress.current_step = f"Retrying (attempt {attempt})"

        # Persist only when a store was supplied (fix: see __init__).
        if self.job_store is not None:
            await self.job_store.save(job)

Benefits

  • Non-blocking Operations: Clients don't wait for long-running processes
  • Resource Management: Server resources allocated efficiently for long tasks
  • Progress Visibility: Real-time status and progress information
  • Fault Tolerance: Retry logic and error handling for unreliable operations
  • Scalability: Job queue enables horizontal scaling of processing capacity

Trade-offs

  • Complexity: More complex than synchronous request/response
  • Latency: Additional overhead for job management and polling
  • State Management: Persistent job state required across system restarts
  • Client Complexity: Clients must implement polling logic
  • Resource Usage: Job metadata and intermediate state storage requirements

References


Polling Optimization

Implement intelligent polling intervals that start frequent and back off exponentially. Consider WebSocket or Server-Sent Events for real-time progress updates.

Job Cleanup

Implement job retention policies to prevent unlimited growth of completed job records. Archive or delete old jobs based on age and importance.