Asynchronous Invocation
Intent
Handle long-running FHIR operations through asynchronous job management and polling, enabling non-blocking execution of time-consuming processes.
Forces
- Network Latency & Reliability: Healthcare networks often span institutions with varying connectivity quality.
Structure
The Asynchronous Invocation pattern implements job-based processing where long-running operations are submitted as jobs and clients poll for completion status.
Key Components
AsyncInvoker
Main interface for submitting and monitoring asynchronous operations
Job
Represents a long-running operation with status tracking and progress information
JobQueue
Manages queuing and prioritization of pending jobs
JobExecutor
Executes jobs and updates progress information
JobStore
Persists job state and provides job lookup capabilities
Behavior
FHIR Bulk Export Example
The following diagram shows the asynchronous flow for bulk data export:
Implementation Considerations
Job Management
Core job management system with status tracking, progress updates, and lifecycle management for long-running asynchronous operations.
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, Any, Optional, List
from datetime import datetime, timedelta
import asyncio
import json
import uuid
class JobStatus(Enum):
    """Lifecycle states of an asynchronous job.

    Typical flow: SUBMITTED -> QUEUED -> IN_PROGRESS, ending in one of the
    terminal states COMPLETED, FAILED, or CANCELLED.
    """
    SUBMITTED = "submitted"      # accepted by the invoker, not yet queued
    QUEUED = "queued"            # waiting in the priority queue
    IN_PROGRESS = "in_progress"  # currently being executed by a worker
    COMPLETED = "completed"      # finished successfully (terminal)
    FAILED = "failed"            # finished with an error (terminal)
    CANCELLED = "cancelled"      # cancelled by the submitter (terminal)
@dataclass
class ProgressInfo:
    """Point-in-time progress snapshot for a running job."""
    current_step: str    # human-readable label of the step in progress
    total_steps: int     # total number of steps for this operation
    completed_steps: int # steps finished so far
    percentage: float    # overall completion, 0.0-100.0
    estimated_remaining: Optional[timedelta] = None  # None until an estimate exists
    details: Dict[str, Any] = field(default_factory=dict)  # extra step metadata (e.g. retry info)
@dataclass
class Job:
    """A long-running asynchronous operation and its full lifecycle state."""
    id: str                     # unique job identifier (UUID string)
    operation_type: str         # e.g. "bulk_export", "validation", "batch_operation"
    parameters: Dict[str, Any]  # operation-specific input parameters
    submitter_id: str           # submitting user; used for cancel authorization
    status: JobStatus
    submitted_at: datetime
    started_at: Optional[datetime] = None    # set when a worker picks the job up
    completed_at: Optional[datetime] = None  # set on any terminal state
    progress: ProgressInfo = field(default_factory=lambda: ProgressInfo("", 1, 0, 0.0))
    result: Optional[Dict[str, Any]] = None  # populated when status is COMPLETED
    error_details: Optional[str] = None      # populated when status is FAILED
    estimated_completion: Optional[datetime] = None  # best-effort ETA shown to clients
    priority: int = 5 # 1=highest, 10=lowest
class AsyncInvoker:
    """Coordinates submission, tracking, and execution of long-running jobs.

    Clients call submit_job() and immediately receive a job id, then poll
    get_job_status() until the job reaches a terminal state. start() spawns
    one background processor task per executor-pool slot.
    """

    # Rough per-operation duration guesses, used only to populate
    # Job.estimated_completion for client display.
    _TYPICAL_DURATIONS = {
        "bulk_export": timedelta(minutes=30),
        "validation": timedelta(minutes=5),
        "batch_operation": timedelta(minutes=15),
    }

    def __init__(self, job_store, job_queue, executor_pool):
        self.job_store = job_store          # persistence for Job records
        self.job_queue = job_queue          # priority queue of pending jobs
        self.executor_pool = executor_pool  # provides .size = worker-task count
        self.running = False

    async def start(self):
        """Start the async job processing system"""
        self.running = True
        # One processor task per executor slot; tasks exit when running clears.
        for _ in range(self.executor_pool.size):
            asyncio.create_task(self._process_jobs())

    async def submit_job(self, operation_type: str, parameters: Dict[str, Any],
                         submitter_id: str, priority: int = 5) -> str:
        """Submit new asynchronous job and return its id without waiting."""
        job = Job(
            id=str(uuid.uuid4()),
            operation_type=operation_type,
            parameters=parameters,
            submitter_id=submitter_id,
            status=JobStatus.SUBMITTED,
            submitted_at=datetime.utcnow(),
            priority=priority
        )
        # Estimate completion time based on operation type
        job.estimated_completion = self._estimate_completion(job)
        # Persist before queueing so pollers can find the job immediately.
        await self.job_store.save(job)
        await self.job_queue.enqueue(job)
        return job.id

    def _estimate_completion(self, job: Job) -> Optional[datetime]:
        """Estimate finish time from typical per-type durations.

        Returns None for unknown operation types. (BUG FIX: this method was
        called from submit_job but never defined, raising AttributeError.)
        """
        typical = self._TYPICAL_DURATIONS.get(job.operation_type)
        return datetime.utcnow() + typical if typical else None

    async def get_job_status(self, job_id: str) -> Optional[Job]:
        """Get current job status and progress"""
        return await self.job_store.find_by_id(job_id)

    async def cancel_job(self, job_id: str, requester_id: str) -> bool:
        """Cancel pending or running job.

        Returns True if cancelled, False if the job does not exist or is
        already terminal.

        Raises:
            UnauthorizedError: if the requester did not submit the job.
                NOTE(review): UnauthorizedError is not defined in this
                listing — confirm it exists elsewhere in the module.
        """
        job = await self.job_store.find_by_id(job_id)
        if not job:
            return False
        # Check authorization
        if job.submitter_id != requester_id:
            raise UnauthorizedError("Cannot cancel job submitted by another user")
        # Only non-terminal jobs can be cancelled.
        if job.status in [JobStatus.SUBMITTED, JobStatus.QUEUED, JobStatus.IN_PROGRESS]:
            job.status = JobStatus.CANCELLED
            job.completed_at = datetime.utcnow()
            await self.job_store.save(job)
            return True
        return False

    async def _process_jobs(self):
        """Background job processing loop (one per executor slot)."""
        while self.running:
            # BUG FIX: bind job before the try so the except clause cannot
            # hit UnboundLocalError when dequeue() itself raises.
            job = None
            try:
                job = await self.job_queue.dequeue()
                if not job:
                    await asyncio.sleep(1)
                    continue
                # BUG FIX: skip jobs cancelled while waiting in the queue;
                # executing them would overwrite the CANCELLED status.
                if job.status == JobStatus.CANCELLED:
                    continue
                job.status = JobStatus.IN_PROGRESS
                job.started_at = datetime.utcnow()
                await self.job_store.save(job)
                await self._execute_job(job)
            except Exception as e:
                if job:
                    await self._handle_job_error(job, e)

    async def _handle_job_error(self, job: Job, error: Exception):
        """Mark a job FAILED after an unexpected processing error.

        (BUG FIX: previously called from _process_jobs but never defined.)
        """
        job.status = JobStatus.FAILED
        job.error_details = str(error)
        job.completed_at = datetime.utcnow()
        await self.job_store.save(job)

    async def _execute_job(self, job: Job):
        """Dispatch to the handler for the job's operation type and record the
        terminal state; the final Job is always persisted."""
        try:
            if job.operation_type == "bulk_export":
                result = await self._execute_bulk_export(job)
            elif job.operation_type == "validation":
                result = await self._execute_validation(job)
            elif job.operation_type == "batch_operation":
                result = await self._execute_batch_operation(job)
            else:
                raise ValueError(f"Unknown operation type: {job.operation_type}")
            # Job completed successfully
            job.status = JobStatus.COMPLETED
            job.result = result
            job.completed_at = datetime.utcnow()
            job.progress.percentage = 100.0
            job.progress.current_step = "Completed"
        except Exception as e:
            # Job failed
            job.status = JobStatus.FAILED
            job.error_details = str(e)
            job.completed_at = datetime.utcnow()
        finally:
            await self.job_store.save(job)

    async def _execute_bulk_export(self, job: Job) -> Dict[str, Any]:
        """Execute FHIR bulk export operation.

        NOTE(review): _get_system_token, _start_fhir_export,
        _monitor_fhir_export, _process_export_files (and the validation /
        batch handlers) are not defined in this listing — a concrete
        deployment must supply them.
        """
        await self._update_progress(job, "Authenticating", 0, 10.0)
        auth_token = await self._get_system_token(job.parameters['fhir_server'])
        await self._update_progress(job, "Starting export", 1, 20.0)
        export_url = await self._start_fhir_export(
            job.parameters['fhir_server'],
            job.parameters['export_type'],
            job.parameters.get('resource_types', []),
            auth_token
        )
        await self._update_progress(job, "Monitoring export", 2, 30.0)
        manifest = await self._monitor_fhir_export(job, export_url, auth_token)
        await self._update_progress(job, "Processing data", 8, 90.0)
        processed_urls = await self._process_export_files(job, manifest, auth_token)
        return {
            "export_manifest": manifest,
            "processed_files": processed_urls,
            "completion_time": datetime.utcnow().isoformat()
        }

    async def _update_progress(self, job: Job, step: str, completed: int, percentage: float):
        """Record a progress step plus a simple linear time-remaining estimate."""
        job.progress.current_step = step
        job.progress.completed_steps = completed
        job.progress.percentage = percentage
        # Linear extrapolation: total ≈ elapsed * (100 / percentage).
        if job.started_at and percentage > 0:
            elapsed = datetime.utcnow() - job.started_at
            total_estimated = elapsed * (100.0 / percentage)
            job.progress.estimated_remaining = total_estimated - elapsed
        await self.job_store.save(job)
class JobQueue:
    """Priority queue of pending jobs.

    Ordering is (priority, submission time): lower priority value first,
    FIFO within the same priority.
    """

    def __init__(self):
        self.pending_jobs = asyncio.PriorityQueue()
        # id -> Job so dequeue can hand back the full Job object.
        # (BUG FIX: this lookup was declared but never populated, and
        # _get_job_by_id was a stub, so dequeue always returned None.)
        self.job_lookup: Dict[str, Job] = {}

    async def enqueue(self, job: Job):
        """Add job to queue with priority ordering"""
        # BUG FIX: mark QUEUED and register the job *before* publishing it,
        # so a concurrent dequeuer never observes stale SUBMITTED state or a
        # missing lookup entry.
        job.status = JobStatus.QUEUED
        self.job_lookup[job.id] = job
        # Use priority and submission time for ordering
        priority_score = (job.priority, job.submitted_at.timestamp())
        await self.pending_jobs.put((priority_score, job.id))

    async def dequeue(self) -> Optional[Job]:
        """Get next job from queue, or None after a 1-second timeout."""
        try:
            priority_score, job_id = await asyncio.wait_for(
                self.pending_jobs.get(), timeout=1.0
            )
        except asyncio.TimeoutError:
            return None
        return await self._get_job_by_id(job_id)

    async def _get_job_by_id(self, job_id: str) -> Optional[Job]:
        # Remove from the lookup: once dequeued, the job store owns the record.
        return self.job_lookup.pop(job_id, None)
class JobStore:
    """In-memory job repository keyed by job id.

    Suitable for demos and tests; a production deployment needs durable
    storage so job state survives restarts.
    """

    def __init__(self):
        self.jobs: Dict[str, Job] = {}

    async def save(self, job: Job):
        """Persist job state"""
        self.jobs[job.id] = job

    async def find_by_id(self, job_id: str) -> Optional[Job]:
        """Find job by ID"""
        return self.jobs.get(job_id)

    async def find_by_status(self, status: JobStatus) -> List[Job]:
        """Find jobs with specific status"""
        matches = []
        for candidate in self.jobs.values():
            if candidate.status == status:
                matches.append(candidate)
        return matches

    async def find_by_submitter(self, submitter_id: str) -> List[Job]:
        """Find jobs submitted by specific user"""
        matches = []
        for candidate in self.jobs.values():
            if candidate.submitter_id == submitter_id:
                matches.append(candidate)
        return matches
REST API Implementation
RESTful endpoints for job submission, status polling, and result retrieval following FHIR async patterns with proper HTTP status codes.
from typing import Optional

from fastapi import FastAPI, HTTPException, BackgroundTasks, Response
from fastapi.responses import JSONResponse
from pydantic import BaseModel
app = FastAPI()
class JobSubmission(BaseModel):
    """Request body for POST /jobs."""
    operation_type: str         # e.g. "bulk_export"; must match a registered handler
    parameters: Dict[str, Any]  # operation-specific inputs, passed through to the job
    priority: Optional[int] = 5 # 1=highest, 10=lowest
class JobResponse(BaseModel):
    """Response body returned when a job is accepted for async processing."""
    job_id: str        # identifier for subsequent status polling
    status: str        # JobStatus value, e.g. "submitted"
    submitted_at: str  # ISO-8601 timestamp
    estimated_completion: Optional[str] = None  # ISO-8601 ETA, if estimable
@app.post("/jobs", status_code=202)
async def submit_job(job_request: JobSubmission, submitter_id: str,
                     response: Response) -> JobResponse:
    """Submit asynchronous job.

    Returns 202 Accepted with a Content-Location header pointing at the
    status endpoint and a Retry-After polling hint, per the FHIR async
    request pattern.
    """
    job_id = await async_invoker.submit_job(
        operation_type=job_request.operation_type,
        parameters=job_request.parameters,
        submitter_id=submitter_id,
        priority=job_request.priority
    )
    job = await async_invoker.get_job_status(job_id)
    # BUG FIX: FastAPI does not support returning a (body, headers) tuple —
    # the old code serialized the headers dict into the JSON body. Headers
    # must be set on the injected Response object instead.
    response.headers["Content-Location"] = f"/jobs/{job_id}"
    response.headers["Retry-After"] = "30"
    return JobResponse(
        job_id=job.id,
        status=job.status.value,
        submitted_at=job.submitted_at.isoformat(),
        estimated_completion=job.estimated_completion.isoformat() if job.estimated_completion else None
    )
@app.get("/jobs/{job_id}")
async def get_job_status(job_id: str):
    """Get job status and progress.

    Returns 200 with the result (or error) for terminal jobs, 202 with
    progress information while the job is still running, 404 if unknown.
    """
    job = await async_invoker.get_job_status(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")
    if job.status == JobStatus.COMPLETED:
        return {
            "job_id": job.id,
            "status": job.status.value,
            "result": job.result,
            "completed_at": job.completed_at.isoformat()
        }
    if job.status == JobStatus.FAILED:
        return {
            "job_id": job.id,
            "status": job.status.value,
            "error": job.error_details,
            "failed_at": job.completed_at.isoformat()
        }
    # Still in progress. BUG FIX: `return response, 202` does not set the
    # HTTP status in FastAPI — it returns a two-element JSON array with
    # status 200. JSONResponse actually sends 202 Accepted.
    return JSONResponse(
        status_code=202,
        content={
            "job_id": job.id,
            "status": job.status.value,
            "progress": {
                "current_step": job.progress.current_step,
                "percentage": job.progress.percentage,
                "estimated_remaining": job.progress.estimated_remaining.total_seconds() if job.progress.estimated_remaining else None
            }
        }
    )
@app.delete("/jobs/{job_id}")
async def cancel_job(job_id: str, requester_id: str):
    """Cancel running job"""
    # Guard clause: reject first, then report success.
    cancelled = await async_invoker.cancel_job(job_id, requester_id)
    if not cancelled:
        raise HTTPException(status_code=400, detail="Cannot cancel job")
    return {"message": "Job cancelled successfully"}
Error Handling and Retry Logic
Implements exponential backoff, retry policies, and error categorization for resilient handling of transient and permanent failures.
class RetryableJobExecutor:
    """Executes jobs with exponential-backoff retries for transient failures.

    Retries only RetryableError; NonRetryableError fails immediately.
    NOTE(review): RetryableError, NonRetryableError, JobExecutionError and
    _execute_job are not defined in this listing — confirm they are provided
    elsewhere in the module.
    """

    def __init__(self, max_retries=3, base_delay=60, job_store=None):
        self.max_retries = max_retries
        self.base_delay = base_delay  # seconds before the first retry
        # BUG FIX: _update_job_retry_info used self.job_store, but __init__
        # never set it (AttributeError on the first retry). Accept it as an
        # optional, backward-compatible parameter and guard the save below.
        self.job_store = job_store

    async def execute_with_retry(self, job: Job):
        """Execute job with retry logic.

        Raises JobExecutionError once retries are exhausted or on a
        non-retryable failure.
        """
        for attempt in range(self.max_retries + 1):
            try:
                return await self._execute_job(job)
            except RetryableError as e:
                if attempt < self.max_retries:
                    # Exponential backoff: base_delay * 2^attempt seconds.
                    delay = self.base_delay * (2 ** attempt)
                    await self._update_job_retry_info(job, attempt + 1, delay)
                    await asyncio.sleep(delay)
                    continue
                else:
                    # Max retries exceeded
                    raise JobExecutionError(f"Job failed after {self.max_retries} retries: {e}")
            except NonRetryableError as e:
                # Don't retry for certain types of errors
                raise JobExecutionError(f"Job failed with non-retryable error: {e}")

    async def _update_job_retry_info(self, job: Job, attempt: int, delay: int):
        """Record retry metadata on the job's progress details."""
        job.progress.details['retry_attempt'] = attempt
        job.progress.details['next_retry_in'] = delay
        job.progress.current_step = f"Retrying (attempt {attempt})"
        # Persist only when a store was supplied; retry bookkeeping is
        # best-effort and must not crash the retry loop.
        if self.job_store is not None:
            await self.job_store.save(job)
Related Patterns
- Population Export Pipeline: Population Export relies on Asynchronous Invocation for long-running bulk exports
- Broker: Broker routes async requests and tracks job status across endpoints
- Audit & Provenance Chain: Async job lifecycle events are captured in audit logs
Benefits
- Non-blocking Operations: Clients don't wait for long-running processes
- Resource Management: Server resources allocated efficiently for long tasks
- Progress Visibility: Real-time status and progress information
- Fault Tolerance: Retry logic and error handling for unreliable operations
- Scalability: Job queue enables horizontal scaling of processing capacity
Trade-offs
- Complexity: More complex than synchronous request/response
- Latency: Additional overhead for job management and polling
- State Management: Persistent job state required across system restarts
- Client Complexity: Clients must implement polling logic
- Resource Usage: Job metadata and intermediate state storage requirements
References
- FHIR Async Pattern - Asynchronous request patterns
- Bulk Data Async - Async pattern for bulk exports
Polling Optimization
Implement intelligent polling intervals that start frequent and back off exponentially. Consider WebSocket or Server-Sent Events for real-time progress updates.
Job Cleanup
Implement job retention policies to prevent unlimited growth of completed job records. Archive or delete old jobs based on age and importance.