De-Identification Adapter
Intent
Apply consistent de-identification transformations across FHIR and DICOM data for secondary use scenarios, enabling research while preserving privacy.
Forces
- Granular Sharing & Legal Obligations: Healthcare data sharing must respect complex, context-dependent consent and privacy rules.
Structure
The De-Identification Adapter pattern provides configurable de-identification transformations for healthcare data, supporting both FHIR resources and DICOM objects.
Key Components
- DeIdentificationEngine: orchestrates the de-identification process
- PolicyResolver: determines the applicable de-identification rules
- FHIRTransformer: applies de-identification to FHIR resources
- DICOMTransformer: applies de-identification to DICOM objects
- TokenGenerator: creates consistent pseudonymization tokens
Behavior
De-Identification Workflow
Data is de-identified for secondary use through the following sequence of steps:
Transformation Steps
- Load Policy: resolve the de-identification policy that applies to the request context
- Analyze Data: locate direct identifiers, quasi-identifiers, and references in the input
- Apply Rules: execute the configured removal, generalization, and pseudonymization actions
- Generate Tokens: create consistent pseudonyms so records for the same patient stay linkable
- Validate Output: verify that no residual identifiers remain in the transformed data
- Record Actions: log every transformation for audit and provenance
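The steps above can be sketched as a small orchestration layer. This is an illustrative outline only; the class and method names (`PolicyResolver.resolve`, `transformer.apply`, and the audit-log shape) are assumptions, not a fixed API from this pattern.

```python
# Minimal sketch of how a DeIdentificationEngine might chain the workflow
# steps. All names here are illustrative, not a prescribed interface.

class DeIdentificationEngine:
    """Orchestrates policy resolution, transformation, and auditing."""

    def __init__(self, policy_resolver, transformers, audit_log):
        self.policy_resolver = policy_resolver
        self.transformers = transformers  # e.g. {"fhir": ..., "dicom": ...}
        self.audit_log = audit_log

    def deidentify(self, data, data_type, context):
        policy = self.policy_resolver.resolve(context)      # Load Policy
        transformer = self.transformers[data_type]          # Analyze Data
        result = transformer.apply(data, policy)            # Apply Rules / Generate Tokens
        self._validate(result, policy)                      # Validate Output
        self.audit_log.append({                             # Record Actions
            "data_type": data_type,
            "policy": policy.get("name"),
        })
        return result

    def _validate(self, result, policy):
        # Placeholder hook; a real engine would scan the output for
        # residual identifiers before releasing it.
        pass
```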
Implementation Considerations
De-Identification Policy
Configuration schema defining de-identification rules for FHIR and DICOM data, including direct identifiers, quasi-identifiers, and cross-modal consistency settings.
```yaml
# De-identification Policy Configuration
# Defines rules for FHIR and DICOM de-identification
policy:
  name: "Research De-identification Policy"
  version: "1.0"
  compliance:
    - "HIPAA Safe Harbor"
    - "DICOM Supplement 142"

# FHIR De-identification Rules
fhir:
  # Direct identifiers - always remove or replace
  direct_identifiers:
    - path: "Patient.identifier"
      action: "pseudonymize"
      algorithm: "sha256-hmac"
    - path: "Patient.name"
      action: "remove"
    - path: "Patient.telecom"
      action: "remove"
    - path: "Patient.address"
      action: "generalize"
      retain: ["state", "country"]
    - path: "Patient.birthDate"
      action: "generalize"
      precision: "year"
      shift_if_age_over_89: true

  # Quasi-identifiers - generalize or perturb
  quasi_identifiers:
    - path: "Patient.gender"
      action: "retain"
    - path: "Observation.effectiveDateTime"
      action: "shift"
      range_days: 30
      consistent_per_patient: true

  # References - update to pseudonymized IDs
  references:
    - path: "*.subject"
      action: "pseudonymize_reference"
    - path: "*.patient"
      action: "pseudonymize_reference"

  # Resource-specific rules
  resources:
    DocumentReference:
      - path: "content.attachment.url"
        action: "remove"
      - path: "content.attachment.data"
        action: "remove_if_contains_phi"
    DiagnosticReport:
      - path: "conclusion"
        action: "nlp_scrub"

# DICOM De-identification Rules
dicom:
  profile: "Clean Pixel Data Option"

  # Tag-level actions
  tags:
    # Patient Module
    - tag: "(0010,0010)"   # Patient Name
      action: "replace"
      value: "ANONYMOUS"
    - tag: "(0010,0020)"   # Patient ID
      action: "pseudonymize"
    - tag: "(0010,0030)"   # Patient Birth Date
      action: "generalize"
    - tag: "(0010,1000)"   # Other Patient IDs
      action: "remove"
    - tag: "(0010,1001)"   # Other Patient Names
      action: "remove"
    # Study/Series identifiers
    - tag: "(0020,000D)"   # Study Instance UID
      action: "rehash"
    - tag: "(0020,000E)"   # Series Instance UID
      action: "rehash"
    - tag: "(0008,0018)"   # SOP Instance UID
      action: "rehash"

  # Private tags
  private_tags:
    action: "remove_all"
    exceptions:
      - "(0019,xx10)"      # Preserve specific private creator

  # Pixel data
  burned_in_annotation:
    detect: true
    action: "blackout"
    regions:
      - top_percent: 10
      - bottom_percent: 5

# Cross-modal consistency
consistency:
  patient_pseudonym:
    algorithm: "deterministic"
    salt_source: "project_secret"
  date_shift:
    consistent_per_patient: true
    preserve_intervals: true
  uid_mapping:
    maintain_references: true
    store_mapping: "secure_vault"
```
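The cross-modal consistency settings come down to deriving every pseudonym and date shift from the same keyed HMAC, so the FHIR and DICOM paths agree. A minimal sketch, assuming the shared secret is loaded from the configured `salt_source` (the constant below is a stand-in):

```python
# Sketch of cross-modal consistency: both the FHIR and DICOM transformers
# derive pseudonyms and per-patient date shifts from the same HMAC keyed
# by a shared project secret, so their outputs line up.
import hashlib
import hmac

SECRET = b"project_secret"  # stand-in for the configured salt_source

def pseudonym(identifier: str) -> str:
    """Deterministic pseudonym: the same input always yields the same token."""
    return hmac.new(SECRET, identifier.encode(), hashlib.sha256).hexdigest()[:16]

def date_shift_days(patient_id: str, range_days: int = 30) -> int:
    """Deterministic per-patient shift in [-range_days, range_days)."""
    h = hmac.new(SECRET, patient_id.encode(), hashlib.sha256)
    return int.from_bytes(h.digest()[:4], 'big') % (range_days * 2) - range_days
```

Because the shift is a pure function of the patient ID and the secret, applying it to every date for a patient preserves intervals between events, as `preserve_intervals` requires.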
FHIR Transformation
Applies de-identification rules to FHIR resources with support for removal, pseudonymization, generalization, and date shifting transformations.
```python
from typing import Dict, Any, Optional
from dataclasses import dataclass, field
from datetime import datetime, timedelta
import hashlib
import hmac
import re
import copy


@dataclass
class DeidentificationRule:
    """Rule for de-identifying a FHIR element."""
    path: str
    action: str  # remove, pseudonymize, generalize, shift, retain
    params: Dict[str, Any] = field(default_factory=dict)


class FHIRDeidentifier:
    """
    De-identifies FHIR resources according to policy rules.
    Supports HIPAA Safe Harbor and custom de-identification profiles.
    """

    def __init__(self, policy: Dict, secret_key: str):
        self.policy = policy
        self.secret_key = secret_key.encode()
        self.patient_date_shifts = {}  # Consistent shifts per patient
        self.pseudonym_cache = {}      # Consistent pseudonyms

    def deidentify_resource(self, resource: Dict,
                            patient_id: Optional[str] = None) -> Dict:
        """
        De-identify a single FHIR resource.
        Returns a new de-identified copy.
        """
        # Deep copy to avoid modifying the original
        result = copy.deepcopy(resource)
        resource_type = result.get('resourceType')

        # Apply general FHIR rules
        result = self._apply_direct_identifier_rules(result, patient_id)
        result = self._apply_quasi_identifier_rules(result, patient_id)
        result = self._apply_reference_rules(result, patient_id)

        # Apply resource-specific rules
        if resource_type in self.policy.get('fhir', {}).get('resources', {}):
            result = self._apply_resource_rules(result, resource_type)

        # Add de-identification metadata
        return self._add_security_label(result)

    def deidentify_bundle(self, bundle: Dict) -> Dict:
        """De-identify all resources in a FHIR Bundle."""
        result = copy.deepcopy(bundle)

        # First pass: pre-generate patient pseudonyms so that every
        # resource referencing a patient resolves to the same token
        for entry in result.get('entry', []):
            resource = entry.get('resource', {})
            if resource.get('resourceType') == 'Patient':
                self._generate_pseudonym(f"Patient/{resource.get('id')}")

        # Second pass: de-identify all resources
        for entry in result.get('entry', []):
            resource = entry.get('resource', {})
            patient_ref = self._extract_patient_reference(resource)
            entry['resource'] = self.deidentify_resource(resource, patient_ref)
        return result

    def _apply_direct_identifier_rules(self, resource: Dict,
                                       patient_id: Optional[str]) -> Dict:
        """Apply rules for direct identifiers."""
        rules = self.policy.get('fhir', {}).get('direct_identifiers', [])
        for rule in rules:
            path = rule['path']
            action = rule['action']
            if not self._path_applies(path, resource):
                continue
            if action == 'remove':
                resource = self._remove_element(resource, path)
            elif action == 'pseudonymize':
                resource = self._pseudonymize_element(
                    resource, path, rule.get('algorithm', 'sha256-hmac')
                )
            elif action == 'generalize':
                resource = self._generalize_element(resource, path, rule)
        return resource

    def _apply_quasi_identifier_rules(self, resource: Dict,
                                      patient_id: Optional[str]) -> Dict:
        """Apply rules for quasi-identifiers."""
        rules = self.policy.get('fhir', {}).get('quasi_identifiers', [])
        for rule in rules:
            path = rule['path']
            action = rule['action']
            if not self._path_applies(path, resource):
                continue
            if action == 'shift':
                resource = self._shift_date(
                    resource, path, patient_id,
                    rule.get('range_days', 30),
                    rule.get('consistent_per_patient', True)
                )
            elif action == 'retain':
                pass  # Keep as-is
        return resource

    def _apply_reference_rules(self, resource: Dict,
                               patient_id: Optional[str]) -> Dict:
        """Update references to use pseudonymized IDs."""
        rules = self.policy.get('fhir', {}).get('references', [])
        for rule in rules:
            if rule['action'] == 'pseudonymize_reference':
                resource = self._pseudonymize_references(resource, rule['path'])
        return resource

    def _apply_resource_rules(self, resource: Dict,
                              resource_type: str) -> Dict:
        """Apply resource-specific de-identification rules."""
        rules = self.policy['fhir']['resources'].get(resource_type, [])
        for rule in rules:
            path = rule['path']
            action = rule['action']
            if action == 'remove':
                resource = self._remove_element(resource, path)
            elif action == 'nlp_scrub':
                resource = self._nlp_scrub(resource, path)
            elif action == 'remove_if_contains_phi':
                resource = self._remove_if_phi(resource, path)
        return resource

    def _generate_pseudonym(self, identifier: str) -> str:
        """Generate a consistent pseudonym using HMAC-SHA256."""
        if identifier in self.pseudonym_cache:
            return self.pseudonym_cache[identifier]
        h = hmac.new(self.secret_key, identifier.encode(), hashlib.sha256)
        pseudonym = h.hexdigest()[:16]
        self.pseudonym_cache[identifier] = pseudonym
        return pseudonym

    def _get_date_shift(self, patient_id: str, range_days: int) -> int:
        """Get a consistent date shift for a patient."""
        if patient_id not in self.patient_date_shifts:
            # Deterministic shift in [-range_days, range_days), derived
            # from the patient ID so repeated runs agree
            h = hmac.new(self.secret_key, patient_id.encode(), hashlib.sha256)
            shift = int.from_bytes(h.digest()[:4], 'big') % (range_days * 2) - range_days
            self.patient_date_shifts[patient_id] = shift
        return self.patient_date_shifts[patient_id]

    def _shift_date(self, resource: Dict, path: str,
                    patient_id: Optional[str], range_days: int,
                    consistent: bool) -> Dict:
        """Shift date/datetime values."""
        value = self._get_element(resource, path)
        if not value:
            return resource

        if consistent and patient_id:
            shift_days = self._get_date_shift(patient_id, range_days)
        else:
            shift_days = self._get_date_shift(str(id(resource)), range_days)

        # Parse and shift the date
        try:
            if 'T' in value:
                dt = datetime.fromisoformat(value.replace('Z', '+00:00'))
                new_value = (dt + timedelta(days=shift_days)).isoformat()
            else:
                dt = datetime.strptime(value, '%Y-%m-%d')
                new_value = (dt + timedelta(days=shift_days)).strftime('%Y-%m-%d')
            return self._set_element(resource, path, new_value)
        except (ValueError, TypeError):
            # Leave unparseable values untouched rather than corrupt them
            return resource

    def _add_security_label(self, resource: Dict) -> Dict:
        """Add a security label indicating de-identification."""
        meta = resource.setdefault('meta', {})
        meta.setdefault('security', []).append({
            "system": "http://terminology.hl7.org/CodeSystem/v3-ObservationValue",
            "code": "ANONYED",
            "display": "anonymized"
        })
        return resource

    # Helper methods for path navigation

    def _path_applies(self, path: str, resource: Dict) -> bool:
        """Check whether a rule path applies to this resource."""
        if path.startswith('*.'):
            return True
        return path.split('.')[0] == resource.get('resourceType')

    def _get_element(self, resource: Dict, path: str) -> Any:
        """Get the element value at a path."""
        parts = path.split('.')[1:]  # Skip resource type
        current = resource
        for part in parts:
            if isinstance(current, dict):
                current = current.get(part)
            else:
                return None
        return current

    def _set_element(self, resource: Dict, path: str, value: Any) -> Dict:
        """Set the element value at a path."""
        parts = path.split('.')[1:]
        current = resource
        for part in parts[:-1]:
            current = current.setdefault(part, {})
        current[parts[-1]] = value
        return resource

    def _remove_element(self, resource: Dict, path: str) -> Dict:
        """Remove the element at a path."""
        parts = path.split('.')[1:]
        current = resource
        for part in parts[:-1]:
            if not isinstance(current, dict) or part not in current:
                return resource
            current = current[part]
        if isinstance(current, dict) and parts[-1] in current:
            del current[parts[-1]]
        return resource

    def _pseudonymize_element(self, resource: Dict, path: str,
                              algorithm: str) -> Dict:
        """Replace an element with a pseudonym."""
        value = self._get_element(resource, path)
        if value:
            pseudonym = self._generate_pseudonym(str(value))
            return self._set_element(resource, path, pseudonym)
        return resource

    def _generalize_element(self, resource: Dict, path: str,
                            rule: Dict) -> Dict:
        """Generalize an element (e.g., keep only the year of a date)."""
        value = self._get_element(resource, path)
        if not value:
            return resource
        precision = rule.get('precision', 'year')
        if precision == 'year' and isinstance(value, str):
            return self._set_element(resource, path, value[:4])
        return resource

    def _pseudonymize_references(self, resource: Dict, path: str) -> Dict:
        """Pseudonymize reference elements."""
        # A full implementation would walk the resource and rewrite
        # matching Reference.reference values via _generate_pseudonym
        return resource

    def _nlp_scrub(self, resource: Dict, path: str) -> Dict:
        """Use NLP to remove PHI from narrative text."""
        value = self._get_element(resource, path)
        if value and isinstance(value, str):
            # Simple pattern-based scrubbing (a real implementation would
            # use a clinical NLP de-identification model)
            return self._set_element(resource, path, self._simple_phi_scrub(value))
        return resource

    def _simple_phi_scrub(self, text: str) -> str:
        """Simple pattern-based PHI removal: names, dates, phone numbers."""
        patterns = [
            (r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', '[PHONE]'),
            (r'\b[A-Z][a-z]+ [A-Z][a-z]+\b', '[NAME]'),
            (r'\b\d{1,2}/\d{1,2}/\d{2,4}\b', '[DATE]'),
        ]
        result = text
        for pattern, replacement in patterns:
            result = re.sub(pattern, replacement, result)
        return result

    def _remove_if_phi(self, resource: Dict, path: str) -> Dict:
        """Remove an element if it appears to contain PHI."""
        value = self._get_element(resource, path)
        if value and self._contains_phi(str(value)):
            return self._remove_element(resource, path)
        return resource

    def _contains_phi(self, text: str) -> bool:
        """Heuristic check for PHI-like patterns."""
        phi_patterns = [
            r'\b\d{3}[-.]?\d{2}[-.]?\d{4}\b',  # SSN
            r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',  # Phone
        ]
        return any(re.search(p, text) for p in phi_patterns)

    def _extract_patient_reference(self, resource: Dict) -> Optional[str]:
        """Extract the patient reference from a resource."""
        for field_name in ['subject', 'patient']:
            ref = resource.get(field_name, {}).get('reference', '')
            if ref.startswith('Patient/'):
                return ref.replace('Patient/', '')
        return None
```
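The policy's `shift_if_age_over_89` flag reflects the HIPAA Safe Harbor rule that ages over 89 must be aggregated into a single category; `_generalize_element` above does not implement it. A minimal standalone sketch of how it could work, using year-level age as a simplification:

```python
# Sketch of Safe Harbor birthDate generalization with the over-89 rule.
# Ages are approximated by year difference; all patients older than 89
# collapse into one capped birth year so they are indistinguishable.
from datetime import date

def generalize_birth_date(birth_date: str, today: date) -> str:
    """Return the birth year, capping anyone older than 89 at age 90."""
    birth_year = int(birth_date[:4])
    age = today.year - birth_year
    if age > 89:
        # Report a capped year so all 90+ patients look alike
        return str(today.year - 90)
    return str(birth_year)
```

For example, with `today = date(2024, 6, 1)`, a 1985 birth date generalizes to `"1985"`, while 1927 and 1921 both generalize to the same capped year.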
DICOM Transformation
De-identifies DICOM datasets following Supplement 142 profiles with tag-level actions, UID re-hashing, and burned-in annotation detection.
```python
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass
from enum import Enum
import hashlib
import hmac
import re


class DICOMAction(Enum):
    REMOVE = "remove"
    REPLACE = "replace"
    PSEUDONYMIZE = "pseudonymize"
    GENERALIZE = "generalize"
    REHASH = "rehash"
    RETAIN = "retain"


@dataclass
class TagRule:
    """Rule for processing a DICOM tag."""
    tag: Tuple[int, int]
    action: DICOMAction
    replacement_value: Optional[str] = None


class DICOMDeidentifier:
    """
    De-identifies DICOM datasets according to DICOM Supplement 142
    and custom de-identification profiles. Operates on pydicom
    Dataset objects.
    """

    def __init__(self, policy: Dict, secret_key: str):
        self.policy = policy
        self.secret_key = secret_key.encode()
        self.uid_mapping = {}  # Consistent UID re-hashing
        # Parse tag rules from policy
        self.tag_rules = self._parse_tag_rules()

    def _parse_tag_rules(self) -> Dict[Tuple[int, int], TagRule]:
        """Parse tag rules from the policy configuration."""
        rules = {}
        for rule_config in self.policy.get('dicom', {}).get('tags', []):
            tag = self._parse_tag(rule_config['tag'])
            rules[tag] = TagRule(
                tag=tag,
                action=DICOMAction(rule_config['action']),
                replacement_value=rule_config.get('value')
            )
        return rules

    def _parse_tag(self, tag_str: str) -> Tuple[int, int]:
        """Parse a DICOM tag string like '(0010,0010)' into a tuple."""
        group, element = tag_str.strip('()').split(',')
        return (int(group, 16), int(element, 16))

    def deidentify_dataset(self, dataset) -> None:
        """De-identify a pydicom Dataset in place."""
        # Get the patient ID for consistent processing
        patient_id = str(getattr(dataset, 'PatientID', 'unknown'))

        # Apply tag-level rules
        for tag, rule in self.tag_rules.items():
            self._apply_tag_rule(dataset, tag, rule, patient_id)

        # Handle private tags, UIDs, and de-identification metadata
        self._process_private_tags(dataset)
        self._process_uids(dataset)
        self._add_deidentification_method(dataset)

    def _apply_tag_rule(self, dataset, tag: Tuple[int, int],
                        rule: TagRule, patient_id: str):
        """Apply a single tag rule to the dataset."""
        try:
            if tag not in dataset:
                return
            if rule.action == DICOMAction.REMOVE:
                del dataset[tag]
            elif rule.action == DICOMAction.REPLACE:
                dataset[tag].value = rule.replacement_value
            elif rule.action == DICOMAction.PSEUDONYMIZE:
                original = str(dataset[tag].value)
                dataset[tag].value = self._generate_pseudonym(original, patient_id)
            elif rule.action == DICOMAction.GENERALIZE:
                # For DA values (YYYYMMDD), keep only the year
                original = str(dataset[tag].value)
                if len(original) >= 4:
                    dataset[tag].value = original[:4] + '0101'
            elif rule.action == DICOMAction.REHASH:
                dataset[tag].value = self._rehash_uid(str(dataset[tag].value))
        except Exception as e:
            # Log the error but continue processing the remaining tags
            print(f"Error processing tag {tag}: {e}")

    def _process_private_tags(self, dataset):
        """Remove or process private tags based on policy."""
        private_policy = self.policy.get('dicom', {}).get('private_tags', {})
        action = private_policy.get('action', 'remove_all')
        exceptions = private_policy.get('exceptions', [])

        if action == 'remove_all':
            # Private tags have odd group numbers
            private_tags = [tag for tag in dataset.keys() if tag.group % 2 == 1]
            for tag in private_tags:
                tag_str = f"({tag.group:04X},{tag.element:04X})"
                if not any(self._matches_pattern(tag_str, exc) for exc in exceptions):
                    del dataset[tag]

    def _process_uids(self, dataset):
        """Re-hash instance UIDs for consistency."""
        uid_tags = [
            (0x0020, 0x000D),  # Study Instance UID
            (0x0020, 0x000E),  # Series Instance UID
            (0x0008, 0x0018),  # SOP Instance UID
        ]
        # Note: SOP Class UID (0008,0016) identifies the object type,
        # not the patient, and is retained unchanged
        for tag in uid_tags:
            if tag in dataset:
                dataset[tag].value = self._rehash_uid(str(dataset[tag].value))

    def _generate_pseudonym(self, value: str, patient_id: str) -> str:
        """Generate a consistent pseudonym."""
        key_material = f"{patient_id}:{value}"
        h = hmac.new(self.secret_key, key_material.encode(), hashlib.sha256)
        return h.hexdigest()[:16].upper()

    def _rehash_uid(self, original_uid: str) -> str:
        """
        Generate a new UID from the original using consistent hashing.
        Maintains UID format requirements and referential integrity.
        """
        if original_uid in self.uid_mapping:
            return self.uid_mapping[original_uid]

        h = hmac.new(self.secret_key, original_uid.encode(), hashlib.sha256)
        # Convert to the UUID-derived "2.25.{integer}" UID form from DICOM
        uid_int = int.from_bytes(h.digest()[:16], 'big')
        new_uid = f"2.25.{uid_int}"[:64]  # UIDs may not exceed 64 characters

        self.uid_mapping[original_uid] = new_uid
        return new_uid

    def _matches_pattern(self, tag_str: str, pattern: str) -> bool:
        """Check if a tag matches a pattern (supports the 'xx' wildcard)."""
        # Escape the literal parentheses before expanding the wildcard,
        # otherwise they are interpreted as a regex group and never match
        regex = re.escape(pattern).replace('xx', '[0-9A-Fa-f]{2}')
        return bool(re.fullmatch(regex, tag_str))

    def _add_deidentification_method(self, dataset):
        """Add the de-identification method attributes and code sequence."""
        from pydicom.dataset import Dataset
        from pydicom.sequence import Sequence

        # Patient Identity Removed
        dataset.PatientIdentityRemoved = 'YES'
        # De-identification Method
        dataset.DeidentificationMethod = \
            'Custom Profile based on DICOM Supplement 142'

        # De-identification Method Code Sequence
        code_item = Dataset()
        code_item.CodeValue = '113100'
        code_item.CodingSchemeDesignator = 'DCM'
        code_item.CodeMeaning = 'Basic Application Confidentiality Profile'
        dataset.DeidentificationMethodCodeSequence = Sequence([code_item])

    def process_burned_in_annotations(self, dataset) -> bool:
        """
        Detect and handle burned-in annotations in pixel data.
        Returns True if annotations were processed.
        """
        bia_policy = self.policy.get('dicom', {}).get('burned_in_annotation', {})
        if not bia_policy.get('detect', False):
            return False

        # Simplified detection: check the Burned In Annotation attribute
        # and overlays. A real implementation would use machine learning
        # or OCR on the pixel data.
        has_overlay = hasattr(dataset, 'OverlayData')
        burned_in = getattr(dataset, 'BurnedInAnnotation', 'NO')

        if burned_in == 'YES' or has_overlay:
            action = bia_policy.get('action', 'blackout')
            if action == 'blackout':
                self._blackout_regions(dataset, bia_policy.get('regions', []))
                return True
            elif action == 'reject':
                raise ValueError("Image contains burned-in annotations")
        return False

    def _blackout_regions(self, dataset, regions: List[Dict]):
        """Black out the specified regions in the pixel data."""
        try:
            pixel_array = dataset.pixel_array
            rows = pixel_array.shape[0]
            for region in regions:
                if 'top_percent' in region:
                    height = int(rows * region['top_percent'] / 100)
                    pixel_array[:height, :] = 0
                if 'bottom_percent' in region:
                    height = int(rows * region['bottom_percent'] / 100)
                    pixel_array[-height:, :] = 0
            dataset.PixelData = pixel_array.tobytes()
        except Exception as e:
            print(f"Error processing pixel data: {e}")

    def get_uid_mapping(self) -> Dict[str, str]:
        """Return the UID mapping for cross-referencing."""
        return dict(self.uid_mapping)
```
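The tag-string handling above is worth isolating: patterns like `(0019,xx10)` contain literal parentheses that must be regex-escaped before the `xx` wildcard is expanded, or the pattern never matches. A self-contained sketch:

```python
# Standalone sketch of DICOM tag-string parsing and private-tag exception
# matching, where "xx" is a two-hex-digit wildcard.
import re

def parse_tag(tag_str: str):
    """Parse '(GGGG,EEEE)' into a (group, element) tuple of ints."""
    group, element = tag_str.strip('()').split(',')
    return (int(group, 16), int(element, 16))

def matches_exception(tag_str: str, pattern: str) -> bool:
    """Match a tag string against an exception pattern with 'xx' wildcards."""
    # Escape literal characters first, then substitute the wildcard
    regex = re.escape(pattern).replace('xx', '[0-9A-Fa-f]{2}')
    return bool(re.fullmatch(regex, tag_str))
```

For example, `matches_exception("(0019,1010)", "(0019,xx10)")` is true, while `"(0019,1011)"` does not match the same pattern.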
Related Patterns
- Population Export Pipeline: De-identification is applied to data extracted via Population Export
- Privacy Enforcement: Privacy rules may trigger de-identification for secondary use
- Imaging Bridge: DICOM images require coordinated de-identification with FHIR metadata
- Audit & Provenance Chain: All de-identification transformations are logged for compliance
Benefits
- Consistent Processing: Same rules applied to FHIR and DICOM
- Policy-Driven: Flexible rules for different use cases
- Pseudonymization: Maintain linkage while protecting identity
- Standards-Based: Follows HIPAA Safe Harbor and Expert Determination
- Auditable: Complete record of transformations applied
Trade-offs
- Data Utility: Some transformations reduce analytical value
- Complexity: Multiple data types and rules to manage
- Performance: Transformation overhead for large datasets
- Re-identification Risk: Balancing privacy with utility
References
- HIPAA Safe Harbor - De-identification standard method
- DICOM PS3.15 Security - DICOM de-identification profiles
- FHIR Anonymization Tools - Open source de-identification tools
Re-identification Risk
Evaluate re-identification risk carefully, especially for rare conditions or small populations. Consider k-anonymity and l-diversity requirements.
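As a first-pass risk signal, k-anonymity can be computed directly over the quasi-identifier tuples in a release candidate: every combination of quasi-identifier values should be shared by at least k records. A minimal sketch (the field names in the sample cohort are illustrative):

```python
# Minimal k-anonymity check: the smallest equivalence-class size over
# the chosen quasi-identifiers. A lone record (k = 1) is a re-identification
# red flag for rare conditions or small populations.
from collections import Counter

def k_anonymity(records, quasi_identifiers):
    """Return the smallest equivalence-class size over the quasi-identifiers."""
    groups = Counter(
        tuple(r.get(q) for q in quasi_identifiers) for r in records
    )
    return min(groups.values())

cohort = [
    {"gender": "F", "birth_year": "1985", "state": "WA"},
    {"gender": "F", "birth_year": "1985", "state": "WA"},
    {"gender": "M", "birth_year": "1985", "state": "WA"},
]
# The lone male record forms an equivalence class of size 1, so k = 1
```

Further generalization (e.g., dropping `gender` or widening `birth_year` bands) raises k at the cost of data utility; l-diversity additionally requires variety in the sensitive values within each class.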