De-Identification Adapter

Intent

Apply consistent de-identification transformations across FHIR and DICOM data for secondary use scenarios, enabling research while preserving privacy.

Forces

  • Granular Sharing & Legal Obligations: Healthcare data sharing must respect complex, context-dependent consent and privacy rules.

Structure

The De-Identification Adapter pattern provides configurable de-identification transformations for healthcare data, supporting both FHIR resources and DICOM objects.

De-Identification Adapter Architecture

Key Components

DeIdentificationEngine

Orchestrates the de-identification process

PolicyResolver

Determines applicable de-identification rules

FHIRTransformer

Applies de-identification to FHIR resources

DICOMTransformer

Applies de-identification to DICOM objects

TokenGenerator

Creates consistent pseudonymization tokens
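Of these components, the two transformers are shown in full later in this section; the TokenGenerator can be sketched in a few lines. This is a minimal illustration, not a prescribed interface: the class name's role comes from the component list above, but the `token(namespace, identifier)` signature and the 16-character truncation are assumptions.

```python
import hashlib
import hmac

class TokenGenerator:
    """Deterministic pseudonymization tokens: the same (namespace, identifier)
    pair always maps to the same token, so the FHIR and DICOM transformers
    can share one generator and preserve cross-modal linkage."""

    def __init__(self, secret_key: str):
        self.secret_key = secret_key.encode()

    def token(self, namespace: str, identifier: str) -> str:
        # Namespacing keeps patient IDs, UIDs, etc. in separate token spaces
        material = f"{namespace}:{identifier}".encode()
        return hmac.new(self.secret_key, material, hashlib.sha256).hexdigest()[:16]
```

Because the token is an HMAC of the identifier, it is stable across runs with the same project secret but unlinkable across projects with different secrets.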

Behavior

De-Identification Workflow

The following sequence shows how data is de-identified for secondary use:

De-Identification Sequence

Transformation Steps

  1. Load Policy: resolve the de-identification policy that applies to the requesting project and purpose
  2. Analyze Data: determine the resource or object types and locate the elements covered by policy rules
  3. Apply Rules: remove, pseudonymize, generalize, or shift each matched element
  4. Generate Tokens: produce consistent pseudonyms and re-hashed UIDs so linkage survives de-identification
  5. Validate Output: verify that no direct identifiers remain in the transformed data
  6. Record Actions: write an audit entry for every transformation applied
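The six steps above reduce to a thin orchestration loop. In this sketch the policy-resolver and transformer callables are assumed interfaces (the pattern does not prescribe exact signatures), and step 5 is simplified to checking that at least one rule was applied:

```python
from typing import Any, Callable, Dict, List

class DeIdentificationEngine:
    """Illustrative orchestration of the six-step workflow."""

    def __init__(self, policy_resolver: Callable,
                 transformers: Dict[str, Callable], audit_log: List[Dict]):
        self.policy_resolver = policy_resolver
        self.transformers = transformers  # e.g. {"fhir": ..., "dicom": ...}
        self.audit_log = audit_log

    def process(self, data: Any, data_type: str, context: Dict) -> Any:
        policy = self.policy_resolver(context)          # 1. Load Policy
        transformer = self.transformers[data_type]      # 2. Analyze Data
        result, actions = transformer(data, policy)     # 3./4. Apply Rules, Generate Tokens
        if not actions:                                 # 5. Validate Output (simplified)
            raise ValueError("no de-identification rules were applied")
        self.audit_log.append({"data_type": data_type,  # 6. Record Actions
                               "policy": policy.get("name"),
                               "actions": actions})
        return result
```

A real engine would validate the output against the policy (e.g. re-scan for direct identifiers) rather than merely checking that actions were recorded.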

Implementation Considerations

De-Identification Policy

Configuration schema defining de-identification rules for FHIR and DICOM data including direct identifiers, quasi-identifiers, and cross-modal consistency settings.

De-Identification Policy
# De-identification Policy Configuration
# Defines rules for FHIR and DICOM de-identification

policy:
  name: "Research De-identification Policy"
  version: "1.0"
  compliance:
    - "HIPAA Safe Harbor"
    - "DICOM Supplement 142"

  # FHIR De-identification Rules
  fhir:
    # Direct identifiers - always remove or replace
    direct_identifiers:
      - path: "Patient.identifier"
        action: "pseudonymize"
        algorithm: "sha256-hmac"

      - path: "Patient.name"
        action: "remove"

      - path: "Patient.telecom"
        action: "remove"

      - path: "Patient.address"
        action: "generalize"
        retain: ["state", "country"]

      - path: "Patient.birthDate"
        action: "generalize"
        precision: "year"
        shift_if_age_over_89: true

    # Quasi-identifiers - generalize or perturb
    quasi_identifiers:
      - path: "Patient.gender"
        action: "retain"

      - path: "Observation.effectiveDateTime"
        action: "shift"
        range_days: 30
        consistent_per_patient: true

    # References - update to pseudonymized IDs
    references:
      - path: "*.subject"
        action: "pseudonymize_reference"

      - path: "*.patient"
        action: "pseudonymize_reference"

    # Resource-specific rules
    resources:
      DocumentReference:
        - path: "content.attachment.url"
          action: "remove"
        - path: "content.attachment.data"
          action: "remove_if_contains_phi"

      DiagnosticReport:
        - path: "conclusion"
          action: "nlp_scrub"

  # DICOM De-identification Rules  
  dicom:
    profile: "Clean Pixel Data Option"

    # Tag-level actions
    tags:
      # Patient Module
      - tag: "(0010,0010)"  # Patient Name
        action: "replace"
        value: "ANONYMOUS"

      - tag: "(0010,0020)"  # Patient ID
        action: "pseudonymize"

      - tag: "(0010,0030)"  # Patient Birth Date
        action: "generalize"  # keep year only

      - tag: "(0010,1000)"  # Other Patient IDs
        action: "remove"

      - tag: "(0010,1001)"  # Other Patient Names
        action: "remove"

      # Study/Series identifiers
      - tag: "(0020,000D)"  # Study Instance UID
        action: "rehash"

      - tag: "(0020,000E)"  # Series Instance UID
        action: "rehash"

      - tag: "(0008,0018)"  # SOP Instance UID
        action: "rehash"

    # Private tags
    private_tags:
      action: "remove_all"
      exceptions:
        - "(0019,xx10)"  # Preserve specific private creator

    # Pixel data
    burned_in_annotation:
      detect: true
      action: "blackout"
      regions:
        - top_percent: 10
        - bottom_percent: 5

  # Cross-modal consistency
  consistency:
    patient_pseudonym:
      algorithm: "deterministic"
      salt_source: "project_secret"

    date_shift:
      consistent_per_patient: true
      preserve_intervals: true

    uid_mapping:
      maintain_references: true
      store_mapping: "secure_vault"
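A policy like the one above is worth sanity-checking before use, once it has been parsed into a dict (e.g. with `yaml.safe_load`). The following is a minimal sketch; the action vocabularies are taken from the example transformers in this section, and a real system would derive them from the transformer implementations rather than hard-coding them:

```python
# Action names supported by the example transformers in this document
FHIR_ACTIONS = {"remove", "pseudonymize", "generalize", "shift", "retain",
                "pseudonymize_reference", "nlp_scrub", "remove_if_contains_phi"}
DICOM_ACTIONS = {"remove", "replace", "pseudonymize", "generalize", "rehash", "retain"}

def validate_policy(policy: dict) -> list:
    """Return human-readable problems; an empty list means the policy passes."""
    problems = []
    fhir = policy.get("fhir", {})
    for section in ("direct_identifiers", "quasi_identifiers", "references"):
        for rule in fhir.get(section, []):
            if rule.get("action") not in FHIR_ACTIONS:
                problems.append(f"unknown FHIR action in {section}: {rule.get('action')!r}")
    for rules in fhir.get("resources", {}).values():
        for rule in rules:
            if rule.get("action") not in FHIR_ACTIONS:
                problems.append(f"unknown FHIR resource action: {rule.get('action')!r}")
    for rule in policy.get("dicom", {}).get("tags", []):
        if rule.get("action") not in DICOM_ACTIONS:
            problems.append(f"unknown DICOM action for {rule.get('tag')}: {rule.get('action')!r}")
    return problems
```

Catching an unknown action at load time is much cheaper than discovering mid-batch that a rule silently never fired.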

FHIR Transformation

Applies de-identification rules to FHIR resources with support for removal, pseudonymization, generalization, and date shifting transformations.

FHIR Transformation
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
import hashlib
import hmac
import re
import copy

@dataclass
class DeidentificationRule:
    """Rule for de-identifying a FHIR element"""
    path: str
    action: str  # remove, pseudonymize, generalize, shift, retain
    params: Optional[Dict[str, Any]] = None

class FHIRDeidentifier:
    """
    De-identifies FHIR resources according to policy rules.
    Supports HIPAA Safe Harbor and custom de-identification profiles.
    """

    def __init__(self, policy: Dict, secret_key: str):
        self.policy = policy
        self.secret_key = secret_key.encode()
        self.patient_date_shifts = {}  # Consistent shifts per patient
        self.pseudonym_cache = {}  # Consistent pseudonyms

    def deidentify_resource(self, resource: Dict, 
                           patient_id: str = None) -> Dict:
        """
        De-identify a single FHIR resource.
        Returns a new de-identified copy.
        """
        # Deep copy to avoid modifying original
        result = copy.deepcopy(resource)

        # Get resource type
        resource_type = result.get('resourceType')

        # Apply general FHIR rules
        result = self._apply_direct_identifier_rules(result, patient_id)
        result = self._apply_quasi_identifier_rules(result, patient_id)
        result = self._apply_reference_rules(result, patient_id)

        # Apply resource-specific rules
        if resource_type in self.policy.get('fhir', {}).get('resources', {}):
            result = self._apply_resource_rules(result, resource_type)

        # Add de-identification metadata
        result = self._add_security_label(result)

        return result

    def deidentify_bundle(self, bundle: Dict) -> Dict:
        """De-identify all resources in a FHIR Bundle."""
        result = copy.deepcopy(bundle)

        # First pass: pre-seed the pseudonym cache for every patient so
        # that references resolved in the second pass stay consistent
        for entry in result.get('entry', []):
            resource = entry.get('resource', {})
            if resource.get('resourceType') == 'Patient':
                original_id = resource.get('id')
                self._generate_pseudonym(f"Patient/{original_id}")

        # Second pass: de-identify all resources
        for entry in result.get('entry', []):
            resource = entry.get('resource', {})
            patient_ref = self._extract_patient_reference(resource)
            entry['resource'] = self.deidentify_resource(resource, patient_ref)

        return result

    def _apply_direct_identifier_rules(self, resource: Dict, 
                                       patient_id: str) -> Dict:
        """Apply rules for direct identifiers."""
        rules = self.policy.get('fhir', {}).get('direct_identifiers', [])

        for rule in rules:
            path = rule['path']
            action = rule['action']

            if self._path_applies(path, resource):
                if action == 'remove':
                    resource = self._remove_element(resource, path)
                elif action == 'pseudonymize':
                    resource = self._pseudonymize_element(
                        resource, path, rule.get('algorithm', 'sha256-hmac')
                    )
                elif action == 'generalize':
                    resource = self._generalize_element(resource, path, rule)

        return resource

    def _apply_quasi_identifier_rules(self, resource: Dict,
                                      patient_id: str) -> Dict:
        """Apply rules for quasi-identifiers."""
        rules = self.policy.get('fhir', {}).get('quasi_identifiers', [])

        for rule in rules:
            path = rule['path']
            action = rule['action']

            if self._path_applies(path, resource):
                if action == 'shift':
                    resource = self._shift_date(
                        resource, path, patient_id,
                        rule.get('range_days', 30),
                        rule.get('consistent_per_patient', True)
                    )
                elif action == 'retain':
                    pass  # Keep as-is

        return resource

    def _apply_reference_rules(self, resource: Dict, 
                               patient_id: str) -> Dict:
        """Update references to use pseudonymized IDs."""
        rules = self.policy.get('fhir', {}).get('references', [])

        for rule in rules:
            path = rule['path']
            if rule['action'] == 'pseudonymize_reference':
                resource = self._pseudonymize_references(resource, path)

        return resource

    def _apply_resource_rules(self, resource: Dict, 
                             resource_type: str) -> Dict:
        """Apply resource-specific de-identification rules."""
        rules = self.policy['fhir']['resources'].get(resource_type, [])

        for rule in rules:
            path = rule['path']
            action = rule['action']

            if action == 'remove':
                resource = self._remove_element(resource, path)
            elif action == 'nlp_scrub':
                resource = self._nlp_scrub(resource, path)
            elif action == 'remove_if_contains_phi':
                resource = self._remove_if_phi(resource, path)

        return resource

    def _generate_pseudonym(self, identifier: str) -> str:
        """Generate consistent pseudonym using HMAC."""
        if identifier in self.pseudonym_cache:
            return self.pseudonym_cache[identifier]

        h = hmac.new(self.secret_key, identifier.encode(), hashlib.sha256)
        pseudonym = h.hexdigest()[:16]
        self.pseudonym_cache[identifier] = pseudonym

        return pseudonym

    def _get_date_shift(self, patient_id: str, range_days: int) -> int:
        """Get consistent date shift for a patient."""
        if patient_id not in self.patient_date_shifts:
            # Generate deterministic shift based on patient ID
            h = hmac.new(self.secret_key, patient_id.encode(), hashlib.sha256)
            shift = int.from_bytes(h.digest()[:4], 'big') % (range_days * 2) - range_days
            self.patient_date_shifts[patient_id] = shift

        return self.patient_date_shifts[patient_id]

    def _shift_date(self, resource: Dict, path: str, 
                   patient_id: str, range_days: int,
                   consistent: bool) -> Dict:
        """Shift date/datetime values."""
        value = self._get_element(resource, path)
        if not value:
            return resource

        if consistent and patient_id:
            shift_days = self._get_date_shift(patient_id, range_days)
        else:
            shift_days = self._get_date_shift(str(id(resource)), range_days)

        # Parse and shift the date
        try:
            if 'T' in value:
                dt = datetime.fromisoformat(value.replace('Z', '+00:00'))
                shifted = dt + timedelta(days=shift_days)
                new_value = shifted.isoformat()
            else:
                dt = datetime.strptime(value, '%Y-%m-%d')
                shifted = dt + timedelta(days=shift_days)
                new_value = shifted.strftime('%Y-%m-%d')

            return self._set_element(resource, path, new_value)
        except (ValueError, TypeError):
            # Leave the value unchanged if it cannot be parsed as a date
            return resource

    def _add_security_label(self, resource: Dict) -> Dict:
        """Add security label indicating de-identification."""
        if 'meta' not in resource:
            resource['meta'] = {}

        if 'security' not in resource['meta']:
            resource['meta']['security'] = []

        resource['meta']['security'].append({
            "system": "http://terminology.hl7.org/CodeSystem/v3-ObservationValue",
            "code": "ANONYED",
            "display": "anonymized"
        })

        return resource

    # Helper methods for path navigation
    def _path_applies(self, path: str, resource: Dict) -> bool:
        """Check if path applies to this resource."""
        if path.startswith('*.'):
            return True
        parts = path.split('.')
        return parts[0] == resource.get('resourceType')

    def _split_path(self, resource: Dict, path: str) -> List[str]:
        """Split a path, dropping a leading resource-type segment if present
        (resource-specific rules use relative paths like 'conclusion')."""
        parts = path.split('.')
        if parts and parts[0] == resource.get('resourceType'):
            parts = parts[1:]
        return parts

    def _get_element(self, resource: Dict, path: str) -> Any:
        """Get element value at path."""
        current = resource
        for part in self._split_path(resource, path):
            if isinstance(current, dict):
                current = current.get(part)
            else:
                return None
        return current

    def _set_element(self, resource: Dict, path: str, value: Any) -> Dict:
        """Set element value at path."""
        parts = self._split_path(resource, path)
        if not parts:
            return resource
        current = resource
        for part in parts[:-1]:
            current = current.setdefault(part, {})
        current[parts[-1]] = value
        return resource

    def _remove_element(self, resource: Dict, path: str) -> Dict:
        """Remove element at path."""
        parts = self._split_path(resource, path)
        if not parts:
            return resource
        current = resource
        for part in parts[:-1]:
            if not isinstance(current, dict) or part not in current:
                return resource
            current = current[part]
        if isinstance(current, dict) and parts[-1] in current:
            del current[parts[-1]]
        return resource

    def _pseudonymize_element(self, resource: Dict, path: str, 
                             algorithm: str) -> Dict:
        """Replace element with pseudonym."""
        value = self._get_element(resource, path)
        if value:
            pseudonym = self._generate_pseudonym(str(value))
            return self._set_element(resource, path, pseudonym)
        return resource

    def _generalize_element(self, resource: Dict, path: str, 
                           rule: Dict) -> Dict:
        """Generalize element (e.g., keep only the year of a date,
        or keep only selected address components)."""
        value = self._get_element(resource, path)
        if not value:
            return resource

        retain = rule.get('retain')
        if retain and isinstance(value, list):
            # e.g. Patient.address with retain: ["state", "country"]
            generalized = [
                {k: v for k, v in item.items() if k in retain}
                for item in value if isinstance(item, dict)
            ]
            return self._set_element(resource, path, generalized)

        precision = rule.get('precision', 'year')
        if precision == 'year' and isinstance(value, str):
            # Keep only the year
            return self._set_element(resource, path, value[:4])

        return resource

    def _pseudonymize_references(self, resource: Dict, path: str) -> Dict:
        """Pseudonymize reference elements."""
        # Implementation would walk the resource and update references
        return resource

    def _nlp_scrub(self, resource: Dict, path: str) -> Dict:
        """Use NLP to remove PHI from text."""
        value = self._get_element(resource, path)
        if value and isinstance(value, str):
            # Simple pattern-based scrubbing (real impl would use NLP)
            scrubbed = self._simple_phi_scrub(value)
            return self._set_element(resource, path, scrubbed)
        return resource

    def _simple_phi_scrub(self, text: str) -> str:
        """Simple pattern-based PHI removal."""
        # Remove potential names (capitalized words)
        # Remove dates, phone numbers, etc.
        patterns = [
            (r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', '[PHONE]'),
            (r'\b[A-Z][a-z]+ [A-Z][a-z]+\b', '[NAME]'),
            (r'\b\d{1,2}/\d{1,2}/\d{2,4}\b', '[DATE]'),
        ]

        result = text
        for pattern, replacement in patterns:
            result = re.sub(pattern, replacement, result)

        return result

    def _remove_if_phi(self, resource: Dict, path: str) -> Dict:
        """Remove element if it appears to contain PHI."""
        value = self._get_element(resource, path)
        if value and self._contains_phi(str(value)):
            return self._remove_element(resource, path)
        return resource

    def _contains_phi(self, text: str) -> bool:
        """Check if text appears to contain PHI."""
        # Simple heuristic checks
        phi_patterns = [
            r'\b\d{3}[-.]?\d{2}[-.]?\d{4}\b',  # SSN
            r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',  # Phone
        ]
        return any(re.search(p, text) for p in phi_patterns)

    def _extract_patient_reference(self, resource: Dict) -> Optional[str]:
        """Extract patient reference from resource."""
        for field in ['subject', 'patient']:
            ref = resource.get(field, {}).get('reference', '')
            if ref.startswith('Patient/'):
                return ref.replace('Patient/', '')
        return None
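The `consistent_per_patient` date shift in `_get_date_shift` is what makes the policy's `preserve_intervals` setting hold: the offset is a deterministic function of the patient ID, so every date for a patient moves by the same amount. The property can be checked with a self-contained sketch of the same logic (sample key and IDs are illustrative):

```python
import hashlib
import hmac
from datetime import date, timedelta

def date_shift_days(secret_key: bytes, patient_id: str, range_days: int = 30) -> int:
    """Deterministic per-patient shift in [-range_days, range_days)."""
    h = hmac.new(secret_key, patient_id.encode(), hashlib.sha256)
    return int.from_bytes(h.digest()[:4], 'big') % (range_days * 2) - range_days

def shift(d: date, secret_key: bytes, patient_id: str) -> date:
    return d + timedelta(days=date_shift_days(secret_key, patient_id))

key = b"project-secret"
a = shift(date(2020, 1, 1), key, "patient-123")
b = shift(date(2020, 1, 11), key, "patient-123")
assert (b - a).days == 10  # the 10-day interval survives the shift
```

Preserving intervals keeps longitudinal analyses (time-to-event, follow-up windows) valid while hiding the absolute dates.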

DICOM Transformation

De-identifies DICOM datasets following Supplement 142 profiles with tag-level actions, UID re-hashing, and burned-in annotation detection.

DICOM Transformation
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass
from enum import Enum
import hashlib
import hmac
import re

class DICOMAction(Enum):
    REMOVE = "remove"
    REPLACE = "replace"
    PSEUDONYMIZE = "pseudonymize"
    GENERALIZE = "generalize"
    REHASH = "rehash"
    RETAIN = "retain"

@dataclass
class TagRule:
    """Rule for processing a DICOM tag"""
    tag: Tuple[int, int]
    action: DICOMAction
    replacement_value: Optional[str] = None

class DICOMDeidentifier:
    """
    De-identifies DICOM datasets according to DICOM Supplement 142
    and custom de-identification profiles.
    """

    def __init__(self, policy: Dict, secret_key: str):
        self.policy = policy
        self.secret_key = secret_key.encode()
        self.uid_mapping = {}  # Consistent UID re-hashing

        # Parse tag rules from policy
        self.tag_rules = self._parse_tag_rules()

    def _parse_tag_rules(self) -> Dict[Tuple[int, int], TagRule]:
        """Parse tag rules from policy configuration."""
        rules = {}

        for rule_config in self.policy.get('dicom', {}).get('tags', []):
            tag = self._parse_tag(rule_config['tag'])
            action = DICOMAction(rule_config['action'])

            rules[tag] = TagRule(
                tag=tag,
                action=action,
                replacement_value=rule_config.get('value')
            )

        return rules

    def _parse_tag(self, tag_str: str) -> Tuple[int, int]:
        """Parse DICOM tag string like '(0010,0010)' to tuple."""
        tag_str = tag_str.strip('()')
        group, element = tag_str.split(',')
        return (int(group, 16), int(element, 16))

    def deidentify_dataset(self, dataset) -> None:
        """
        De-identify a DICOM dataset in place.
        Uses pydicom Dataset object.
        """
        # Get patient ID for consistent processing
        patient_id = str(getattr(dataset, 'PatientID', 'unknown'))

        # Apply tag-level rules
        for tag, rule in self.tag_rules.items():
            self._apply_tag_rule(dataset, tag, rule, patient_id)

        # Handle private tags
        self._process_private_tags(dataset)

        # Handle UIDs
        self._process_uids(dataset)

        # Add de-identification method
        self._add_deidentification_method(dataset)

    def _apply_tag_rule(self, dataset, tag: Tuple[int, int], 
                       rule: TagRule, patient_id: str):
        """Apply a single tag rule to the dataset."""
        try:
            # Check if tag exists
            if tag not in dataset:
                return

            if rule.action == DICOMAction.REMOVE:
                del dataset[tag]

            elif rule.action == DICOMAction.REPLACE:
                dataset[tag].value = rule.replacement_value

            elif rule.action == DICOMAction.PSEUDONYMIZE:
                original = str(dataset[tag].value)
                pseudonym = self._generate_pseudonym(original, patient_id)
                dataset[tag].value = pseudonym

            elif rule.action == DICOMAction.GENERALIZE:
                # For dates, keep only year
                original = str(dataset[tag].value)
                if len(original) >= 4:
                    dataset[tag].value = original[:4] + '0101'

            elif rule.action == DICOMAction.REHASH:
                original = str(dataset[tag].value)
                new_uid = self._rehash_uid(original)
                dataset[tag].value = new_uid

        except Exception as e:
            # Log error but continue processing
            print(f"Error processing tag {tag}: {e}")

    def _process_private_tags(self, dataset):
        """Remove or process private tags based on policy."""
        private_policy = self.policy.get('dicom', {}).get('private_tags', {})
        action = private_policy.get('action', 'remove_all')
        exceptions = private_policy.get('exceptions', [])

        if action == 'remove_all':
            # Find all private tags (odd group numbers)
            private_tags = [
                tag for tag in dataset.keys()
                if tag.group % 2 == 1
            ]

            for tag in private_tags:
                # Check exceptions
                tag_str = f"({tag.group:04X},{tag.element:04X})"
                if not any(self._matches_pattern(tag_str, exc) for exc in exceptions):
                    del dataset[tag]

    def _process_uids(self, dataset):
        """Re-hash all UI (UID) type elements for consistency."""
        uid_tags = [
            (0x0020, 0x000D),  # Study Instance UID
            (0x0020, 0x000E),  # Series Instance UID
            (0x0008, 0x0018),  # SOP Instance UID
            # SOP Class UID (0008,0016) is deliberately retained: re-hashing
            # it would destroy the object's type information
        ]

        for tag in uid_tags:
            if tag in dataset:
                original_uid = str(dataset[tag].value)
                new_uid = self._rehash_uid(original_uid)
                dataset[tag].value = new_uid

    def _generate_pseudonym(self, value: str, patient_id: str) -> str:
        """Generate consistent pseudonym."""
        key_material = f"{patient_id}:{value}"
        h = hmac.new(self.secret_key, key_material.encode(), hashlib.sha256)
        return h.hexdigest()[:16].upper()

    def _rehash_uid(self, original_uid: str) -> str:
        """
        Generate new UID from original using consistent hashing.
        Maintains UID format requirements.
        """
        if original_uid in self.uid_mapping:
            return self.uid_mapping[original_uid]

        # Generate hash
        h = hmac.new(self.secret_key, original_uid.encode(), hashlib.sha256)
        hash_bytes = h.digest()

        # Convert to UID format: 2.25.{large integer}
        # This uses the UUID-derived UID format from DICOM
        uid_int = int.from_bytes(hash_bytes[:16], 'big')
        new_uid = f"2.25.{uid_int}"

        # Ensure UID doesn't exceed 64 characters
        if len(new_uid) > 64:
            new_uid = new_uid[:64]

        self.uid_mapping[original_uid] = new_uid
        return new_uid

    def _matches_pattern(self, tag_str: str, pattern: str) -> bool:
        """Check if tag matches pattern (supports xx wildcard)."""
        import re
        # Escape literal characters (including the parentheses) first,
        # then expand the xx wildcard into a hex character class
        regex = re.escape(pattern).replace('xx', '[0-9A-Fa-f]{2}')
        return bool(re.fullmatch(regex, tag_str, re.IGNORECASE))

    def _add_deidentification_method(self, dataset):
        """Add de-identification method code sequence."""
        from pydicom.sequence import Sequence
        from pydicom.dataset import Dataset

        # Patient Identity Removed
        dataset.PatientIdentityRemoved = 'YES'

        # De-identification Method
        dataset.DeidentificationMethod = 'Custom Profile based on DICOM Supplement 142'

        # De-identification Method Code Sequence
        code_seq = Sequence()

        code_item = Dataset()
        code_item.CodeValue = '113100'
        code_item.CodingSchemeDesignator = 'DCM'
        code_item.CodeMeaning = 'Basic Application Confidentiality Profile'
        code_seq.append(code_item)

        dataset.DeidentificationMethodCodeSequence = code_seq

    def process_burned_in_annotations(self, dataset) -> bool:
        """
        Detect and handle burned-in annotations in pixel data.
        Returns True if annotations were processed.
        """
        bia_policy = self.policy.get('dicom', {}).get('burned_in_annotation', {})

        if not bia_policy.get('detect', False):
            return False

        # Check if image likely has burned-in annotations
        # This is a simplified check - real implementation would use
        # machine learning or more sophisticated detection

        modality = getattr(dataset, 'Modality', '')
        has_overlay = hasattr(dataset, 'OverlayData')

        # Check Burned In Annotation attribute
        burned_in = getattr(dataset, 'BurnedInAnnotation', 'NO')

        if burned_in == 'YES' or has_overlay:
            action = bia_policy.get('action', 'blackout')

            if action == 'blackout':
                self._blackout_regions(dataset, bia_policy.get('regions', []))
                return True
            elif action == 'reject':
                raise ValueError("Image contains burned-in annotations")

        return False

    def _blackout_regions(self, dataset, regions: List[Dict]):
        """Black out specified regions in pixel data."""
        try:
            import numpy as np

            pixel_array = dataset.pixel_array
            rows, cols = pixel_array.shape[:2]

            for region in regions:
                if 'top_percent' in region:
                    height = int(rows * region['top_percent'] / 100)
                    if height > 0:
                        pixel_array[:height, :] = 0

                if 'bottom_percent' in region:
                    height = int(rows * region['bottom_percent'] / 100)
                    # Guard against height == 0: arr[-0:] selects everything
                    if height > 0:
                        pixel_array[-height:, :] = 0

            dataset.PixelData = pixel_array.tobytes()

        except Exception as e:
            print(f"Error processing pixel data: {e}")

    def get_uid_mapping(self) -> Dict[str, str]:
        """Return the UID mapping for cross-referencing."""
        return dict(self.uid_mapping)
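The `_rehash_uid` scheme can be exercised standalone (same logic, minus the mapping cache); the sample UID below is illustrative. Note that with a 128-bit hash value the `2.25.<integer>` form is at most 44 characters, so it always fits DICOM's 64-character UID limit:

```python
import hashlib
import hmac

def rehash_uid(secret_key: bytes, original_uid: str) -> str:
    """Deterministically map a UID into the UUID-derived 2.25 root."""
    h = hmac.new(secret_key, original_uid.encode(), hashlib.sha256)
    uid_int = int.from_bytes(h.digest()[:16], 'big')  # 128-bit value
    return f"2.25.{uid_int}"

key = b"project-secret"
uid = rehash_uid(key, "1.2.840.113619.2.55.3.604688119.971.1")
assert uid == rehash_uid(key, "1.2.840.113619.2.55.3.604688119.971.1")
assert uid.startswith("2.25.") and len(uid) <= 64
```

Determinism is what keeps Study, Series, and SOP Instance references consistent across all objects processed with the same project secret.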

Benefits

  • Consistent Processing: Same rules applied to FHIR and DICOM
  • Policy-Driven: Flexible rules for different use cases
  • Pseudonymization: Maintain linkage while protecting identity
  • Standards-Based: Follows HIPAA Safe Harbor and DICOM Supplement 142
  • Auditable: Complete record of transformations applied

Trade-offs

  • Data Utility: Some transformations reduce analytical value
  • Complexity: Multiple data types and rules to manage
  • Performance: Transformation overhead for large datasets
  • Re-identification Risk: Balancing privacy with utility


Re-identification Risk

Evaluate re-identification risk carefully, especially for rare conditions or small populations. Consider k-anonymity and l-diversity requirements.
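For example, a quick k-anonymity check over the released quasi-identifiers can flag unique records before release. A minimal sketch (field names are illustrative):

```python
from collections import Counter
from typing import Dict, List

def k_anonymity(records: List[Dict], quasi_identifiers: List[str]) -> int:
    """Return the dataset's k: the size of the smallest equivalence
    class over the given quasi-identifier columns."""
    groups = Counter(
        tuple(r.get(q) for q in quasi_identifiers) for r in records
    )
    return min(groups.values()) if groups else 0

records = [
    {"gender": "F", "birth_year": "1950", "state": "MA"},
    {"gender": "F", "birth_year": "1950", "state": "MA"},
    {"gender": "M", "birth_year": "1950", "state": "MA"},
]
# k == 1 here: the single male record is unique, a re-identification risk
assert k_anonymity(records, ["gender", "birth_year", "state"]) == 1
```

When k falls below the project's threshold, the usual remedies are further generalization (coarser dates, broader regions) or suppression of the outlying records.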