
Lab 32: Privacy Impact Assessment

Chapter: 56 -- Privacy Engineering | 13 -- Security Governance, Privacy & Risk | 55 -- Threat Modeling Operations
Difficulty: ⭐⭐⭐⭐☆ Advanced
Estimated Time: 4 hours
Prerequisites: Chapter 56, Chapter 13, Chapter 55, familiarity with GDPR fundamentals, basic Python scripting, understanding of data protection principles


Overview

In this lab you will:

  1. Define a realistic AI employee monitoring scenario -- building data flow diagrams with Mermaid, identifying processing purposes, establishing legal basis under GDPR, and documenting the complete data lifecycle from collection through deletion
  2. Conduct a comprehensive data inventory and classification -- cataloging all personal data categories, classifying by sensitivity level (standard, special category, children's data), mapping retention periods, and running automated PII discovery scripts against synthetic datasets
  3. Execute a full DPIA under GDPR Article 35 -- applying systematic evaluation criteria, assessing necessity and proportionality, scoring risks with a likelihood-impact matrix, and documenting findings in a structured DPIA template
  4. Implement privacy-preserving technical controls -- deploying pseudonymization, encryption at rest and in transit, role-based access controls, data minimization pipelines, and automated retention enforcement with Python code for each control
  5. Perform LINDDUN threat analysis -- applying all seven LINDDUN categories (Linkability, Identifiability, Non-repudiation, Detectability, Disclosure of information, Unawareness, Non-compliance) to the AI monitoring system and mapping threats to privacy patterns
  6. Validate compliance end-to-end -- verifying GDPR Article 25 (Data Protection by Design), Article 30 (Records of Processing Activities), and Article 35 (DPIA completion), generating compliance reports, and defining ongoing monitoring metrics

Synthetic Data Only

All data in this lab is 100% synthetic and fictional. All IP addresses use RFC 5737 (192.0.2.x, 198.51.100.x, 203.0.113.x) or RFC 1918 (10.x, 172.16.x, 192.168.x) reserved ranges. All domains use *.example or *.example.com. All credentials are testuser/REDACTED. All personal data is entirely fictitious and does not represent real individuals. This lab is for defensive education only -- never process real personal data without proper authorization, legal basis, and data protection safeguards.
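The address and domain rules above can be checked mechanically before a dataset is used in the lab. The following is a minimal sketch using only the Python standard library; the helper names `is_lab_safe_ip` and `is_lab_safe_domain` are hypothetical and not part of the lab tooling.

```python
import ipaddress

# Reserved ranges permitted by the lab: RFC 5737 documentation blocks
# and RFC 1918 private blocks.
ALLOWED_NETWORKS = [
    ipaddress.ip_network(cidr)
    for cidr in (
        "192.0.2.0/24",     # RFC 5737 TEST-NET-1
        "198.51.100.0/24",  # RFC 5737 TEST-NET-2
        "203.0.113.0/24",   # RFC 5737 TEST-NET-3
        "10.0.0.0/8",       # RFC 1918
        "172.16.0.0/12",    # RFC 1918
        "192.168.0.0/16",   # RFC 1918
    )
]


def is_lab_safe_ip(value: str) -> bool:
    """Return True if value parses as an IP inside an allowed range."""
    try:
        address = ipaddress.ip_address(value)
    except ValueError:
        return False  # not an IP address at all
    return any(address in network for network in ALLOWED_NETWORKS)


def is_lab_safe_domain(value: str) -> bool:
    """Return True for hosts under *.example or *.example.com."""
    host = value.lower().rstrip(".")
    return host.endswith(".example") or host.endswith(".example.com")


if __name__ == "__main__":
    print(is_lab_safe_ip("10.60.1.100"))
    print(is_lab_safe_domain("dpia-server.internal.example.com"))
```

A check like this could run as a guard at the top of any script that loads a synthetic dataset, rejecting records that contain addresses or hosts outside the reserved ranges.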


Scenario

You are the Data Protection Engineer at Helios Analytics Corp (a fictional EU-based organization with 2,400 employees across Germany, France, and the Netherlands). The Chief Information Officer has proposed deploying WorkSight AI -- an AI-powered employee monitoring system that tracks:

  • Keystroke patterns and application usage
  • Email metadata (not content) and communication frequency
  • Badge access logs and physical movement within offices
  • Screen activity snapshots (blurred, taken every 15 minutes)
  • Productivity scoring based on ML models

The Data Protection Officer (DPO) has flagged this as high-risk processing under GDPR Article 35(3) and mandated a full Data Protection Impact Assessment (DPIA) before any deployment can proceed. The Works Council (the employee representative body established under the German Works Constitution Act, BetrVG) has raised concerns about proportionality and employee surveillance.

Your task is to conduct the DPIA end-to-end, implement privacy controls, validate compliance, and produce a report suitable for submission to the supervisory authority.

Environment:

Asset | Details
DPIA Server | dpia-server.internal.example.com (10.60.1.100)
Data Catalog | catalog.internal.example.com (10.60.1.101)
Privacy Platform | privacy-platform.internal.example.com (10.60.1.102)
HR System | hr-system.internal.example.com (10.60.1.103)
WorkSight AI Engine | worksight-ai.internal.example.com (10.60.1.110)
Badge Access Controller | badge-ctrl.internal.example.com (10.60.1.120)
Email Gateway | mail-gw.internal.example.com (10.60.1.130)
Encryption Key Vault | keyvault.internal.example.com (10.60.1.140)
Log Aggregator | siem.internal.example.com (10.60.1.150)
Auth | testuser / REDACTED

Stakeholders:

Role | Name (Fictional) | Responsibility
Data Protection Officer | Dr. Elke Krause | DPIA oversight, supervisory authority liaison
CISO | Marcus Brandt | Security controls, risk acceptance
CIO | Sarah Okonkwo | Business sponsor, system owner
Works Council Lead | Jan de Vries | Employee representation, consent issues
Legal Counsel | Amelie Fontaine | Legal basis determination, GDPR interpretation
ML Engineer | Raj Patel | AI model transparency, bias assessment
HR Director | Thomas Mueller | Employee data governance, policy alignment

Phase 1: Scenario Setup -- Data Flow Mapping

Step 1.1: Define Processing Activities

First, document every processing activity that WorkSight AI will perform. Create a processing activity register that feeds into the DPIA.

Why This Matters

GDPR Article 30 requires controllers to maintain Records of Processing Activities (ROPA). This register becomes the foundation for your DPIA and must be maintained throughout the system lifecycle. Without a complete ROPA, you cannot demonstrate compliance under the accountability principle (Article 5(2)).

Create a file called processing_activities.py:

"""
WorkSight AI -- Processing Activity Register Generator
Generates GDPR Article 30 compliant processing activity records.
All data is 100% synthetic for educational purposes.
"""

import json
from datetime import datetime
from typing import List, Optional
from dataclasses import dataclass, field
from enum import Enum


class LegalBasis(Enum):
    """GDPR Article 6(1) legal bases for processing."""
    CONSENT = "6(1)(a) - Consent"
    CONTRACT = "6(1)(b) - Contract performance"
    LEGAL_OBLIGATION = "6(1)(c) - Legal obligation"
    VITAL_INTEREST = "6(1)(d) - Vital interests"
    PUBLIC_INTEREST = "6(1)(e) - Public interest"
    LEGITIMATE_INTEREST = "6(1)(f) - Legitimate interest"


class DataCategory(Enum):
    """Personal data categories under GDPR."""
    STANDARD = "Standard personal data"
    SPECIAL_ART9 = "Special category (Article 9)"
    CRIMINAL_ART10 = "Criminal conviction data (Article 10)"
    CHILDREN = "Children's data (Article 8)"


class SensitivityLevel(Enum):
    """Data sensitivity classification."""
    LOW = "Low"
    MEDIUM = "Medium"
    HIGH = "High"
    CRITICAL = "Critical"


@dataclass
class DataElement:
    """Individual data element within a processing activity."""
    name: str
    description: str
    category: DataCategory
    sensitivity: SensitivityLevel
    retention_days: int
    legal_basis: LegalBasis
    special_category_condition: Optional[str] = None
    pseudonymized: bool = False
    encrypted: bool = False


@dataclass
class ProcessingActivity:
    """GDPR Article 30 compliant processing activity record."""
    activity_id: str
    name: str
    purpose: str
    controller: str
    dpo_contact: str
    legal_basis: LegalBasis
    legitimate_interest_assessment: Optional[str] = None
    data_subjects: List[str] = field(default_factory=list)
    data_elements: List[DataElement] = field(default_factory=list)
    recipients: List[str] = field(default_factory=list)
    third_country_transfers: List[str] = field(default_factory=list)
    retention_period: str = ""
    security_measures: List[str] = field(default_factory=list)
    dpia_required: bool = False
    dpia_reference: Optional[str] = None
    automated_decision_making: bool = False
    profiling: bool = False
    date_created: str = field(
        default_factory=lambda: datetime.now().strftime("%Y-%m-%d")
    )
    last_reviewed: str = field(
        default_factory=lambda: datetime.now().strftime("%Y-%m-%d")
    )


def build_worksight_register() -> List[ProcessingActivity]:
    """Build the complete processing activity register for WorkSight AI."""

    activities = []

    # PA-001: Keystroke Pattern Analysis
    pa_001 = ProcessingActivity(
        activity_id="PA-001",
        name="Keystroke Pattern Analysis",
        purpose="Measure typing speed and application switching frequency "
                "to generate productivity metrics for team-level reporting",
        controller="Helios Analytics Corp, Friedrichstrasse 42, "
                   "10117 Berlin, Germany",
        dpo_contact="Dr. Elke Krause, dpo@helios-analytics.example.com",
        legal_basis=LegalBasis.LEGITIMATE_INTEREST,
        legitimate_interest_assessment=(
            "Legitimate interest: improving workforce productivity. "
            "Balanced against employee privacy rights. "
            "Works Council consulted per BetrVG Section 87(1)(6). "
            "Individual-level monitoring REJECTED as disproportionate -- "
            "only team-level aggregates permitted."
        ),
        data_subjects=["Employees (EU)", "Contractors (EU)"],
        data_elements=[
            DataElement(
                name="Keystroke timing patterns",
                description="Inter-key intervals and typing rhythm (NOT "
                            "actual keystrokes or content)",
                category=DataCategory.STANDARD,
                sensitivity=SensitivityLevel.HIGH,
                retention_days=90,
                legal_basis=LegalBasis.LEGITIMATE_INTEREST,
                pseudonymized=True,
                encrypted=True,
            ),
            DataElement(
                name="Application usage logs",
                description="Application names and window focus duration",
                category=DataCategory.STANDARD,
                sensitivity=SensitivityLevel.MEDIUM,
                retention_days=90,
                legal_basis=LegalBasis.LEGITIMATE_INTEREST,
                pseudonymized=True,
                encrypted=True,
            ),
            DataElement(
                name="Employee identifier (pseudonymized)",
                description="HMAC-SHA-256 of employee ID, key rotated monthly",
                category=DataCategory.STANDARD,
                sensitivity=SensitivityLevel.MEDIUM,
                retention_days=90,
                legal_basis=LegalBasis.LEGITIMATE_INTEREST,
                pseudonymized=True,
                encrypted=True,
            ),
        ],
        recipients=[
            "WorkSight AI Engine (internal processor)",
            "Team managers (aggregated reports only)",
            "HR Department (anonymized trend reports)",
        ],
        third_country_transfers=[],
        retention_period="90 days from collection, then irreversibly "
                         "aggregated to team-level statistics",
        security_measures=[
            "AES-256-GCM encryption at rest",
            "TLS 1.3 in transit",
            "Pseudonymization with rotating keys",
            "Role-based access control (RBAC)",
            "Audit logging of all access",
            "Network segmentation (VLAN 60)",
        ],
        dpia_required=True,
        dpia_reference="DPIA-WS-2026-001",
        automated_decision_making=False,
        profiling=True,
    )
    activities.append(pa_001)

    # PA-002: Email Metadata Analysis
    pa_002 = ProcessingActivity(
        activity_id="PA-002",
        name="Email Metadata Analysis",
        purpose="Analyze email send/receive frequency and response times "
                "to identify communication bottlenecks at team level",
        controller="Helios Analytics Corp, Friedrichstrasse 42, "
                   "10117 Berlin, Germany",
        dpo_contact="Dr. Elke Krause, dpo@helios-analytics.example.com",
        legal_basis=LegalBasis.LEGITIMATE_INTEREST,
        legitimate_interest_assessment=(
            "Legitimate interest: optimizing team communication workflows. "
            "Email CONTENT is never accessed -- metadata only. "
            "Works Council agreement obtained. "
            "Individual attribution removed before analysis."
        ),
        data_subjects=["Employees (EU)"],
        data_elements=[
            DataElement(
                name="Email send/receive timestamps",
                description="Date and time of email events (no content)",
                category=DataCategory.STANDARD,
                sensitivity=SensitivityLevel.MEDIUM,
                retention_days=60,
                legal_basis=LegalBasis.LEGITIMATE_INTEREST,
                pseudonymized=True,
                encrypted=True,
            ),
            DataElement(
                name="Response time metrics",
                description="Average reply latency per team (aggregated)",
                category=DataCategory.STANDARD,
                sensitivity=SensitivityLevel.LOW,
                retention_days=180,
                legal_basis=LegalBasis.LEGITIMATE_INTEREST,
                pseudonymized=True,
                encrypted=False,
            ),
            DataElement(
                name="Communication frequency counts",
                description="Number of emails per day per team",
                category=DataCategory.STANDARD,
                sensitivity=SensitivityLevel.LOW,
                retention_days=180,
                legal_basis=LegalBasis.LEGITIMATE_INTEREST,
                pseudonymized=True,
                encrypted=False,
            ),
        ],
        recipients=[
            "WorkSight AI Engine (internal processor)",
            "Team managers (aggregated reports only)",
        ],
        third_country_transfers=[],
        retention_period="60 days raw metadata, 180 days aggregated metrics",
        security_measures=[
            "AES-256-GCM encryption at rest for raw data",
            "TLS 1.3 in transit",
            "Pseudonymization before analysis",
            "No email content access (metadata only)",
            "Audit logging",
        ],
        dpia_required=True,
        dpia_reference="DPIA-WS-2026-001",
        automated_decision_making=False,
        profiling=True,
    )
    activities.append(pa_002)

    # PA-003: Badge Access & Physical Movement
    pa_003 = ProcessingActivity(
        activity_id="PA-003",
        name="Badge Access & Physical Movement Tracking",
        purpose="Monitor building entry/exit and floor movement patterns "
                "for space utilization analysis and security compliance",
        controller="Helios Analytics Corp, Friedrichstrasse 42, "
                   "10117 Berlin, Germany",
        dpo_contact="Dr. Elke Krause, dpo@helios-analytics.example.com",
        legal_basis=LegalBasis.LEGITIMATE_INTEREST,
        legitimate_interest_assessment=(
            "Dual basis: legitimate interest (facility management) and "
            "legal obligation (workplace safety regulations). "
            "Movement tracking limited to floor-level granularity -- "
            "no room-level or desk-level tracking permitted."
        ),
        data_subjects=["Employees (EU)", "Contractors (EU)", "Visitors"],
        data_elements=[
            DataElement(
                name="Badge swipe events",
                description="Entry/exit timestamps at building and floor "
                            "level access points",
                category=DataCategory.STANDARD,
                sensitivity=SensitivityLevel.MEDIUM,
                retention_days=365,
                legal_basis=LegalBasis.LEGITIMATE_INTEREST,
                pseudonymized=False,
                encrypted=True,
            ),
            DataElement(
                name="Floor presence duration",
                description="Time spent on each floor per day",
                category=DataCategory.STANDARD,
                sensitivity=SensitivityLevel.MEDIUM,
                retention_days=90,
                legal_basis=LegalBasis.LEGITIMATE_INTEREST,
                pseudonymized=True,
                encrypted=True,
            ),
            DataElement(
                name="Visitor badge records",
                description="Visitor name, host employee, entry/exit time",
                category=DataCategory.STANDARD,
                sensitivity=SensitivityLevel.LOW,
                retention_days=30,
                legal_basis=LegalBasis.LEGAL_OBLIGATION,
                pseudonymized=False,
                encrypted=True,
            ),
        ],
        recipients=[
            "Facilities Management",
            "Security Operations Center",
            "WorkSight AI Engine (aggregated patterns only)",
        ],
        third_country_transfers=[],
        retention_period="30-365 days depending on data element",
        security_measures=[
            "AES-256 encryption at rest",
            "TLS 1.3 in transit",
            "Physical security of badge readers",
            "Access limited to Facilities and Security teams",
            "Audit logging of all queries",
        ],
        dpia_required=True,
        dpia_reference="DPIA-WS-2026-001",
        automated_decision_making=False,
        profiling=False,
    )
    activities.append(pa_003)

    # PA-004: Screen Activity Capture
    pa_004 = ProcessingActivity(
        activity_id="PA-004",
        name="Screen Activity Capture",
        purpose="Capture blurred screen snapshots at 15-minute intervals "
                "to verify policy compliance and detect shadow IT usage",
        controller="Helios Analytics Corp, Friedrichstrasse 42, "
                   "10117 Berlin, Germany",
        dpo_contact="Dr. Elke Krause, dpo@helios-analytics.example.com",
        legal_basis=LegalBasis.LEGITIMATE_INTEREST,
        legitimate_interest_assessment=(
            "HIGH RISK: Screen captures inherently invasive. "
            "Gaussian blur (sigma=15) applied at capture time. "
            "Only application window titles are extracted in cleartext. "
            "Original unblurred images are NEVER stored. "
            "Works Council has CONDITIONAL approval pending DPIA outcome."
        ),
        data_subjects=["Employees (EU)"],
        data_elements=[
            DataElement(
                name="Blurred screen snapshots",
                description="15-minute interval screenshots with Gaussian "
                            "blur applied at capture -- content unreadable",
                category=DataCategory.STANDARD,
                sensitivity=SensitivityLevel.CRITICAL,
                retention_days=7,
                legal_basis=LegalBasis.LEGITIMATE_INTEREST,
                pseudonymized=True,
                encrypted=True,
            ),
            DataElement(
                name="Active window titles",
                description="Application name and window title text",
                category=DataCategory.STANDARD,
                sensitivity=SensitivityLevel.HIGH,
                retention_days=30,
                legal_basis=LegalBasis.LEGITIMATE_INTEREST,
                pseudonymized=True,
                encrypted=True,
            ),
        ],
        recipients=[
            "WorkSight AI Engine (automated classification only)",
            "Security team (only on policy violation alert)",
        ],
        third_country_transfers=[],
        retention_period="7 days snapshots, 30 days window titles",
        security_measures=[
            "Blur applied at capture (irreversible)",
            "AES-256-GCM encryption at rest",
            "TLS 1.3 in transit",
            "Strict RBAC -- security team only",
            "No bulk export capability",
            "Audit logging with tamper detection",
        ],
        dpia_required=True,
        dpia_reference="DPIA-WS-2026-001",
        automated_decision_making=True,
        profiling=True,
    )
    activities.append(pa_004)

    # PA-005: AI Productivity Scoring
    pa_005 = ProcessingActivity(
        activity_id="PA-005",
        name="AI Productivity Scoring",
        purpose="Generate team-level productivity scores using ML models "
                "trained on aggregated behavioral data from PA-001 through "
                "PA-004 to identify workflow optimization opportunities",
        controller="Helios Analytics Corp, Friedrichstrasse 42, "
                   "10117 Berlin, Germany",
        dpo_contact="Dr. Elke Krause, dpo@helios-analytics.example.com",
        legal_basis=LegalBasis.LEGITIMATE_INTEREST,
        legitimate_interest_assessment=(
            "CRITICAL RISK: ML-based scoring of employees. "
            "Individual scores PROHIBITED -- team-level only. "
            "No automated decisions affecting employment. "
            "Human review required for any management action. "
            "Bias audit conducted quarterly. "
            "Works Council has RIGHT TO OBJECT at any time."
        ),
        data_subjects=["Employees (EU)"],
        data_elements=[
            DataElement(
                name="Team productivity score",
                description="ML-generated 0-100 score per team per week",
                category=DataCategory.STANDARD,
                sensitivity=SensitivityLevel.HIGH,
                retention_days=180,
                legal_basis=LegalBasis.LEGITIMATE_INTEREST,
                pseudonymized=True,
                encrypted=True,
            ),
            DataElement(
                name="ML model features (aggregated)",
                description="Aggregated input features used for scoring -- "
                            "no individual-level data retained",
                category=DataCategory.STANDARD,
                sensitivity=SensitivityLevel.MEDIUM,
                retention_days=90,
                legal_basis=LegalBasis.LEGITIMATE_INTEREST,
                pseudonymized=True,
                encrypted=True,
            ),
            DataElement(
                name="Model bias audit results",
                description="Quarterly bias assessment across protected "
                            "characteristics (gender, age, nationality)",
                category=DataCategory.SPECIAL_ART9,
                sensitivity=SensitivityLevel.CRITICAL,
                retention_days=730,
                legal_basis=LegalBasis.LEGAL_OBLIGATION,
                special_category_condition="Article 9(2)(b) -- employment "
                                           "law obligation (bias monitoring)",
                pseudonymized=True,
                encrypted=True,
            ),
        ],
        recipients=[
            "Team managers (scores only, no underlying data)",
            "HR Department (trend analysis)",
            "Works Council (quarterly summary)",
        ],
        third_country_transfers=[],
        retention_period="90-730 days depending on element",
        security_measures=[
            "Model explainability (SHAP values)",
            "Quarterly bias audits",
            "No individual scoring",
            "Human-in-the-loop for any action",
            "AES-256-GCM encryption",
            "Strict RBAC",
        ],
        dpia_required=True,
        dpia_reference="DPIA-WS-2026-001",
        automated_decision_making=True,
        profiling=True,
    )
    activities.append(pa_005)

    return activities


def generate_register_report(activities: List[ProcessingActivity]) -> str:
    """Generate a human-readable processing activity report."""
    lines = []
    lines.append("=" * 72)
    lines.append("PROCESSING ACTIVITY REGISTER -- WorkSight AI")
    lines.append(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    lines.append("Controller: Helios Analytics Corp")
    lines.append("DPO: Dr. Elke Krause (dpo@helios-analytics.example.com)")
    lines.append(f"Total Activities: {len(activities)}")
    lines.append("=" * 72)

    for pa in activities:
        lines.append("")
        lines.append("-" * 72)
        lines.append(f"[{pa.activity_id}] {pa.name}")
        lines.append("-" * 72)
        lines.append(f"  Purpose:      {pa.purpose}")
        lines.append(f"  Legal Basis:  {pa.legal_basis.value}")
        if pa.legitimate_interest_assessment:
            lines.append(f"  LIA:          {pa.legitimate_interest_assessment}")
        lines.append(f"  Subjects:     {', '.join(pa.data_subjects)}")
        lines.append(f"  Retention:    {pa.retention_period}")
        lines.append(f"  DPIA Req:     {'YES' if pa.dpia_required else 'No'}")
        lines.append(f"  Profiling:    {'YES' if pa.profiling else 'No'}")
        lines.append(f"  Auto-Decision: {'YES' if pa.automated_decision_making else 'No'}")
        lines.append(f"  Transfers:    {', '.join(pa.third_country_transfers) or 'None'}")
        lines.append("  Data Elements:")
        for de in pa.data_elements:
            lines.append(f"    - {de.name}")
            lines.append(f"      Category:    {de.category.value}")
            lines.append(f"      Sensitivity: {de.sensitivity.value}")
            lines.append(f"      Retention:   {de.retention_days} days")
            lines.append(f"      Pseudonym:   {'Yes' if de.pseudonymized else 'No'}")
            lines.append(f"      Encrypted:   {'Yes' if de.encrypted else 'No'}")
            if de.special_category_condition:
                lines.append(f"      Art 9 Cond:  {de.special_category_condition}")
        lines.append("  Recipients:")
        for r in pa.recipients:
            lines.append(f"    - {r}")
        lines.append("  Security Measures:")
        for sm in pa.security_measures:
            lines.append(f"    - {sm}")

    lines.append("")
    lines.append("=" * 72)
    lines.append("END OF REGISTER")
    lines.append("=" * 72)
    return "\n".join(lines)


if __name__ == "__main__":
    activities = build_worksight_register()
    report = generate_register_report(activities)
    print(report)

    # Export as JSON for downstream tools
    json_export = json.dumps(
        [
            {
                "activity_id": a.activity_id,
                "name": a.name,
                "purpose": a.purpose,
                "legal_basis": a.legal_basis.value,
                "data_subjects": a.data_subjects,
                "dpia_required": a.dpia_required,
                "profiling": a.profiling,
                "automated_decision_making": a.automated_decision_making,
                "data_elements": [
                    {
                        "name": d.name,
                        "category": d.category.value,
                        "sensitivity": d.sensitivity.value,
                        "retention_days": d.retention_days,
                        "pseudonymized": d.pseudonymized,
                        "encrypted": d.encrypted,
                    }
                    for d in a.data_elements
                ],
            }
            for a in activities
        ],
        indent=2,
    )
    with open("processing_activities.json", "w", encoding="utf-8") as f:
        f.write(json_export)
    print("\nJSON export written to processing_activities.json")

Expected output (truncated):

========================================================================
PROCESSING ACTIVITY REGISTER -- WorkSight AI
Generated: 2026-04-13 10:30:00
Controller: Helios Analytics Corp
DPO: Dr. Elke Krause (dpo@helios-analytics.example.com)
Total Activities: 5
========================================================================

------------------------------------------------------------------------
[PA-001] Keystroke Pattern Analysis
------------------------------------------------------------------------
  Purpose:      Measure typing speed and application switching frequency ...
  Legal Basis:  6(1)(f) - Legitimate interest
  LIA:          Legitimate interest: improving workforce productivity...
  Subjects:     Employees (EU), Contractors (EU)
  Retention:    90 days from collection, then irreversibly aggregated...
  DPIA Req:     YES
  Profiling:    YES
  ...
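Once the register exists, it is worth checking it for internal consistency before it feeds the DPIA. The sketch below reads the `processing_activities.json` export and applies three illustrative rules. The function names and the thresholds (encryption expected at High sensitivity and above, pseudonymization at Medium and above) are assumptions made for this example, not requirements stated by GDPR or by the lab.

```python
import json
from pathlib import Path
from typing import Dict, List

# Ordering of the SensitivityLevel values used in the register.
SENSITIVITY_RANK = {"Low": 0, "Medium": 1, "High": 2, "Critical": 3}


def review_activity(activity: Dict) -> List[str]:
    """Return human-readable findings for one exported activity record."""
    findings = []
    pa_id = activity["activity_id"]

    # Assumed rule: profiling or automated decisions should carry a DPIA flag.
    risky = activity["profiling"] or activity["automated_decision_making"]
    if risky and not activity["dpia_required"]:
        findings.append(f"{pa_id}: profiling/automated decisions without DPIA")

    for element in activity["data_elements"]:
        rank = SENSITIVITY_RANK[element["sensitivity"]]
        label = f"{pa_id} / {element['name']}"
        if element["retention_days"] <= 0:
            findings.append(f"{label}: no positive retention period")
        # Assumed thresholds, chosen for illustration only.
        if rank >= SENSITIVITY_RANK["High"] and not element["encrypted"]:
            findings.append(f"{label}: High+ sensitivity but not encrypted")
        if rank >= SENSITIVITY_RANK["Medium"] and not element["pseudonymized"]:
            findings.append(f"{label}: Medium+ sensitivity but not pseudonymized")
    return findings


def review_register(activities: List[Dict]) -> List[str]:
    """Collect findings across every activity in the export."""
    findings = []
    for activity in activities:
        findings.extend(review_activity(activity))
    return findings


if __name__ == "__main__":
    export = Path("processing_activities.json")
    if export.exists():
        records = json.loads(export.read_text(encoding="utf-8"))
        for finding in review_register(records):
            print(finding)
    else:
        print("Run processing_activities.py first to create the export.")
```

Under these assumed thresholds, the PA-003 element "Badge swipe events" (Medium sensitivity, not pseudonymized) would be reported. A finding like that is a prompt to document the justification in the DPIA rather than an automatic failure.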

Step 1.2: Build the Data Flow Diagram

Map the complete data flow for WorkSight AI using Mermaid. This diagram shows how data moves from collection points through processing to storage and output.

Data Flow Diagram -- WorkSight AI System

flowchart TD
    subgraph Collection["Data Collection Layer"]
        KS["Keystroke Agent<br/>10.60.2.x endpoints"]
        EM["Email Gateway<br/>mail-gw.internal.example.com<br/>10.60.1.130"]
        BA["Badge Readers<br/>badge-ctrl.internal.example.com<br/>10.60.1.120"]
        SC["Screen Capture Agent<br/>10.60.2.x endpoints"]
    end

    subgraph Processing["Processing Layer (10.60.1.110)"]
        PS["Pseudonymization<br/>Engine"]
        AG["Aggregation<br/>Service"]
        ML["ML Scoring<br/>Engine"]
        BL["Blur Engine<br/>(irreversible)"]
    end

    subgraph Storage["Storage Layer"]
        RAW["Raw Data Store<br/>AES-256-GCM<br/>Retention: 7-90 days"]
        AGG["Aggregated Store<br/>Team-level only<br/>Retention: 180 days"]
        AUDIT["Audit Log Store<br/>Tamper-evident<br/>Retention: 730 days"]
    end

    subgraph Output["Output Layer"]
        TR["Team Reports<br/>(Managers)"]
        HR["HR Trend Reports<br/>(Anonymized)"]
        WC["Works Council Reports<br/>(Quarterly)"]
        SEC["Security Alerts<br/>(Policy violations)"]
    end

    subgraph Controls["Privacy Controls"]
        ENC["Encryption<br/>AES-256-GCM"]
        RBAC["RBAC<br/>Least privilege"]
        RET["Retention<br/>Auto-delete"]
        LOG["Audit Logging<br/>Tamper-evident"]
    end

    KS -->|"TLS 1.3"| PS
    EM -->|"TLS 1.3<br/>Metadata only"| PS
    BA -->|"TLS 1.3"| PS
    SC -->|"TLS 1.3"| BL
    BL -->|"Blurred images"| PS
    PS -->|"Pseudonymized"| RAW
    PS -->|"Pseudonymized"| AG
    AG -->|"Team aggregates"| ML
    ML -->|"Scores"| AGG
    RAW -->|"Auto-purge"| RET
    AGG --> TR
    AGG --> HR
    AGG --> WC
    RAW -->|"Alert trigger"| SEC

    Controls -.->|"Applied to all flows"| Processing
    Controls -.->|"Applied to all stores"| Storage
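The diagram implies a property worth verifying: no data should travel from a collection point to a store without passing through the pseudonymization engine. As a rough sketch, the solid edges of the diagram can be transcribed into a list and checked with a breadth-first search that treats the `PS` node as blocked. The node IDs are copied from the diagram; the function names are hypothetical.

```python
from collections import deque
from typing import Dict, Iterable, List, Set, Tuple

Edge = Tuple[str, str]

# Solid edges transcribed from the Mermaid diagram (dotted control edges omitted).
EDGES: List[Edge] = [
    ("KS", "PS"), ("EM", "PS"), ("BA", "PS"), ("SC", "BL"), ("BL", "PS"),
    ("PS", "RAW"), ("PS", "AG"), ("AG", "ML"), ("ML", "AGG"),
    ("RAW", "RET"), ("AGG", "TR"), ("AGG", "HR"), ("AGG", "WC"),
    ("RAW", "SEC"),
]
COLLECTION = {"KS", "EM", "BA", "SC"}
STORAGE = {"RAW", "AGG"}


def reachable(edges: Iterable[Edge], start: str, blocked: Set[str]) -> Set[str]:
    """Nodes reachable from start without entering any blocked node."""
    graph: Dict[str, List[str]] = {}
    for src, dst in edges:
        graph.setdefault(src, []).append(dst)
    seen: Set[str] = set()
    queue = deque([start])
    while queue:
        node = queue.popleft()
        if node in seen or node in blocked:
            continue
        seen.add(node)
        queue.extend(graph.get(node, []))
    return seen


def bypasses(edges: Iterable[Edge], sources: Set[str],
             sinks: Set[str], gate: str) -> List[Edge]:
    """Return (source, sink) pairs connected by a path that avoids gate."""
    edges = list(edges)
    return sorted(
        (src, sink)
        for src in sources
        for sink in reachable(edges, src, {gate}) & sinks
    )


if __name__ == "__main__":
    print(bypasses(EDGES, COLLECTION, STORAGE, "PS"))  # [] means no bypass found
```

The diagram draws no edge into the Audit Log Store, so `AUDIT` is left out of the sink set here. Re-running the check whenever the diagram changes gives a cheap regression test for the pseudonymize-before-store design.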

For each processing activity, evaluate the applicable legal basis and document the analysis:

"""
Legal Basis Analysis for WorkSight AI Processing Activities.
Evaluates GDPR Article 6(1) conditions for each activity.
"""

from dataclasses import dataclass
from typing import List, Optional


@dataclass
class LegalBasisAnalysis:
    """Structured legal basis evaluation."""
    activity_id: str
    activity_name: str
    proposed_basis: str
    lia_purpose: Optional[str] = None
    lia_necessity: Optional[str] = None
    lia_balancing: Optional[str] = None
    lia_safeguards: Optional[str] = None
    lia_conclusion: Optional[str] = None
    consent_feasibility: Optional[str] = None
    alternative_bases_considered: Optional[List[str]] = None
    dpo_recommendation: str = ""
    risk_level: str = ""


def analyze_legal_bases() -> List[LegalBasisAnalysis]:
    """Perform legal basis analysis for all WorkSight activities."""

    analyses = []

    # PA-001: Keystroke Pattern Analysis
    analyses.append(LegalBasisAnalysis(
        activity_id="PA-001",
        activity_name="Keystroke Pattern Analysis",
        proposed_basis="Article 6(1)(f) -- Legitimate interest",
        lia_purpose=(
            "Helios has a legitimate interest in understanding team "
            "productivity patterns to optimize workflows and resource "
            "allocation. This is a genuine business need supported by "
            "the organization's operational objectives."
        ),
        lia_necessity=(
            "Keystroke timing patterns (NOT content) are necessary to "
            "measure active work time vs idle time at team level. "
            "Less invasive alternatives (self-reporting, project tracking) "
            "were evaluated but produce unreliable data."
        ),
        lia_balancing=(
            "AGAINST: Employees have reasonable expectation of privacy "
            "at workstations. Keystroke monitoring is inherently invasive. "
            "FOR: Only timing patterns collected (no content). Data "
            "pseudonymized immediately. Only team-level aggregates used. "
            "No individual performance decisions based on this data. "
            "Works Council consulted and conditionally approved."
        ),
        lia_safeguards=(
            "1. Pseudonymization at collection point. "
            "2. Aggregation to team level before any reporting. "
            "3. 90-day retention with automatic deletion. "
            "4. Works Council oversight with veto power. "
            "5. Employee right to object (Article 21). "
            "6. Quarterly proportionality reviews."
        ),
        lia_conclusion=(
            "CONDITIONALLY ACCEPTABLE. Legitimate interest applies IF "
            "all safeguards are implemented and maintained. The balancing "
            "test favors the controller only because of the extensive "
            "safeguards in place."
        ),
        consent_feasibility=(
            "Consent REJECTED as legal basis: employment relationship "
            "creates power imbalance (EDPB Guidelines 05/2020 on consent; "
            "WP29 Opinion 2/2017 on data processing at work). Consent is "
            "unlikely to be freely given in the employer-employee context "
            "for monitoring activities."
        ),
        alternative_bases_considered=[
            "Consent -- rejected (power imbalance)",
            "Contract -- rejected (not necessary for employment contract)",
            "Legal obligation -- partially applicable for security only",
        ],
        dpo_recommendation=(
            "PROCEED WITH CAUTION. Implement all safeguards before "
            "going live. Schedule 90-day review with Works Council."
        ),
        risk_level="HIGH",
    ))

    # PA-004: Screen Activity Capture
    analyses.append(LegalBasisAnalysis(
        activity_id="PA-004",
        activity_name="Screen Activity Capture",
        proposed_basis="Article 6(1)(f) -- Legitimate interest",
        lia_purpose=(
            "Helios claims legitimate interest in detecting shadow IT "
            "and verifying compliance with acceptable use policies."
        ),
        lia_necessity=(
            "Screen captures -- even blurred -- represent the MOST "
            "invasive monitoring method proposed. Alternative methods "
            "exist: application allow-listing, network traffic analysis, "
            "endpoint detection tools. These are LESS invasive and "
            "achieve the same purpose."
        ),
        lia_balancing=(
            "AGAINST: Screen capture is extremely invasive. Even blurred, "
            "it captures the visual context of employee work. Window "
            "titles may reveal sensitive personal activities (medical "
            "searches, union activity, personal communications). "
            "FOR: Gaussian blur prevents content reading. Only titles "
            "extracted. Very short retention (7 days). "
            "CONCLUSION: Balance tips AGAINST the controller."
        ),
        lia_safeguards=(
            "Even with maximum safeguards, screen capture is "
            "disproportionate when less invasive alternatives exist."
        ),
        lia_conclusion=(
            "REJECTED. Screen capture fails the necessity test under "
            "Article 5(1)(c) data minimization. Less invasive "
            "alternatives (application inventory via endpoint agent, "
            "network traffic analysis) achieve the same purpose. "
            "DPO recommends REMOVING this processing activity."
        ),
        consent_feasibility=(
            "Consent rejected for same reasons as PA-001."
        ),
        alternative_bases_considered=[
            "Consent -- rejected (power imbalance)",
            "Contract -- rejected",
            "Legitimate interest -- FAILS balancing test",
        ],
        dpo_recommendation=(
            "REJECT THIS ACTIVITY. Replace with endpoint-based "
            "application inventory that collects installed software "
            "list only -- no screen capture of any kind."
        ),
        risk_level="CRITICAL -- RECOMMENDED REJECTION",
    ))

    # PA-005: AI Productivity Scoring
    analyses.append(LegalBasisAnalysis(
        activity_id="PA-005",
        activity_name="AI Productivity Scoring",
        proposed_basis="Article 6(1)(f) -- Legitimate interest",
        lia_purpose=(
            "Helios claims legitimate interest in using ML to identify "
            "workflow optimization opportunities at team level."
        ),
        lia_necessity=(
            "ML scoring provides pattern recognition beyond manual "
            "analysis. However, the GDPR imposes strict requirements "
            "on automated decision-making (Article 22). While this is "
            "team-level scoring with no individual decisions, the "
            "inputs are derived from individual behavior data."
        ),
        lia_balancing=(
            "AGAINST: ML scoring of employee behavior carries inherent "
            "risks of bias, function creep, and chilling effects. Even "
            "team-level scores may indirectly identify individuals in "
            "small teams (<5 members). "
            "FOR: Team-level only, human-in-the-loop, quarterly bias "
            "audits, Works Council oversight, no employment decisions."
        ),
        lia_safeguards=(
            "1. Minimum team size of 10 for scoring (prevents re-identification). "
            "2. SHAP values for model explainability. "
            "3. Quarterly bias audits across protected characteristics. "
            "4. Human review required before any management action. "
            "5. Works Council right to suspend scoring. "
            "6. Annual external audit of ML model."
        ),
        lia_conclusion=(
            "CONDITIONALLY ACCEPTABLE with minimum team size of 10 "
            "and all safeguards implemented. Must implement Article 22 "
            "safeguards even though team-level scoring may not trigger "
            "Article 22 directly -- precautionary approach."
        ),
        consent_feasibility="Consent rejected (power imbalance).",
        alternative_bases_considered=[
            "Consent -- rejected (power imbalance)",
            "Contract -- rejected",
        ],
        dpo_recommendation=(
            "PROCEED WITH STRICT CONDITIONS. Minimum team size 10. "
            "No individual scores under any circumstances. "
            "External bias audit within 6 months of launch."
        ),
        risk_level="HIGH",
    ))

    return analyses


def print_legal_analysis(analyses: List[LegalBasisAnalysis]) -> None:
    """Print formatted legal basis analysis."""
    for a in analyses:
        print(f"\n{'=' * 60}")
        print(f"LEGAL BASIS ANALYSIS: {a.activity_id} -- {a.activity_name}")
        print(f"{'=' * 60}")
        print(f"Proposed Basis: {a.proposed_basis}")
        print(f"Risk Level:     {a.risk_level}")
        print(f"\nLIA Purpose:    {a.lia_purpose}")
        print(f"\nLIA Necessity:  {a.lia_necessity}")
        print(f"\nLIA Balancing:  {a.lia_balancing}")
        print(f"\nLIA Safeguards: {a.lia_safeguards}")
        print(f"\nLIA Conclusion: {a.lia_conclusion}")
        print(f"\nConsent:        {a.consent_feasibility}")
        print(f"\nDPO Recomm.:    {a.dpo_recommendation}")
        print(f"{'=' * 60}")


if __name__ == "__main__":
    analyses = analyze_legal_bases()
    print_legal_analysis(analyses)
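
One safeguard listed for PA-005 in the analysis above is a minimum team size of 10 before any score is reported. The following is a minimal sketch of how such a threshold could be enforced at aggregation time. The names `MIN_TEAM_SIZE`, `team_score_or_suppress`, and `publishable_scores` are hypothetical and not part of WorkSight, and the sketch assumes that per-member input values exist only inside the aggregation step and are never reported.

```python
from typing import Dict, List, Optional

# Threshold taken from the PA-005 safeguards; the constant name is hypothetical.
MIN_TEAM_SIZE = 10


def team_score_or_suppress(
    member_values: List[float], min_team_size: int = MIN_TEAM_SIZE
) -> Optional[float]:
    """Return the mean of the team's values, or None if the team is too small."""
    if len(member_values) < min_team_size:
        # Suppress: an aggregate over a small team could single out individuals.
        return None
    return sum(member_values) / len(member_values)


def publishable_scores(teams: Dict[str, List[float]]) -> Dict[str, float]:
    """Keep only the teams whose aggregate may be reported."""
    results: Dict[str, float] = {}
    for team_id, values in teams.items():
        score = team_score_or_suppress(values)
        if score is not None:
            results[team_id] = round(score, 1)
    return results


# Synthetic example: one team of 12, one team of 3.
synthetic_teams = {
    "team-alpha": [70.0] * 12,
    "team-beta": [55.0, 80.0, 65.0],
}
print(publishable_scores(synthetic_teams))  # {'team-alpha': 70.0}
```

Returning `None` rather than a placeholder number keeps a suppressed team from being mistaken for a team with a low score.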

DPO Recommendation: Reject Screen Capture

The legal basis analysis reveals that PA-004 (Screen Activity Capture) fails the necessity and proportionality tests under GDPR Article 5(1)(c). The DPO recommends removing this processing activity entirely and replacing it with a less invasive endpoint-based application inventory. This is a realistic outcome -- DPIAs frequently result in the rejection or modification of proposed processing activities.
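
The verdicts in the `lia_conclusion` strings can also be turned into a simple go/no-go gate, so that a rejected activity cannot slip into later phases unnoticed. This is a sketch only: it assumes every conclusion starts with its verdict keyword, as the three conclusions in this lab do, and the function names and gate labels are hypothetical.

```python
from typing import Dict, List

# Verdict prefixes as they appear in this lab's lia_conclusion strings.
BLOCKING_PREFIXES = ("REJECTED",)
CONDITIONAL_PREFIXES = ("CONDITIONALLY ACCEPTABLE",)


def gate_from_conclusion(lia_conclusion: str) -> str:
    """Map the leading verdict keyword of a conclusion to a gate label."""
    text = lia_conclusion.strip().upper()
    if text.startswith(BLOCKING_PREFIXES):
        return "BLOCK"
    if text.startswith(CONDITIONAL_PREFIXES):
        return "PROCEED_WITH_CONDITIONS"
    # Anything unrecognized goes to a human rather than defaulting to "go".
    return "NEEDS_REVIEW"


def gate_activities(conclusions: Dict[str, str]) -> Dict[str, List[str]]:
    """Group activity IDs by gate label."""
    gates: Dict[str, List[str]] = {
        "BLOCK": [],
        "PROCEED_WITH_CONDITIONS": [],
        "NEEDS_REVIEW": [],
    }
    for activity_id, conclusion in conclusions.items():
        gates[gate_from_conclusion(conclusion)].append(activity_id)
    return gates


# Opening sentences of the three conclusions from the analysis above.
sample_conclusions = {
    "PA-001": "CONDITIONALLY ACCEPTABLE. Legitimate interest applies IF "
              "all safeguards are implemented and maintained.",
    "PA-004": "REJECTED. Screen capture fails the necessity test under "
              "Article 5(1)(c) data minimization.",
    "PA-005": "CONDITIONALLY ACCEPTABLE with minimum team size of 10 "
              "and all safeguards implemented.",
}
print(gate_activities(sample_conclusions))
```

A structured verdict field on `LegalBasisAnalysis` would be more robust than parsing free text. The prefix check is used here only because it works with the dataclass as written.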

Step 1.4: Document the Processing Context

Create a structured context document that captures the organizational and regulatory environment:

| Context Element | Details |
|---|---|
| Controller | Helios Analytics Corp, Friedrichstrasse 42, 10117 Berlin, Germany |
| DPO | Dr. Elke Krause, dpo@helios-analytics.example.com |
| Lead Supervisory Authority | Berliner Beauftragte fuer Datenschutz und Informationsfreiheit (Berlin DPA) |
| Applicable Law | GDPR, BDSG (German Federal Data Protection Act), BetrVG (Works Constitution Act) |
| Data Subjects | ~2,400 employees across DE, FR, NL; contractors; visitors |
| Processing Start Date | Not before DPIA approval |
| Prior Consultation Required? | Potentially -- if residual risk remains high after controls (Article 36) |
| Works Council Involvement | Mandatory under BetrVG Section 87(1)(6) -- co-determination right |
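
If the context should live alongside the other DPIA scripts, it can be captured as a small dataclass and exported to JSON. The sketch below is one possible shape, not a prescribed format: the class name `ProcessingContext`, its field names, and the `missing_fields` helper are assumptions made for illustration, while the values are copied from the table above.

```python
import json
from dataclasses import asdict, dataclass
from typing import List


@dataclass
class ProcessingContext:
    """Organizational and regulatory context for the DPIA (hypothetical shape)."""
    controller: str
    dpo_contact: str
    lead_supervisory_authority: str
    applicable_law: List[str]
    data_subjects: str
    processing_start: str
    prior_consultation: str
    works_council: str


def missing_fields(ctx: ProcessingContext) -> List[str]:
    """Return the names of fields that are empty and still need to be filled in."""
    return [name for name, value in asdict(ctx).items() if not value]


context = ProcessingContext(
    controller="Helios Analytics Corp, Friedrichstrasse 42, 10117 Berlin, Germany",
    dpo_contact="Dr. Elke Krause, dpo@helios-analytics.example.com",
    lead_supervisory_authority="Berlin DPA",
    applicable_law=["GDPR", "BDSG", "BetrVG"],
    data_subjects="~2,400 employees across DE, FR, NL; contractors; visitors",
    processing_start="Not before DPIA approval",
    prior_consultation="Potentially, if residual risk remains high (Article 36)",
    works_council="Mandatory under BetrVG Section 87(1)(6)",
)

# Fail early if any context element was left blank.
assert missing_fields(context) == []
print(json.dumps(asdict(context), indent=2, ensure_ascii=False))
```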

Phase 2: Data Inventory & Classification

Step 2.1: Build the Data Inventory

Create a comprehensive inventory of all personal data elements across all processing activities:

"""
Data Inventory & Classification Engine for WorkSight AI DPIA.
Identifies, classifies, and maps all personal data elements.
All data is 100% synthetic for educational purposes.
"""

import json
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Tuple
from enum import Enum
from datetime import datetime


class GDPRCategory(Enum):
    """GDPR data categories."""
    STANDARD = "Standard personal data (Art. 6)"
    SPECIAL_HEALTH = "Health data (Art. 9(1))"
    SPECIAL_BIOMETRIC = "Biometric data (Art. 9(1))"
    SPECIAL_POLITICAL = "Political opinions (Art. 9(1))"
    SPECIAL_UNION = "Trade union membership (Art. 9(1))"
    SPECIAL_ETHNIC = "Racial/ethnic origin (Art. 9(1))"
    SPECIAL_RELIGIOUS = "Religious beliefs (Art. 9(1))"
    SPECIAL_SEXUAL = "Sexual orientation (Art. 9(1))"
    SPECIAL_GENETIC = "Genetic data (Art. 9(1))"
    CRIMINAL = "Criminal conviction data (Art. 10)"
    CHILDREN = "Children's data (Art. 8)"


class DataLifecyclePhase(Enum):
    """Data lifecycle phases."""
    COLLECTION = "Collection"
    TRANSMISSION = "Transmission"
    PROCESSING = "Processing"
    STORAGE = "Storage"
    USAGE = "Usage/Access"
    SHARING = "Sharing/Disclosure"
    ARCHIVAL = "Archival"
    DELETION = "Deletion"


class RiskIndicator(Enum):
    """Privacy risk indicators for data elements."""
    HIGH_VOLUME = "High volume (>1000 records/day)"
    SENSITIVE_CONTEXT = "Sensitive context (employment)"
    BEHAVIORAL = "Behavioral tracking"
    PROFILING = "Used for profiling"
    AUTOMATED = "Automated processing"
    VULNERABLE_SUBJECTS = "Vulnerable data subjects"
    CROSS_REFERENCE = "Cross-referenceable"
    BIOMETRIC_ADJACENT = "Biometric-adjacent data"


@dataclass
class DataInventoryItem:
    """Single data element in the inventory."""
    element_id: str
    name: str
    description: str
    source_system: str
    source_ip: str
    gdpr_category: GDPRCategory
    sensitivity_score: int  # 1-10
    volume_per_day: int
    retention_days: int
    lifecycle_phases: List[DataLifecyclePhase]
    risk_indicators: List[RiskIndicator]
    pseudonymized: bool
    encrypted_at_rest: bool
    encrypted_in_transit: bool
    access_roles: List[str]
    deletion_method: str
    legal_basis: str
    cross_border_transfer: bool = False
    third_party_access: bool = False
    automated_decision: bool = False
    notes: str = ""


def build_data_inventory() -> List[DataInventoryItem]:
    """Build complete data inventory for WorkSight AI."""

    inventory = []

    # Keystroke timing data
    inventory.append(DataInventoryItem(
        element_id="DI-001",
        name="Keystroke timing intervals",
        description="Time between successive keystrokes in milliseconds. "
                    "Does NOT capture actual key values or text content. "
                    "Used for typing pattern analysis only.",
        source_system="WorkSight Endpoint Agent",
        source_ip="10.60.2.0/24 (employee workstations)",
        gdpr_category=GDPRCategory.STANDARD,
        sensitivity_score=7,
        volume_per_day=2400000,  # ~1000 per employee per day
        retention_days=90,
        lifecycle_phases=[
            DataLifecyclePhase.COLLECTION,
            DataLifecyclePhase.TRANSMISSION,
            DataLifecyclePhase.PROCESSING,
            DataLifecyclePhase.STORAGE,
            DataLifecyclePhase.DELETION,
        ],
        risk_indicators=[
            RiskIndicator.HIGH_VOLUME,
            RiskIndicator.BEHAVIORAL,
            RiskIndicator.BIOMETRIC_ADJACENT,
            RiskIndicator.SENSITIVE_CONTEXT,
        ],
        pseudonymized=True,
        encrypted_at_rest=True,
        encrypted_in_transit=True,
        access_roles=["privacy-engineer", "ml-engineer"],
        deletion_method="Cryptographic erasure (key destruction)",
        legal_basis="Article 6(1)(f) -- Legitimate interest",
        notes="CRITICAL: Keystroke TIMING may constitute biometric data "
              "under GDPR Art 9(1) if used for identification. Current "
              "use (productivity patterns) is NOT identification, but "
              "this must be continuously monitored for function creep.",
    ))

    # Application usage data
    inventory.append(DataInventoryItem(
        element_id="DI-002",
        name="Application focus duration",
        description="Application name and time spent with that application "
                    "in foreground focus, measured in seconds.",
        source_system="WorkSight Endpoint Agent",
        source_ip="10.60.2.0/24 (employee workstations)",
        gdpr_category=GDPRCategory.STANDARD,
        sensitivity_score=6,
        volume_per_day=48000,  # ~20 per employee per day
        retention_days=90,
        lifecycle_phases=[
            DataLifecyclePhase.COLLECTION,
            DataLifecyclePhase.TRANSMISSION,
            DataLifecyclePhase.PROCESSING,
            DataLifecyclePhase.STORAGE,
            DataLifecyclePhase.USAGE,
            DataLifecyclePhase.DELETION,
        ],
        risk_indicators=[
            RiskIndicator.BEHAVIORAL,
            RiskIndicator.PROFILING,
            RiskIndicator.SENSITIVE_CONTEXT,
        ],
        pseudonymized=True,
        encrypted_at_rest=True,
        encrypted_in_transit=True,
        access_roles=["privacy-engineer", "ml-engineer", "team-manager"],
        deletion_method="Secure overwrite (3-pass)",
        legal_basis="Article 6(1)(f) -- Legitimate interest",
        notes="Application names may reveal sensitive information "
              "(e.g., health apps, union communication tools). "
              "Implement category-based filtering to exclude sensitive "
              "application categories.",
    ))

    # Email metadata
    inventory.append(DataInventoryItem(
        element_id="DI-003",
        name="Email metadata",
        description="Email send/receive timestamps, sender/recipient "
                    "pseudonymized IDs, message size. NO email content, "
                    "subject lines, or attachments.",
        source_system="Email Gateway",
        source_ip="10.60.1.130 (mail-gw.internal.example.com)",
        gdpr_category=GDPRCategory.STANDARD,
        sensitivity_score=5,
        volume_per_day=72000,  # ~30 per employee per day
        retention_days=60,
        lifecycle_phases=[
            DataLifecyclePhase.COLLECTION,
            DataLifecyclePhase.TRANSMISSION,
            DataLifecyclePhase.PROCESSING,
            DataLifecyclePhase.STORAGE,
            DataLifecyclePhase.USAGE,
            DataLifecyclePhase.DELETION,
        ],
        risk_indicators=[
            RiskIndicator.HIGH_VOLUME,
            RiskIndicator.BEHAVIORAL,
            RiskIndicator.CROSS_REFERENCE,
        ],
        pseudonymized=True,
        encrypted_at_rest=True,
        encrypted_in_transit=True,
        access_roles=["privacy-engineer"],
        deletion_method="Cryptographic erasure",
        legal_basis="Article 6(1)(f) -- Legitimate interest",
    ))

    # Badge access data
    inventory.append(DataInventoryItem(
        element_id="DI-004",
        name="Badge swipe events",
        description="Employee badge ID, reader location (building/floor), "
                    "timestamp, access granted/denied status.",
        source_system="Badge Access Controller",
        source_ip="10.60.1.120 (badge-ctrl.internal.example.com)",
        gdpr_category=GDPRCategory.STANDARD,
        sensitivity_score=5,
        volume_per_day=12000,  # ~5 per employee per day
        retention_days=365,
        lifecycle_phases=[
            DataLifecyclePhase.COLLECTION,
            DataLifecyclePhase.TRANSMISSION,
            DataLifecyclePhase.PROCESSING,
            DataLifecyclePhase.STORAGE,
            DataLifecyclePhase.USAGE,
            DataLifecyclePhase.SHARING,
            DataLifecyclePhase.DELETION,
        ],
        risk_indicators=[
            RiskIndicator.BEHAVIORAL,
            RiskIndicator.CROSS_REFERENCE,
        ],
        pseudonymized=False,  # badge ID is direct identifier
        encrypted_at_rest=True,
        encrypted_in_transit=True,
        access_roles=[
            "facilities-manager", "security-team", "privacy-engineer"
        ],
        deletion_method="Secure overwrite after retention expiry",
        legal_basis="Article 6(1)(f) -- Legitimate interest + "
                    "Article 6(1)(c) -- Legal obligation (safety)",
    ))

    # Screen capture data (flagged for rejection)
    inventory.append(DataInventoryItem(
        element_id="DI-005",
        name="Blurred screen snapshots",
        description="Screen captures with Gaussian blur (sigma=15) "
                    "applied at capture time. Content unreadable. "
                    "Captured every 15 minutes during active sessions.",
        source_system="WorkSight Screen Agent",
        source_ip="10.60.2.0/24 (employee workstations)",
        gdpr_category=GDPRCategory.STANDARD,
        sensitivity_score=9,
        # ~16 per employee per day, i.e. about 4 active hours at one
        # capture per 15 minutes (a full 8-hour day would yield ~32)
        volume_per_day=38400,
        retention_days=7,
        lifecycle_phases=[
            DataLifecyclePhase.COLLECTION,
            DataLifecyclePhase.TRANSMISSION,
            DataLifecyclePhase.PROCESSING,
            DataLifecyclePhase.STORAGE,
            DataLifecyclePhase.DELETION,
        ],
        risk_indicators=[
            RiskIndicator.HIGH_VOLUME,
            RiskIndicator.BEHAVIORAL,
            RiskIndicator.SENSITIVE_CONTEXT,
            RiskIndicator.AUTOMATED,
        ],
        pseudonymized=True,
        encrypted_at_rest=True,
        encrypted_in_transit=True,
        access_roles=["security-team"],
        deletion_method="Secure overwrite (7-pass DOD standard)",
        legal_basis="Article 6(1)(f) -- Legitimate interest (REJECTED BY DPO)",
        notes="DPO RECOMMENDATION: REJECT. Disproportionate. Replace with "
              "endpoint application inventory.",
    ))

    # Window title data (associated with screen capture)
    inventory.append(DataInventoryItem(
        element_id="DI-006",
        name="Active window titles",
        description="Text content of the active window title bar, "
                    "captured alongside screen snapshots.",
        source_system="WorkSight Screen Agent",
        source_ip="10.60.2.0/24 (employee workstations)",
        gdpr_category=GDPRCategory.STANDARD,
        sensitivity_score=8,
        volume_per_day=38400,
        retention_days=30,
        lifecycle_phases=[
            DataLifecyclePhase.COLLECTION,
            DataLifecyclePhase.TRANSMISSION,
            DataLifecyclePhase.PROCESSING,
            DataLifecyclePhase.STORAGE,
            DataLifecyclePhase.DELETION,
        ],
        risk_indicators=[
            RiskIndicator.BEHAVIORAL,
            RiskIndicator.SENSITIVE_CONTEXT,
            RiskIndicator.CROSS_REFERENCE,
        ],
        pseudonymized=True,
        encrypted_at_rest=True,
        encrypted_in_transit=True,
        access_roles=["security-team"],
        deletion_method="Cryptographic erasure",
        legal_basis="Article 6(1)(f) -- Legitimate interest (REJECTED BY DPO)",
        notes="Window titles may reveal: medical conditions (health app "
              "titles), union membership (union portal titles), political "
              "views (news site titles), personal relationships (messaging "
              "app titles). Treat this as SPECIAL CATEGORY data by "
              "inference, even though the element is recorded as STANDARD.",
    ))

    # ML productivity scores
    inventory.append(DataInventoryItem(
        element_id="DI-007",
        name="Team productivity scores",
        description="ML-generated productivity score (0-100) calculated "
                    "per team per week. No individual scores.",
        source_system="WorkSight AI Engine",
        source_ip="10.60.1.110 (worksight-ai.internal.example.com)",
        gdpr_category=GDPRCategory.STANDARD,
        sensitivity_score=6,
        # Weekly batch of ~34 team scores (about 5 per day on average)
        volume_per_day=34,
        retention_days=180,
        lifecycle_phases=[
            DataLifecyclePhase.PROCESSING,
            DataLifecyclePhase.STORAGE,
            DataLifecyclePhase.USAGE,
            DataLifecyclePhase.SHARING,
            DataLifecyclePhase.DELETION,
        ],
        risk_indicators=[
            RiskIndicator.PROFILING,
            RiskIndicator.AUTOMATED,
        ],
        pseudonymized=True,
        encrypted_at_rest=True,
        encrypted_in_transit=True,
        access_roles=["team-manager", "hr-director", "works-council"],
        deletion_method="Secure overwrite",
        legal_basis="Article 6(1)(f) -- Legitimate interest",
        notes="Minimum team size of 10 required to prevent "
              "re-identification of individuals.",
    ))

    # Bias audit data
    inventory.append(DataInventoryItem(
        element_id="DI-008",
        name="ML bias audit results",
        description="Quarterly bias assessment results measuring model "
                    "performance variance across protected characteristics.",
        source_system="WorkSight AI Engine",
        source_ip="10.60.1.110 (worksight-ai.internal.example.com)",
        gdpr_category=GDPRCategory.SPECIAL_ETHNIC,
        sensitivity_score=10,
        volume_per_day=1,  # Quarterly
        retention_days=730,
        lifecycle_phases=[
            DataLifecyclePhase.PROCESSING,
            DataLifecyclePhase.STORAGE,
            DataLifecyclePhase.USAGE,
            DataLifecyclePhase.ARCHIVAL,
        ],
        risk_indicators=[
            RiskIndicator.SENSITIVE_CONTEXT,
            RiskIndicator.VULNERABLE_SUBJECTS,
        ],
        pseudonymized=True,
        encrypted_at_rest=True,
        encrypted_in_transit=True,
        access_roles=["dpo", "external-auditor"],
        deletion_method="Cryptographic erasure with audit trail",
        legal_basis="Article 9(2)(b) -- Employment law obligation",
        notes="Contains aggregated statistics across protected "
              "characteristics. Individual-level data NEVER stored. "
              "Access restricted to DPO and external auditor only.",
    ))

    return inventory


def classify_inventory(inventory: List[DataInventoryItem]) -> Dict:
    """Classify inventory by sensitivity and category."""
    classification = {
        "total_elements": len(inventory),
        "by_category": {},
        "by_sensitivity": {
            "critical": [],
            "high": [],
            "medium": [],
            "low": [],
        },
        "risk_summary": {},
        "rejected_elements": [],
        "special_category_elements": [],
    }

    for item in inventory:
        # Categorize
        cat = item.gdpr_category.value
        if cat not in classification["by_category"]:
            classification["by_category"][cat] = []
        classification["by_category"][cat].append(item.element_id)

        # Sensitivity classification
        if item.sensitivity_score >= 9:
            classification["by_sensitivity"]["critical"].append(
                item.element_id
            )
        elif item.sensitivity_score >= 7:
            classification["by_sensitivity"]["high"].append(item.element_id)
        elif item.sensitivity_score >= 4:
            classification["by_sensitivity"]["medium"].append(item.element_id)
        else:
            classification["by_sensitivity"]["low"].append(item.element_id)

        # Track risk indicators
        for ri in item.risk_indicators:
            ri_name = ri.value
            if ri_name not in classification["risk_summary"]:
                classification["risk_summary"][ri_name] = []
            classification["risk_summary"][ri_name].append(item.element_id)

        # Track rejected elements
        if "REJECTED" in item.legal_basis:
            classification["rejected_elements"].append(item.element_id)

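        # Track special category (Art. 9) only. CRIMINAL (Art. 10) and
        # CHILDREN (Art. 8) are separate regimes and are not listed here.
        if item.gdpr_category.name.startswith("SPECIAL_"):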
            classification["special_category_elements"].append(
                item.element_id
            )

    return classification


def print_classification_report(
    inventory: List[DataInventoryItem],
    classification: Dict,
) -> None:
    """Print classification report."""
    print("=" * 60)
    print("DATA INVENTORY CLASSIFICATION REPORT")
    print(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"Total Data Elements: {classification['total_elements']}")
    print("=" * 60)

    print("\n--- Sensitivity Distribution ---")
    for level, items in classification["by_sensitivity"].items():
        print(f"  {level.upper():10s}: {len(items)} elements -- {items}")

    print("\n--- GDPR Category Distribution ---")
    for cat, items in classification["by_category"].items():
        print(f"  {cat}: {items}")

    print("\n--- Risk Indicator Coverage ---")
    for ri, items in classification["risk_summary"].items():
        print(f"  {ri}: {len(items)} elements")

    if classification["rejected_elements"]:
        print("\n--- REJECTED Elements ---")
        for eid in classification["rejected_elements"]:
            item = next(i for i in inventory if i.element_id == eid)
            print(f"  {eid}: {item.name}")
            print(f"    Reason: {item.notes}")

    if classification["special_category_elements"]:
        print("\n--- Special Category (Art. 9) Elements ---")
        for eid in classification["special_category_elements"]:
            item = next(i for i in inventory if i.element_id == eid)
            print(f"  {eid}: {item.name} -- {item.gdpr_category.value}")


if __name__ == "__main__":
    inventory = build_data_inventory()
    classification = classify_inventory(inventory)
    print_classification_report(inventory, classification)

Expected output:

============================================================
DATA INVENTORY CLASSIFICATION REPORT
Generated: 2026-04-13 10:45:00
Total Data Elements: 8
============================================================

--- Sensitivity Distribution ---
  CRITICAL  : 2 elements -- ['DI-005', 'DI-008']
  HIGH      : 2 elements -- ['DI-001', 'DI-006']
  MEDIUM    : 4 elements -- ['DI-002', 'DI-003', 'DI-004', 'DI-007']
  LOW       : 0 elements -- []

--- GDPR Category Distribution ---
  Standard personal data (Art. 6): ['DI-001', 'DI-002', ...]
  Racial/ethnic origin (Art. 9(1)): ['DI-008']

--- Risk Indicator Coverage ---
  High volume (>1000 records/day): 3 elements
  Behavioral tracking: 6 elements
  Used for profiling: 2 elements
  ...

--- REJECTED Elements ---
  DI-005: Blurred screen snapshots
    Reason: DPO RECOMMENDATION: REJECT. Disproportionate...
  DI-006: Active window titles
    Reason: Window titles may reveal: medical conditions...

--- Special Category (Art. 9) Elements ---
  DI-008: ML bias audit results -- Racial/ethnic origin (Art. 9(1))
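
The inventory records a `retention_days` value for every element, but the classification report does not use it. A minimal sketch of a storage-limitation check follows. It assumes each element has a known collection date, which the inventory above does not track, and the names `deletion_due` and `overdue_elements` are hypothetical. The retention values are copied from the inventory.

```python
from datetime import date, timedelta
from typing import Dict, List

# Retention periods in days, copied from the inventory entries above.
RETENTION_DAYS: Dict[str, int] = {
    "DI-001": 90,
    "DI-003": 60,
    "DI-004": 365,
    "DI-005": 7,
}


def deletion_due(collected_on: date, retention_days: int) -> date:
    """Date on which a record reaches the end of its retention period."""
    return collected_on + timedelta(days=retention_days)


def overdue_elements(collected_on: Dict[str, date], today: date) -> List[str]:
    """Return element IDs whose deletion date lies strictly before `today`."""
    return sorted(
        element_id
        for element_id, collected in collected_on.items()
        if deletion_due(collected, RETENTION_DAYS[element_id]) < today
    )


# Synthetic example: everything collected on 1 January 2026.
collected = {element_id: date(2026, 1, 1) for element_id in RETENTION_DAYS}
print(deletion_due(date(2026, 1, 1), 90))            # 2026-04-01
print(overdue_elements(collected, date(2026, 4, 13)))  # ['DI-001', 'DI-003', 'DI-005']
```

Only the badge data (365 days) is still within its retention period on the check date. The other three elements should already have been deleted.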

Step 2.2: Automated PII Discovery

Run an automated PII discovery scan against synthetic datasets to identify any personal data not captured in the manual inventory:

"""
Automated PII Discovery Scanner for DPIA.
Scans synthetic datasets for personally identifiable information.
All data is 100% synthetic -- no real personal data.
"""

import re
import json
from dataclasses import dataclass
from typing import List, Dict, Tuple, Optional
from enum import Enum


class PIIType(Enum):
    """Types of personally identifiable information."""
    EMAIL = "Email Address"
    PHONE = "Phone Number"
    NAME = "Person Name"
    ADDRESS = "Physical Address"
    SSN = "Social Security / National ID"
    CREDIT_CARD = "Credit Card Number"
    IP_ADDRESS = "IP Address"
    DATE_OF_BIRTH = "Date of Birth"
    EMPLOYEE_ID = "Employee Identifier"
    IBAN = "Bank Account (IBAN)"
    PASSPORT = "Passport Number"
    HEALTH_DATA = "Health Information"
    BIOMETRIC = "Biometric Data"
    LOCATION = "Location Data"
    DEVICE_ID = "Device Identifier"


@dataclass
class PIIFinding:
    """A single PII discovery finding."""
    finding_id: str
    pii_type: PIIType
    field_name: str
    source_file: str
    sample_value: str  # redacted/masked
    confidence: float  # 0.0 to 1.0
    in_inventory: bool
    remediation: str


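# PII detection patterns (simplified for educational purposes).
# PHONE and SSN follow US formats, so the German phone number in the
# synthetic HR export (+49-30-555-0142) is NOT matched by PHONE.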
PII_PATTERNS = {
    PIIType.EMAIL: re.compile(
        # Inside a character class "|" is a literal pipe, so it is omitted
        r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
    ),
    PIIType.PHONE: re.compile(
        r'\b(?:\+\d{1,3}[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b'
    ),
    PIIType.SSN: re.compile(
        r'\b\d{3}-\d{2}-\d{4}\b'
    ),
    PIIType.CREDIT_CARD: re.compile(
        r'\b(?:\d{4}[-\s]?){3}\d{4}\b'
    ),
    PIIType.IP_ADDRESS: re.compile(
        r'\b(?:\d{1,3}\.){3}\d{1,3}\b'
    ),
    PIIType.DATE_OF_BIRTH: re.compile(
        r'\b(?:19|20)\d{2}-(?:0[1-9]|1[0-2])-(?:0[1-9]|[12]\d|3[01])\b'
    ),
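    PIIType.IBAN: re.compile(
        # Non-capturing group: with a capturing group, findall() returns
        # only the group's text instead of the full match.
        r'\b[A-Z]{2}\d{2}[A-Z0-9]{4}\d{7}(?:[A-Z0-9]?){0,16}\b'
    ),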
    PIIType.EMPLOYEE_ID: re.compile(
        r'\bEMP-\d{5,8}\b'
    ),
    PIIType.DEVICE_ID: re.compile(
        r'\b[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-'
        r'[0-9a-fA-F]{4}-[0-9a-fA-F]{12}\b'
    ),
}

# Synthetic test data (100% fictional)
SYNTHETIC_DATASETS = {
    "worksight_keystroke_log.csv": [
        "timestamp,employee_hash,interval_ms,app_name",
        "2026-04-13T10:00:00Z,a1b2c3d4e5f6,120,vscode",
        "2026-04-13T10:00:01Z,a1b2c3d4e5f6,95,vscode",
        "2026-04-13T10:05:00Z,f6e5d4c3b2a1,110,outlook",
    ],
    "worksight_email_metadata.csv": [
        "timestamp,sender_hash,recipient_hash,size_bytes",
        "2026-04-13T09:00:00Z,hash001,hash002,15234",
        "2026-04-13T09:05:00Z,hash002,hash001,8921",
        "# WARNING: developer left debug data below",
        "2026-04-13T09:10:00Z,jan.devries@helios-analytics.example.com,"
        "elke.krause@helios-analytics.example.com,22100",
    ],
    "worksight_badge_log.csv": [
        "timestamp,badge_id,reader_location,access_status",
        "2026-04-13T08:00:00Z,EMP-00142,BUILDING-A-FLOOR-3,GRANTED",
        "2026-04-13T08:01:00Z,EMP-00143,BUILDING-A-FLOOR-1,GRANTED",
        "2026-04-13T08:30:00Z,EMP-00142,BUILDING-A-FLOOR-5,DENIED",
    ],
    "worksight_hr_export.json": [
        '{"employee_id": "EMP-00142", "name": "Jan de Vries", '
        '"email": "jan.devries@helios-analytics.example.com", '
        '"department": "Engineering", "dob": "1985-03-15", '
        '"phone": "+49-30-555-0142", '
        '"address": "Musterstrasse 42, 10115 Berlin, Germany", '
        '"iban": "DE89370400440532013000"}',
    ],
    "worksight_debug_dump.log": [
        "2026-04-13T10:00:00Z [DEBUG] Processing employee EMP-00142",
        "2026-04-13T10:00:01Z [DEBUG] Source IP: 10.60.2.42",
        "2026-04-13T10:00:02Z [ERROR] Auth failed for testuser/REDACTED",
        "2026-04-13T10:00:03Z [DEBUG] SSN check: 123-45-6789 (SYNTHETIC)",
        "2026-04-13T10:00:04Z [DEBUG] Device: "
        "550e8400-e29b-41d4-a716-446655440000",
    ],
}


def scan_for_pii(datasets: Dict[str, List[str]]) -> List[PIIFinding]:
    """Scan synthetic datasets for PII."""
    findings = []
    finding_counter = 0

    # Known inventory items (already documented)
    known_fields = {
        "employee_hash", "sender_hash", "recipient_hash",
        "interval_ms", "app_name", "timestamp", "size_bytes",
        "reader_location", "access_status",
    }

    for filename, lines in datasets.items():
        full_text = "\n".join(lines)

        for pii_type, pattern in PII_PATTERNS.items():
            matches = pattern.findall(full_text)
            if matches:
                finding_counter += 1
                # Mask the sample value
                sample = matches[0]
                if len(sample) > 4:
                    masked = sample[:2] + "*" * (len(sample) - 4) + sample[-2:]
                else:
                    masked = "****"

                # Determine if this was in the inventory
                in_inv = False
                if pii_type == PIIType.EMPLOYEE_ID:
                    in_inv = True  # badge_id is documented
                elif pii_type == PIIType.IP_ADDRESS:
                    in_inv = True  # known infrastructure IPs

                # Determine remediation
                if pii_type in (PIIType.SSN, PIIType.CREDIT_CARD):
                    remediation = (
                        "CRITICAL: Remove immediately. This data should "
                        "never appear in monitoring logs. Investigate "
                        "how it entered the pipeline."
                    )
                elif pii_type == PIIType.EMAIL and "debug" not in filename:
                    remediation = (
                        "HIGH: Email addresses found in production data. "
                        "Replace with pseudonymized hashes. Update "
                        "collection pipeline to hash at source."
                    )
                elif pii_type == PIIType.EMAIL and "debug" in filename:
                    remediation = (
                        "HIGH: Email addresses in debug logs. Implement "
                        "PII scrubbing in logging framework. Add "
                        "pre-commit hook to scan for PII in logs."
                    )
                elif pii_type in (PIIType.DATE_OF_BIRTH, PIIType.IBAN):
                    remediation = (
                        "HIGH: Sensitive personal data in HR export. "
                        "Ensure encryption at rest and strict RBAC. "
                        "Remove from WorkSight data pipeline."
                    )
                else:
                    remediation = (
                        "MEDIUM: Review data element and ensure it is "
                        "documented in the data inventory."
                    )

                findings.append(PIIFinding(
                    finding_id=f"PII-{finding_counter:03d}",
                    pii_type=pii_type,
                    field_name=filename,
                    source_file=filename,
                    sample_value=masked,
                    confidence=0.85 if len(matches) > 1 else 0.70,
                    in_inventory=in_inv,
                    remediation=remediation,
                ))

    return findings


def print_pii_report(findings: List[PIIFinding]) -> None:
    """Print PII discovery report."""
    print("=" * 60)
    print("PII DISCOVERY SCAN REPORT")
    print(f"Datasets Scanned: {len(SYNTHETIC_DATASETS)}")
    print(f"Total Findings: {len(findings)}")
    print(f"Undocumented Findings: "
          f"{sum(1 for f in findings if not f.in_inventory)}")
    print("=" * 60)

    # Group by the severity label that prefixes each remediation string
    critical = [f for f in findings if f.remediation.startswith("CRITICAL")]
    high = [f for f in findings if f.remediation.startswith("HIGH")]
    medium = [f for f in findings
              if not f.remediation.startswith(("CRITICAL", "HIGH"))]

    for severity, group in [
        ("CRITICAL", critical), ("HIGH", high), ("MEDIUM", medium)
    ]:
        if group:
            print(f"\n--- {severity} Findings ---")
            for f in group:
                print(f"\n  [{f.finding_id}] {f.pii_type.value}")
                print(f"    Source:      {f.source_file}")
                print(f"    Sample:      {f.sample_value}")
                print(f"    Confidence:  {f.confidence:.0%}")
                print(f"    In Inventory: {'Yes' if f.in_inventory else 'NO'}")
                print(f"    Remediation: {f.remediation}")


if __name__ == "__main__":
    findings = scan_for_pii(SYNTHETIC_DATASETS)
    print_pii_report(findings)

PII Discovery Findings

The automated scan reveals several serious findings:

  1. Unhashed email addresses in worksight_email_metadata.csv -- a developer left debug data with cleartext emails that bypassed the pseudonymization pipeline
  2. SSN-format data in debug logs -- synthetic test data leaked into log files
  3. Cleartext employee names, DOB, IBAN in the HR export -- this data should never enter the WorkSight pipeline

These findings demonstrate why automated PII scanning is essential before and during DPIA execution. A manual inventory records what the system is designed to collect, so it can miss data leaks and pipeline failures like these.
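
The debug-log remediation in the scanner calls for PII scrubbing in the logging framework. A minimal sketch of that idea, built on Python's standard `logging.Filter`, might look like the following. The two regular expressions, the `scrub` helper, and the `PIIScrubFilter` class are illustrative assumptions, not part of the lab's `PII_PATTERNS` or the WorkSight codebase.

```python
import logging
import re

# Hypothetical patterns for this sketch; the lab's PII_PATTERNS may differ.
REDACTIONS = [
    (re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}"), "[EMAIL]"),
    (re.compile(r"\b\d{3}-\d{2}-\d{4}\b"), "[SSN]"),
]


def scrub(text: str) -> str:
    """Replace every match of each pattern with its placeholder."""
    for pattern, placeholder in REDACTIONS:
        text = pattern.sub(placeholder, text)
    return text


class PIIScrubFilter(logging.Filter):
    """Logging filter that redacts PII before a record is emitted."""

    def filter(self, record: logging.LogRecord) -> bool:
        # Render the message with its args first, then scrub the result
        record.msg = scrub(record.getMessage())
        record.args = ()
        return True  # keep the record, now redacted


logger = logging.getLogger("worksight.demo")
handler = logging.StreamHandler()
handler.addFilter(PIIScrubFilter())
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)

logger.debug("SSN check: 123-45-6789 (SYNTHETIC)")
logger.debug("Mail from %s", "jan.devries@helios-analytics.example.com")
```

Attaching the filter to the handler rather than to a single logger means every record routed through that handler is scrubbed. Regex-based redaction only catches formats it knows about, so it complements the scan above rather than replacing it.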

Step 2.3: Data Lifecycle Mapping

Map each data element through its complete lifecycle with a Mermaid diagram:

Data Lifecycle -- Keystroke Timing Data (DI-001)

stateDiagram-v2
    [*] --> Collection: Endpoint agent captures\ninter-key intervals
    Collection --> Transmission: TLS 1.3 encrypted\ntransfer to 10.60.1.110
    Transmission --> Pseudonymization: Employee ID replaced\nwith rotating SHA-256 hash
    Pseudonymization --> Processing: ML feature extraction\n(team-level aggregation)
    Processing --> EncryptedStorage: AES-256-GCM at rest\nRetention: 90 days
    EncryptedStorage --> TeamReporting: Aggregated metrics\nonly (no individual data)
    EncryptedStorage --> AutoDeletion: Cryptographic erasure\nafter 90 days
    TeamReporting --> [*]
    AutoDeletion --> [*]

    note right of Collection
        Risk: Raw timing data exists\nbefore pseudonymization\nfor ~100ms in memory
    end note

    note right of Pseudonymization
        Control: Key rotation monthly\nOld keys securely destroyed
    end note

    note right of AutoDeletion
        Verification: Deletion log\naudited quarterly
    end note
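
One way to make this lifecycle checkable is to transcribe the diagram's transitions into a lookup table and validate a record's recorded history against it. The sketch below assumes hypothetical `START` and `END` labels for the diagram's `[*]` markers and a hypothetical `invalid_steps` helper; how WorkSight actually records lifecycle events is not specified in this lab.

```python
from typing import Dict, List, Set

# Allowed transitions, transcribed from the state diagram.
# "START" and "END" stand in for the diagram's [*] markers.
TRANSITIONS: Dict[str, Set[str]] = {
    "START": {"Collection"},
    "Collection": {"Transmission"},
    "Transmission": {"Pseudonymization"},
    "Pseudonymization": {"Processing"},
    "Processing": {"EncryptedStorage"},
    "EncryptedStorage": {"TeamReporting", "AutoDeletion"},
    "TeamReporting": {"END"},
    "AutoDeletion": {"END"},
}


def invalid_steps(path: List[str]) -> List[str]:
    """Return each 'A -> B' hop in path that the diagram does not allow."""
    hops = zip(path, path[1:])
    return [
        f"{a} -> {b}" for a, b in hops
        if b not in TRANSITIONS.get(a, set())
    ]


ok_path = [
    "START", "Collection", "Transmission", "Pseudonymization",
    "Processing", "EncryptedStorage", "AutoDeletion", "END",
]
# Hypothetical faulty history: data reaches processing without
# passing through pseudonymization
bad_path = [
    "START", "Collection", "Transmission",
    "Processing", "EncryptedStorage", "AutoDeletion", "END",
]

print(invalid_steps(ok_path))
print(invalid_steps(bad_path))
```

A check like this could run against pipeline audit events to flag records that skipped a control such as pseudonymization.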

Phase 3: DPIA Execution

Step 3.1: Article 35 Threshold Assessment

Before conducting a full DPIA, verify that one is actually required by assessing against the Article 35 criteria and the EDPB guidelines:

"""
DPIA Threshold Assessment -- Article 35 Criteria Checker.
Determines whether a DPIA is mandatory for the processing activities.
"""

from dataclasses import dataclass
from typing import List, Dict


@dataclass
class ThresholdCriterion:
    """Individual DPIA threshold criterion."""
    criterion_id: str
    description: str
    source: str
    met: bool
    evidence: str


def assess_dpia_threshold() -> Dict:
    """Assess whether DPIA is required under Art 35 + EDPB criteria."""

    # WP248 rev.01 rule of thumb: processing that meets two or more of the
    # nine criteria requires a DPIA in most cases
    criteria = []

    # 1. Evaluation/Scoring
    criteria.append(ThresholdCriterion(
        criterion_id="C1",
        description="Evaluation or scoring, including profiling and "
                    "predicting (EDPB WP248 rev.01, Criterion 1)",
        source="EDPB Guidelines on DPIAs",
        met=True,
        evidence="WorkSight AI generates productivity scores using ML "
                 "models. This constitutes profiling under Article 4(4) "
                 "and evaluation/scoring of employee performance.",
    ))

    # 2. Automated decision-making with legal/significant effects
    criteria.append(ThresholdCriterion(
        criterion_id="C2",
        description="Automated decision-making with legal or similarly "
                    "significant effect (EDPB Criterion 2)",
        source="EDPB Guidelines on DPIAs",
        met=False,
        evidence="Team-level scoring only. No individual automated "
                 "decisions. No employment consequences without human "
                 "review. However, this is BORDERLINE -- individual "
                 "data feeds the team model.",
    ))

    # 3. Systematic monitoring
    criteria.append(ThresholdCriterion(
        criterion_id="C3",
        description="Systematic monitoring of a publicly accessible area "
                    "or systematic observation (EDPB Criterion 3)",
        source="EDPB Guidelines on DPIAs",
        met=True,
        evidence="WorkSight AI systematically monitors employee behavior "
                 "through keystroke patterns, email metadata, badge "
                 "access, and screen captures. This is systematic "
                 "monitoring of employees in the workplace.",
    ))

    # 4. Sensitive data or highly personal
    criteria.append(ThresholdCriterion(
        criterion_id="C4",
        description="Sensitive data or data of a highly personal nature "
                    "(EDPB Criterion 4)",
        source="EDPB Guidelines on DPIAs",
        met=True,
        evidence="Keystroke timing data may qualify as biometric data "
                 "(Art 9(1)). Bias audit data involves protected "
                 "characteristics. Window titles may reveal health "
                 "data, political opinions, or union membership.",
    ))

    # 5. Large scale
    criteria.append(ThresholdCriterion(
        criterion_id="C5",
        description="Data processed on a large scale (EDPB Criterion 5)",
        source="EDPB Guidelines on DPIAs",
        met=True,
        evidence="2,400 employees monitored continuously. Keystroke data "
                 "alone generates ~2.4 million records per day. Total "
                 "data volume across all streams exceeds 2.5 million "
                 "records per day.",
    ))

    # 6. Matching or combining datasets
    criteria.append(ThresholdCriterion(
        criterion_id="C6",
        description="Matching or combining datasets (EDPB Criterion 6)",
        source="EDPB Guidelines on DPIAs",
        met=True,
        evidence="WorkSight AI combines data from 5 different sources "
                 "(keystroke, email, badge, screen, HR) to generate "
                 "productivity scores. This cross-referencing creates "
                 "comprehensive employee profiles.",
    ))

    # 7. Vulnerable data subjects
    criteria.append(ThresholdCriterion(
        criterion_id="C7",
        description="Data concerning vulnerable data subjects "
                    "(EDPB Criterion 7)",
        source="EDPB Guidelines on DPIAs",
        met=True,
        evidence="Employees are considered vulnerable data subjects "
                 "due to the power imbalance inherent in the employment "
                 "relationship (WP29/EDPB guidance). They cannot freely "
                 "refuse monitoring without risking employment.",
    ))

    # 8. Innovative use or new technology
    criteria.append(ThresholdCriterion(
        criterion_id="C8",
        description="Innovative use or applying new technological or "
                    "organisational solutions (EDPB Criterion 8)",
        source="EDPB Guidelines on DPIAs",
        met=True,
        evidence="AI/ML-based behavioral analysis for productivity "
                 "scoring is an innovative application combining "
                 "multiple data streams with machine learning.",
    ))

    # 9. Prevents data subjects from exercising a right
    criteria.append(ThresholdCriterion(
        criterion_id="C9",
        description="Processing that prevents data subjects from "
                    "exercising a right or using a service or contract "
                    "(EDPB Criterion 9)",
        source="EDPB Guidelines on DPIAs",
        met=False,
        evidence="Employees retain right to object (Art 21). Works "
                 "Council has veto power. Processing does not prevent "
                 "exercise of rights, though practical ability to "
                 "object may be limited by power imbalance.",
    ))

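    # Art 35(3) lists cases in which a DPIA is always required
    art35_3_triggers = {
        "35(3)(a) -- Systematic evaluation of personal aspects by "
        "automated processing, including profiling": True,
        "35(3)(b) -- Large-scale processing of special categories": True,
        "35(3)(c) -- Systematic monitoring of publicly accessible "
        "area on large scale": False,
    }
    triggered = [
        label.split(" -- ")[0]
        for label, hit in art35_3_triggers.items() if hit
    ]

    # Assessment result: two or more criteria, or any Art 35(3) trigger
    criteria_met = sum(1 for c in criteria if c.met)
    dpia_required = criteria_met >= 2 or bool(triggered)

    # Build the conclusion from the computed values so it cannot
    # contradict dpia_required if the inputs above are edited
    if dpia_required:
        conclusion = (
            f"DPIA IS MANDATORY. {criteria_met} of {len(criteria)} EDPB "
            f"criteria met (threshold: 2)."
        )
        if triggered:
            verb = "are" if len(triggered) > 1 else "is"
            conclusion += (
                f" Additionally, Article {' and '.join(triggered)} "
                f"{verb} directly triggered."
            )
        conclusion += (
            " The Berlin DPA's positive list (must-do DPIA list per "
            "Art 35(4)) includes employee monitoring systems."
        )
    else:
        conclusion = (
            f"DPIA not required by this assessment. {criteria_met} of "
            f"{len(criteria)} EDPB criteria met (threshold: 2) and no "
            f"Article 35(3) trigger applies. Record the reasoning."
        )

    result = {
        "criteria": criteria,
        "criteria_met": criteria_met,
        "total_criteria": len(criteria),
        "dpia_required": dpia_required,
        "art35_3_triggers": art35_3_triggers,
        "conclusion": conclusion,
    }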

    return result


def print_threshold_report(result: Dict) -> None:
    """Print threshold assessment report."""
    print("=" * 60)
    print("DPIA THRESHOLD ASSESSMENT")
    print("=" * 60)

    for c in result["criteria"]:
        status = "MET" if c.met else "NOT MET"
        print(f"\n  [{c.criterion_id}] [{status}] {c.description}")
        print(f"      Evidence: {c.evidence}")

    print(f"\n{'=' * 60}")
    print(f"Criteria Met: {result['criteria_met']} / "
          f"{result['total_criteria']}")
    print(f"DPIA Required: {'YES' if result['dpia_required'] else 'No'}")
    print(f"\nConclusion: {result['conclusion']}")
    print(f"{'=' * 60}")


if __name__ == "__main__":
    result = assess_dpia_threshold()
    print_threshold_report(result)

Expected output:

============================================================
DPIA THRESHOLD ASSESSMENT
============================================================

  [C1] [MET] Evaluation or scoring, including profiling ...
      Evidence: WorkSight AI generates productivity scores ...

  [C2] [NOT MET] Automated decision-making with legal ...
      Evidence: Team-level scoring only ...

  [C3] [MET] Systematic monitoring ...
      Evidence: WorkSight AI systematically monitors ...
  ...

============================================================
Criteria Met: 7 / 9
DPIA Required: YES

Conclusion: DPIA IS MANDATORY. 7 of 9 EDPB criteria met ...
============================================================
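
Because the threshold decision itself belongs in the DPIA record, it can help to keep the result in a machine-readable form alongside the printed report. The following is a minimal sketch, assuming a result dictionary shaped like the one returned by `assess_dpia_threshold()`; the `threshold_to_json` helper and the two-criterion `demo` data are hypothetical and exist only for illustration.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class ThresholdCriterion:
    """Same fields as the class in the script above."""
    criterion_id: str
    description: str
    source: str
    met: bool
    evidence: str


def threshold_to_json(result: dict) -> str:
    """Serialize a threshold result, converting dataclasses to dicts."""
    payload = dict(result)
    payload["criteria"] = [asdict(c) for c in result["criteria"]]
    return json.dumps(payload, indent=2, ensure_ascii=False)


# Hypothetical, shortened result used only to exercise the helper
demo = {
    "criteria": [
        ThresholdCriterion(
            "C1", "Evaluation or scoring",
            "EDPB Guidelines on DPIAs", True, "Productivity scores",
        ),
        ThresholdCriterion(
            "C2", "Automated decision-making",
            "EDPB Guidelines on DPIAs", False, "Team-level scoring only",
        ),
    ],
    "criteria_met": 1,
    "total_criteria": 2,
    "dpia_required": False,
}

exported = threshold_to_json(demo)
print(exported)
```

The JSON string can then be stored with the DPIA reference so that later reviews can compare it against a fresh assessment.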

Step 3.2: Risk Assessment -- Likelihood x Impact Matrix

Score each identified risk using a structured likelihood-impact matrix:

"""
DPIA Risk Assessment Engine.
Scores privacy risks using likelihood x impact matrix.
"""

from dataclasses import dataclass, field
from typing import List, Optional
from enum import IntEnum


class Likelihood(IntEnum):
    """Risk likelihood levels (1-5)."""
    RARE = 1        # < 5% probability in 12 months
    UNLIKELY = 2    # 5-20% probability
    POSSIBLE = 3    # 20-50% probability
    LIKELY = 4      # 50-80% probability
    ALMOST_CERTAIN = 5  # > 80% probability


class Impact(IntEnum):
    """Risk impact levels (1-5)."""
    NEGLIGIBLE = 1  # Minor inconvenience
    LIMITED = 2     # Significant inconvenience
    SIGNIFICANT = 3 # Serious consequences
    MAXIMUM = 4     # Irreversible consequences
    CATASTROPHIC = 5  # Threat to life/liberty


@dataclass
class PrivacyRisk:
    """Individual privacy risk assessment."""
    risk_id: str
    title: str
    description: str
    affected_data: List[str]
    affected_subjects: str
    threat_source: str
    likelihood: Likelihood
    impact: Impact
    risk_score: int = 0  # calculated
    risk_level: str = ""  # calculated
    existing_controls: List[str] = field(default_factory=list)
    residual_likelihood: Optional[Likelihood] = None
    residual_impact: Optional[Impact] = None
    residual_score: int = 0  # calculated
    residual_level: str = ""  # calculated
    additional_controls: List[str] = field(default_factory=list)
    risk_owner: str = ""
    review_date: str = ""

    def __post_init__(self):
        self.risk_score = self.likelihood * self.impact
        self.risk_level = self._classify_risk(self.risk_score)
        if self.residual_likelihood and self.residual_impact:
            self.residual_score = (
                self.residual_likelihood * self.residual_impact
            )
            self.residual_level = self._classify_risk(self.residual_score)

    @staticmethod
    def _classify_risk(score: int) -> str:
        if score >= 15:
            return "CRITICAL"
        elif score >= 10:
            return "HIGH"
        elif score >= 5:
            return "MEDIUM"
        else:
            return "LOW"


def assess_risks() -> List[PrivacyRisk]:
    """Assess all privacy risks for WorkSight AI."""

    risks = []

    # R-001: Unauthorized access to raw monitoring data
    risks.append(PrivacyRisk(
        risk_id="R-001",
        title="Unauthorized access to raw employee monitoring data",
        description=(
            "An attacker (external or insider) gains access to raw "
            "keystroke, email, or badge data before pseudonymization "
            "or aggregation, enabling individual employee surveillance."
        ),
        affected_data=["DI-001", "DI-002", "DI-003", "DI-004"],
        affected_subjects="All 2,400 employees",
        threat_source="External attacker, malicious insider, "
                      "compromised admin account",
        likelihood=Likelihood.POSSIBLE,
        impact=Impact.MAXIMUM,
        existing_controls=[
            "AES-256-GCM encryption at rest",
            "TLS 1.3 in transit",
            "RBAC with least privilege",
            "Network segmentation (VLAN 60)",
        ],
        residual_likelihood=Likelihood.UNLIKELY,
        residual_impact=Impact.MAXIMUM,
        additional_controls=[
            "Hardware security module for key management",
            "Privileged access management (PAM)",
            "Real-time anomaly detection on data access",
            "Break-glass procedure with dual approval",
        ],
        risk_owner="Marcus Brandt (CISO)",
        review_date="2026-07-13",
    ))

    # R-002: Function creep -- individual scoring
    risks.append(PrivacyRisk(
        risk_id="R-002",
        title="Function creep from team to individual scoring",
        description=(
            "Management pressure leads to the use of WorkSight data "
            "for individual employee performance assessment, violating "
            "the stated purpose limitation and Works Council agreement."
        ),
        affected_data=["DI-007", "DI-001", "DI-002"],
        affected_subjects="All 2,400 employees",
        threat_source="Management pressure, scope expansion, "
                      "inadequate access controls",
        likelihood=Likelihood.LIKELY,
        impact=Impact.SIGNIFICANT,
        existing_controls=[
            "Team-level aggregation enforced in code",
            "Works Council oversight",
            "Minimum team size of 10",
        ],
        residual_likelihood=Likelihood.POSSIBLE,
        residual_impact=Impact.SIGNIFICANT,
        additional_controls=[
            "Technical enforcement: queries returning <10 individuals blocked",
            "Quarterly audit of all data access patterns",
            "Automated alert if individual-level queries detected",
            "Annual privacy audit by external DPO",
            "Works Council access to audit logs",
        ],
        risk_owner="Dr. Elke Krause (DPO)",
        review_date="2026-07-13",
    ))

    # R-003: Bias in ML scoring
    risks.append(PrivacyRisk(
        risk_id="R-003",
        title="Discriminatory bias in ML productivity scoring",
        description=(
            "The ML model produces systematically different scores for "
            "teams with certain demographic compositions, leading to "
            "indirect discrimination based on protected characteristics "
            "(gender, age, nationality, disability)."
        ),
        affected_data=["DI-007", "DI-008"],
        affected_subjects="Employees in underrepresented groups",
        threat_source="Training data bias, feature selection bias, "
                      "proxy variables for protected characteristics",
        likelihood=Likelihood.LIKELY,
        impact=Impact.MAXIMUM,
        existing_controls=[
            "Quarterly bias audits",
            "SHAP explainability",
            "No individual scoring",
        ],
        residual_likelihood=Likelihood.POSSIBLE,
        residual_impact=Impact.SIGNIFICANT,
        additional_controls=[
            "Pre-deployment bias testing with synthetic protected data",
            "Fairness constraints in model training (demographic parity)",
            "External algorithmic audit annually",
            "Bias incident response procedure",
            "Works Council bias review board",
        ],
        risk_owner="Raj Patel (ML Engineer) + Dr. Elke Krause (DPO)",
        review_date="2026-07-13",
    ))

    # R-004: Re-identification of pseudonymized data
    risks.append(PrivacyRisk(
        risk_id="R-004",
        title="Re-identification of pseudonymized employees",
        description=(
            "Pseudonymized monitoring data is re-identified through "
            "correlation with other datasets (HR records, badge logs, "
            "organizational charts) or through behavioral uniqueness "
            "in keystroke patterns."
        ),
        affected_data=["DI-001", "DI-002", "DI-003"],
        affected_subjects="All 2,400 employees",
        threat_source="Data analyst with access to multiple systems, "
                      "motivated individual, external researcher",
        likelihood=Likelihood.POSSIBLE,
        impact=Impact.SIGNIFICANT,
        existing_controls=[
            "Pseudonymization with rotating keys",
            "Data separation (different systems)",
            "Access controls per system",
        ],
        residual_likelihood=Likelihood.UNLIKELY,
        residual_impact=Impact.SIGNIFICANT,
        additional_controls=[
            "k-anonymity (k>=10) for all published datasets",
            "Differential privacy noise injection for analytics",
            "Cross-system access monitoring",
            "Re-identification risk assessment annually",
            "Data separation enforcement (no joins across systems)",
        ],
        risk_owner="Marcus Brandt (CISO)",
        review_date="2026-07-13",
    ))

    # R-005: Data breach notification failure
    risks.append(PrivacyRisk(
        risk_id="R-005",
        title="Failure to detect or notify data breach within 72 hours",
        description=(
            "A breach of monitoring data is not detected in time to "
            "meet the 72-hour notification requirement under GDPR "
            "Article 33, or affected employees are not informed per "
            "Article 34."
        ),
        affected_data=["DI-001", "DI-002", "DI-003", "DI-004"],
        affected_subjects="All 2,400 employees + supervisory authority",
        threat_source="Insufficient monitoring, unclear procedures, "
                      "delayed forensic analysis",
        likelihood=Likelihood.POSSIBLE,
        impact=Impact.MAXIMUM,
        existing_controls=[
            "SIEM monitoring (siem.internal.example.com)",
            "Incident response plan",
            "DPO on-call rotation",
        ],
        residual_likelihood=Likelihood.UNLIKELY,
        residual_impact=Impact.SIGNIFICANT,
        additional_controls=[
            "Automated breach detection rules for monitoring data stores",
            "Pre-drafted notification templates",
            "Quarterly breach simulation exercises",
            "72-hour countdown automation with escalation",
            "Employee communication channel pre-established",
        ],
        risk_owner="Marcus Brandt (CISO) + Dr. Elke Krause (DPO)",
        review_date="2026-07-13",
    ))

    # R-006: Chilling effect on employee behavior
    risks.append(PrivacyRisk(
        risk_id="R-006",
        title="Chilling effect on legitimate employee activities",
        description=(
            "Awareness of monitoring causes employees to avoid "
            "legitimate activities: accessing union resources, "
            "researching employment rights, using mental health "
            "support tools, or whistleblowing channels."
        ),
        affected_data=["DI-001", "DI-002", "DI-006"],
        affected_subjects="All 2,400 employees",
        threat_source="Inherent nature of monitoring systems",
        likelihood=Likelihood.ALMOST_CERTAIN,
        impact=Impact.SIGNIFICANT,
        existing_controls=[
            "Transparent privacy notice to employees",
            "Works Council communication",
            "Category-based exclusions",
        ],
        residual_likelihood=Likelihood.LIKELY,
        residual_impact=Impact.LIMITED,
        additional_controls=[
            "Whitelist of protected categories excluded from monitoring",
            "Union, health, legal, and whistleblower sites excluded",
            "Annual employee privacy awareness survey",
            "Independent ombudsperson for monitoring complaints",
            "Clear communication that monitoring is team-level only",
        ],
        risk_owner="Thomas Mueller (HR Director)",
        review_date="2026-07-13",
    ))

    # R-007: Excessive data retention
    risks.append(PrivacyRisk(
        risk_id="R-007",
        title="Data retained beyond necessary period",
        description=(
            "Technical failure, configuration error, or business "
            "pressure leads to monitoring data being retained beyond "
            "the documented retention periods, violating the storage "
            "limitation principle (Article 5(1)(e))."
        ),
        affected_data=["DI-001", "DI-002", "DI-003", "DI-005"],
        affected_subjects="All 2,400 employees",
        threat_source="Configuration drift, backup retention, "
                      "archive without deletion",
        likelihood=Likelihood.LIKELY,
        impact=Impact.LIMITED,
        existing_controls=[
            "Automated retention policies",
            "Cryptographic erasure",
        ],
        residual_likelihood=Likelihood.UNLIKELY,
        residual_impact=Impact.LIMITED,
        additional_controls=[
            "Monthly retention compliance scan",
            "Backup retention aligned with data retention",
            "Automated deletion verification with audit log",
            "Quarterly retention report to DPO",
        ],
        risk_owner="Marcus Brandt (CISO)",
        review_date="2026-07-13",
    ))

    return risks


def generate_risk_matrix(risks: List[PrivacyRisk]) -> str:
    """Generate text-based risk matrix."""
    # Build 5x5 matrix
    matrix = {}
    for l in Likelihood:
        for i in Impact:
            matrix[(l, i)] = []

    for r in risks:
        matrix[(r.likelihood, r.impact)].append(r.risk_id)

    lines = []
    lines.append("\nINHERENT RISK MATRIX (before additional controls)")
    lines.append("-" * 55)
    lines.append(f"{'':12s} | {'NEGL':8s} | {'LIMIT':8s} | "
                 f"{'SIGNIF':8s} | {'MAX':8s} | {'CATASTR':8s}")
    lines.append("-" * 55)

    for l in reversed(list(Likelihood)):
        row = f"{l.name:12s} |"
        for i in Impact:
            cell = matrix.get((l, i), [])
            cell_str = ",".join(cell) if cell else "."
            row += f" {cell_str:8s} |"
        lines.append(row)
    lines.append("-" * 55)

    # Residual risk matrix
    residual_matrix = {}
    for l in Likelihood:
        for i in Impact:
            residual_matrix[(l, i)] = []

    for r in risks:
        if r.residual_likelihood and r.residual_impact:
            residual_matrix[
                (r.residual_likelihood, r.residual_impact)
            ].append(r.risk_id)

    lines.append("\nRESIDUAL RISK MATRIX (after additional controls)")
    lines.append("-" * 55)
    lines.append(f"{'':12s} | {'NEGL':8s} | {'LIMIT':8s} | "
                 f"{'SIGNIF':8s} | {'MAX':8s} | {'CATASTR':8s}")
    lines.append("-" * 55)

    for l in reversed(list(Likelihood)):
        row = f"{l.name:12s} |"
        for i in Impact:
            cell = residual_matrix.get((l, i), [])
            cell_str = ",".join(cell) if cell else "."
            row += f" {cell_str:8s} |"
        lines.append(row)
    lines.append("-" * 55)

    return "\n".join(lines)


def print_risk_report(risks: List[PrivacyRisk]) -> None:
    """Print full risk assessment report."""
    print("=" * 60)
    print("DPIA RISK ASSESSMENT REPORT")
    print(f"Total Risks Identified: {len(risks)}")
    print("=" * 60)

    for r in sorted(risks, key=lambda x: x.risk_score, reverse=True):
        print(f"\n{'~' * 60}")
        print(f"[{r.risk_id}] {r.title}")
        print(f"{'~' * 60}")
        print(f"  Inherent:  L={r.likelihood.name} x I={r.impact.name} "
              f"= {r.risk_score} ({r.risk_level})")
        if r.residual_score:
            print(f"  Residual:  L={r.residual_likelihood.name} x "
                  f"I={r.residual_impact.name} = {r.residual_score} "
                  f"({r.residual_level})")
        print(f"  Description: {r.description}")
        print(f"  Affected Data: {', '.join(r.affected_data)}")
        print(f"  Threat Source: {r.threat_source}")
        print(f"  Existing Controls:")
        for c in r.existing_controls:
            print(f"    - {c}")
        print(f"  Additional Controls Recommended:")
        for c in r.additional_controls:
            print(f"    + {c}")
        print(f"  Risk Owner: {r.risk_owner}")

    print(generate_risk_matrix(risks))

    # Summary statistics
    inherent_critical = sum(1 for r in risks if r.risk_level == "CRITICAL")
    inherent_high = sum(1 for r in risks if r.risk_level == "HIGH")
    residual_critical = sum(
        1 for r in risks if r.residual_level == "CRITICAL"
    )
    residual_high = sum(1 for r in risks if r.residual_level == "HIGH")

    print(f"\n--- Risk Summary ---")
    print(f"  Inherent:  {inherent_critical} CRITICAL, "
          f"{inherent_high} HIGH")
    print(f"  Residual:  {residual_critical} CRITICAL, "
          f"{residual_high} HIGH")
    # Art 36 prior consultation applies when high risk remains after mitigation
    if residual_critical > 0 or residual_high > 0:
        print("  *** PRIOR CONSULTATION (Art 36) may be required ***")


if __name__ == "__main__":
    risks = assess_risks()
    print_risk_report(risks)

Expected output (truncated):

============================================================
DPIA RISK ASSESSMENT REPORT
Total Risks Identified: 7
============================================================

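~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[R-003] Discriminatory bias in ML productivity scoring
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  Inherent:  L=LIKELY x I=MAXIMUM = 16 (CRITICAL)
  Residual:  L=POSSIBLE x I=SIGNIFICANT = 9 (MEDIUM)
  ...

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[R-006] Chilling effect on legitimate employee activities
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  Inherent:  L=ALMOST_CERTAIN x I=SIGNIFICANT = 15 (CRITICAL)
  Residual:  L=LIKELY x I=LIMITED = 8 (MEDIUM)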
  ...

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[R-001] Unauthorized access to raw employee monitoring data
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  Inherent:  L=POSSIBLE x I=MAXIMUM = 12 (HIGH)
  Residual:  L=UNLIKELY x I=MAXIMUM = 8 (MEDIUM)
  ...

INHERENT RISK MATRIX (before additional controls)
-------------------------------------------------------
             | NEGL     | LIMIT    | SIGNIF   | MAX      | CATASTR
-------------------------------------------------------
ALMOST_CERTAIN | .        | .        | R-006    | .        | .        |
LIKELY       | .        | R-007    | R-002    | R-003    | .        |
POSSIBLE     | .        | .        | R-004    | R-001,R-005 | .        |
...
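
To cross-check the report, the arithmetic behind each score can be reproduced in a few lines. The sketch below copies the likelihood and impact ratings from `assess_risks()` as plain integers and applies the same band thresholds as `_classify_risk`; the `RATINGS` table, `band`, and `score_table` names are hypothetical helpers for this illustration.

```python
from typing import Dict, Tuple

# (likelihood, impact) pairs copied from assess_risks():
# inherent rating first, residual rating second
RATINGS: Dict[str, Tuple[Tuple[int, int], Tuple[int, int]]] = {
    "R-001": ((3, 4), (2, 4)),
    "R-002": ((4, 3), (3, 3)),
    "R-003": ((4, 4), (3, 3)),
    "R-004": ((3, 3), (2, 3)),
    "R-005": ((3, 4), (2, 3)),
    "R-006": ((5, 3), (4, 2)),
    "R-007": ((4, 2), (2, 2)),
}


def band(score: int) -> str:
    """Same thresholds as PrivacyRisk._classify_risk."""
    if score >= 15:
        return "CRITICAL"
    if score >= 10:
        return "HIGH"
    if score >= 5:
        return "MEDIUM"
    return "LOW"


def score_table() -> Dict[str, Tuple[int, str, int, str]]:
    """Map risk ID to (inherent score, band, residual score, band)."""
    table = {}
    for risk_id, ((lik, imp), (res_lik, res_imp)) in RATINGS.items():
        inherent = lik * imp
        residual = res_lik * res_imp
        table[risk_id] = (inherent, band(inherent), residual, band(residual))
    return table


for risk_id, row in score_table().items():
    print(risk_id, *row)
```

With these ratings, two risks start in the CRITICAL band (R-003 at 16 and R-006 at 15), three start at HIGH (R-001, R-002, and R-005 at 12 each), and no residual score exceeds MEDIUM.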

Step 3.3: DPIA Template

Document the complete DPIA findings using a structured template:

DPIA Document Template

Copy and complete this template for your organization's DPIA:

================================================================
DATA PROTECTION IMPACT ASSESSMENT (DPIA)
================================================================
Reference:       DPIA-WS-2026-001
Version:         1.0 (Draft)
Date:            2026-04-13
Status:          Under Review
Classification:  CONFIDENTIAL

----------------------------------------------------------------
SECTION 1: PROCESSING DESCRIPTION
----------------------------------------------------------------
1.1 System Name:     WorkSight AI Employee Monitoring System
1.2 Controller:      Helios Analytics Corp
1.3 DPO:             Dr. Elke Krause
1.4 System Owner:    Sarah Okonkwo (CIO)
1.5 Purpose:         Team-level productivity analysis using
                     AI/ML models applied to aggregated
                     employee behavioral data
1.6 Legal Basis:     Article 6(1)(f) -- Legitimate interest
                     (with LIA documented per activity)
1.7 Data Subjects:   ~2,400 employees (DE, FR, NL)
1.8 Data Categories: Keystroke timing, email metadata,
                     badge access, screen captures (REJECTED),
                     ML productivity scores, bias audit data
1.9 Retention:       7 to 730 days depending on data element
1.10 Recipients:     Team managers, HR, Works Council,
                     Security team (alerts only)
1.11 Transfers:      No third-country transfers
1.12 Processors:     None (all processing in-house)

----------------------------------------------------------------
SECTION 2: NECESSITY & PROPORTIONALITY
----------------------------------------------------------------
2.1 Necessity Assessment:
    - Keystroke timing: NECESSARY (no less invasive alternative
      for measuring active work patterns at team level)
    - Email metadata: NECESSARY (metadata only, no content)
    - Badge access: NECESSARY (dual purpose: safety + analytics)
    - Screen capture: NOT NECESSARY (less invasive alternatives
      exist -- REJECTED BY DPO)
    - ML scoring: NECESSARY (manual analysis insufficient for
      2,400+ employees)

2.2 Proportionality Assessment:
    - Team-level only (no individual scores): PROPORTIONATE
    - Pseudonymization at collection: PROPORTIONATE
    - Minimum team size 10: PROPORTIONATE
    - Screen capture: DISPROPORTIONATE -- REMOVED
    - 90-day retention for raw data: PROPORTIONATE
    - Works Council oversight: PROPORTIONATE

2.3 Data Minimization Measures:
    - Only timing patterns collected (not keystrokes)
    - Only email metadata (not content)
    - Only floor-level movement (not room/desk)
    - Screen capture REMOVED entirely
    - Aggregation to team level before reporting
    - Automatic deletion at retention expiry

2.4 Purpose Limitation:
    - Data ONLY used for team-level productivity insights
    - Individual performance assessment PROHIBITED
    - Employment decisions based on this data PROHIBITED
    - Law enforcement access only with court order
    - Marketing or profiling for other purposes PROHIBITED

----------------------------------------------------------------
SECTION 3: RISK ASSESSMENT SUMMARY
----------------------------------------------------------------
3.1 Total Risks Identified: 7
3.2 Inherent Risk Profile:
    - CRITICAL: 1 (chilling effect)
    - HIGH: 4 (unauthorized access, function creep,
      bias, breach notification)
    - MEDIUM: 2 (re-identification, retention)

3.3 Residual Risk Profile (after controls):
    - CRITICAL: 0
    - HIGH: 0
    - MEDIUM: 5
    - LOW: 2

3.4 Prior Consultation Required? NO -- residual risks
    reduced to MEDIUM/LOW with additional controls.
    (If any residual risk remained CRITICAL or HIGH,
    prior consultation under Art 36 would be mandatory.)

----------------------------------------------------------------
SECTION 4: CONTROLS & MITIGATIONS
----------------------------------------------------------------
[See Phase 4 for detailed control implementation]

----------------------------------------------------------------
SECTION 5: DPO RECOMMENDATIONS
----------------------------------------------------------------
5.1 REMOVE screen capture (PA-004) entirely
5.2 Implement minimum team size of 10 for ML scoring
5.3 Deploy all additional controls before go-live
5.4 Quarterly review with Works Council
5.5 External bias audit within 6 months
5.6 Annual DPIA review and update

----------------------------------------------------------------
SECTION 6: APPROVAL
----------------------------------------------------------------
DPO Approval:     [  ] Approved  [X] Conditionally Approved
                  Condition: Screen capture removed +
                  all Phase 4 controls implemented
CISO Approval:    [  ] Pending
CIO Approval:     [  ] Pending
Works Council:    [  ] Pending
Date:             ____________________
================================================================
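Sections 2.2 and 2.3 of the template depend on aggregating to teams of at least 10 people before anything is reported. A minimal sketch of that suppression rule could look like the following. The function name `aggregate_team_scores`, the record layout, and the use of a plain mean are assumptions made for illustration and are not part of WorkSight AI.

```python
from collections import defaultdict
from statistics import mean
from typing import Dict, List, Optional

MIN_TEAM_SIZE = 10  # threshold taken from Section 2.2 of the template


def aggregate_team_scores(
    records: List[Dict], min_size: int = MIN_TEAM_SIZE
) -> Dict[str, Optional[float]]:
    """Return the mean score per team, or None where the team is too small.

    Each record is a hypothetical dict with 'team_id', 'pseudonym', 'score'.
    """
    members = defaultdict(set)
    scores = defaultdict(list)
    for rec in records:
        members[rec["team_id"]].add(rec["pseudonym"])
        scores[rec["team_id"]].append(rec["score"])

    report: Dict[str, Optional[float]] = {}
    for team_id, values in scores.items():
        # Count distinct individuals, not rows, against the threshold
        if len(members[team_id]) >= min_size:
            report[team_id] = round(mean(values), 2)
        else:
            report[team_id] = None  # suppressed: group too small to report
    return report


if __name__ == "__main__":
    synthetic = [
        {"team_id": "T-ENG", "pseudonym": f"p{i}", "score": 70 + i}
        for i in range(12)
    ] + [
        {"team_id": "T-LEGAL", "pseudonym": f"q{i}", "score": 80}
        for i in range(4)
    ]
    print(aggregate_team_scores(synthetic))
```

Counting distinct pseudonyms rather than rows matters here: a four-person team with many events per person would otherwise pass a row-count check.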

DPIA Review Cycle

A DPIA is not a one-time document. GDPR Article 35(11) requires the controller to review it at least when the risk represented by the processing operations changes. Schedule reviews:

  • Quarterly: Lightweight review of risk scores and control effectiveness
  • Annually: Full DPIA review with updated risk assessment
  • On change: Any modification to data flows, purposes, or ML models triggers a DPIA review
  • On incident: Any privacy incident triggers an immediate DPIA review
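The four triggers above can be turned into a simple check that a scheduler or ticketing job runs daily. This is a sketch only: the function `reviews_due`, the 91-day quarter, and the returned labels are assumptions chosen for illustration.

```python
from datetime import date, timedelta
from typing import List

QUARTERLY = timedelta(days=91)   # assumed length of a quarter
ANNUAL = timedelta(days=365)


def reviews_due(
    today: date,
    last_light_review: date,
    last_full_review: date,
    processing_changed: bool = False,
    incident_open: bool = False,
) -> List[str]:
    """Return the DPIA review types that are due, most urgent first."""
    due: List[str] = []
    if incident_open:
        due.append("incident-triggered review")
    if processing_changed:
        due.append("change-triggered review")
    # A full annual review supersedes the lightweight quarterly one
    if today - last_full_review >= ANNUAL:
        due.append("annual full review")
    elif today - last_light_review >= QUARTERLY:
        due.append("quarterly lightweight review")
    return due


if __name__ == "__main__":
    approved = date(2026, 4, 13)  # DPIA date from the template
    print(reviews_due(date(2026, 7, 20), approved, approved))
    print(reviews_due(date(2026, 5, 1), approved, approved, incident_open=True))
```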

Phase 4: Privacy Control Implementation

Step 4.1: Pseudonymization Engine

Implement a pseudonymization system that replaces direct identifiers with pseudonyms while maintaining data utility:

"""
Pseudonymization Engine for WorkSight AI.
Implements GDPR-compliant pseudonymization with key rotation.
All data is 100% synthetic.
"""

import hashlib
import hmac
import secrets
import json
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple


@dataclass
class PseudonymKey:
    """Pseudonymization key with metadata."""
    key_id: str
    key_value: bytes
    created_at: str
    expires_at: str
    active: bool
    algorithm: str = "HMAC-SHA256"


@dataclass
class PseudonymizationEngine:
    """
    GDPR-compliant pseudonymization engine.

    Implements:
    - HMAC-based pseudonymization (deterministic within key period)
    - Monthly key rotation
    - Key separation from pseudonymized data
    - Re-pseudonymization capability for key rotation
    """
    keys: Dict[str, PseudonymKey] = field(default_factory=dict)
    active_key_id: Optional[str] = None
    key_vault_url: str = "https://keyvault.internal.example.com/v1/keys"

    def generate_key(self) -> PseudonymKey:
        """Generate a new pseudonymization key."""
        key_id = f"PSK-{secrets.token_hex(4).upper()}"
        key_value = secrets.token_bytes(32)  # 256-bit key
        # Timezone-aware local time: a naive now() must not be labeled "Z" (UTC)
        now = datetime.now().astimezone()
        expires = now + timedelta(days=30)  # 30-day rotation

        key = PseudonymKey(
            key_id=key_id,
            key_value=key_value,
            created_at=now.isoformat(timespec="seconds"),
            expires_at=expires.isoformat(timespec="seconds"),
            active=True,
        )

        # Deactivate previous active key
        if self.active_key_id and self.active_key_id in self.keys:
            self.keys[self.active_key_id].active = False

        self.keys[key_id] = key
        self.active_key_id = key_id

        print(f"[KEY] Generated new key {key_id}, "
              f"expires {expires.strftime('%Y-%m-%d')}")
        print(f"[KEY] Key stored at {self.key_vault_url}/{key_id}")
        return key

    def pseudonymize(self, identifier: str) -> Tuple[str, str]:
        """
        Pseudonymize an identifier using HMAC-SHA256.

        Returns (pseudonym, key_id) tuple.
        The pseudonym is deterministic for the same identifier
        within the same key period, enabling data linkage
        for analytics while protecting identity.
        """
        if not self.active_key_id:
            self.generate_key()

        key = self.keys[self.active_key_id]
        pseudonym = hmac.new(
            key.key_value,
            identifier.encode("utf-8"),
            hashlib.sha256,
        ).hexdigest()

        return pseudonym, key.key_id

    def batch_pseudonymize(
        self, identifiers: List[str]
    ) -> List[Dict[str, str]]:
        """Pseudonymize a batch of identifiers."""
        results = []
        for ident in identifiers:
            pseudo, key_id = self.pseudonymize(ident)
            results.append({
                "original_length": len(ident),
                "pseudonym": pseudo,
                "key_id": key_id,
                "algorithm": "HMAC-SHA256",
                # Aware local timestamp with its real UTC offset
                "timestamp": datetime.now().astimezone().isoformat(
                    timespec="seconds"
                ),
            })
        return results

    def rotate_key(self) -> Tuple[str, str]:
        """
        Rotate to a new key.
        Returns (old_key_id, new_key_id).

        IMPORTANT: After rotation, all existing pseudonymized data
        should be re-pseudonymized with the new key within the
        grace period (7 days). The old key is retained for this
        period only, then securely destroyed.
        """
        old_key_id = self.active_key_id
        new_key = self.generate_key()
        print(f"[KEY] Rotated: {old_key_id} -> {new_key.key_id}")
        print(f"[KEY] Grace period: 7 days to re-pseudonymize data")
        print(f"[KEY] Old key {old_key_id} scheduled for destruction")
        return old_key_id, new_key.key_id

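    def destroy_key(self, key_id: str) -> bool:
        """
        Destroy a key so its pseudonyms can no longer be recomputed.

        Once the key is gone, nobody can recompute the HMAC that maps
        a known identifier onto a stored pseudonym. This supports
        erasure under GDPR Article 17 (right to erasure), but the
        data is only anonymous if the remaining attributes and the
        still-linkable pseudonyms cannot single out an individual.
        """
        if key_id in self.keys:
            # Python bytes are immutable, so this only rebinds the
            # attribute and does not scrub memory. Real destruction
            # must happen in the key vault or HSM holding the key.
            self.keys[key_id].key_value = b'\x00' * 32
            self.keys[key_id].active = False
            del self.keys[key_id]
            # Avoid a dangling reference if the active key was destroyed
            if self.active_key_id == key_id:
                self.active_key_id = None
            print(f"[KEY] DESTROYED key {key_id} -- "
                  f"associated data now irreversibly anonymous")
            return True
        return False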


def demo_pseudonymization():
    """Demonstrate the pseudonymization engine."""
    engine = PseudonymizationEngine()

    # Generate initial key
    engine.generate_key()

    # Synthetic employee IDs
    employees = [
        "EMP-00142",  # Jan de Vries
        "EMP-00143",  # Raj Patel
        "EMP-00144",  # Amelie Fontaine
        "EMP-00145",  # Thomas Mueller
        "EMP-00146",  # Sarah Okonkwo
    ]

    print("\n--- Pseudonymization Demo ---")
    print(f"{'Employee ID':15s} | {'Pseudonym':64s} | Key ID")
    print("-" * 100)

    for emp in employees:
        pseudo, key_id = engine.pseudonymize(emp)
        print(f"{emp:15s} | {pseudo} | {key_id}")

    # Demonstrate determinism (same input = same output within key period)
    print("\n--- Determinism Check ---")
    p1, _ = engine.pseudonymize("EMP-00142")
    p2, _ = engine.pseudonymize("EMP-00142")
    print(f"Same input, same key: {'MATCH' if p1 == p2 else 'MISMATCH'}")

    # Key rotation
    print("\n--- Key Rotation ---")
    old_id, new_id = engine.rotate_key()
    p3, _ = engine.pseudonymize("EMP-00142")
    print(f"Same input, new key: {'DIFFERENT' if p1 != p3 else 'SAME'}")
    print(f"Old pseudonym: {p1[:16]}...")
    print(f"New pseudonym: {p3[:16]}...")

    # Cryptographic erasure
    print("\n--- Cryptographic Erasure ---")
    engine.destroy_key(old_id)
    print(f"Data under key {old_id} is now irreversibly anonymous")


if __name__ == "__main__":
    demo_pseudonymization()

Expected output (key IDs, dates, and pseudonyms are generated at run time, so your values will differ):

[KEY] Generated new key PSK-A1B2C3D4, expires 2026-05-13
[KEY] Key stored at https://keyvault.internal.example.com/v1/keys/PSK-A1B2C3D4

--- Pseudonymization Demo ---
Employee ID     | Pseudonym                                                        | Key ID
----------------------------------------------------------------------------------------------------
EMP-00142       | 7f8a3b2c1d4e5f6a7b8c9d0e1f2a3b4c5d6e7f8a9b0c1d2e3f4a5b6c7d8e9f0a | PSK-A1B2C3D4
EMP-00143       | 2a3b4c5d6e7f8a9b0c1d2e3f4a5b6c7d8e9f0a1b2c3d4e5f6a7b8c9d0e1f2a3b | PSK-A1B2C3D4
...

--- Determinism Check ---
Same input, same key: MATCH

--- Key Rotation ---
[KEY] Generated new key PSK-E5F6A7B8, expires 2026-05-13
[KEY] Key stored at https://keyvault.internal.example.com/v1/keys/PSK-E5F6A7B8
[KEY] Rotated: PSK-A1B2C3D4 -> PSK-E5F6A7B8
[KEY] Grace period: 7 days to re-pseudonymize data
[KEY] Old key PSK-A1B2C3D4 scheduled for destruction
Same input, new key: DIFFERENT
Old pseudonym: 7f8a3b2c1d4e5f6a...
New pseudonym: 9c1d2e3f4a5b6c7d...

--- Cryptographic Erasure ---
[KEY] DESTROYED key PSK-A1B2C3D4 -- associated data now irreversibly anonymous
Data under key PSK-A1B2C3D4 is now irreversibly anonymous
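The `rotate_key` docstring calls for re-pseudonymizing stored data within the grace period, but the engine does not show how. Because HMAC is one-way, one possible approach is to recompute both pseudonyms from an authorized list of identifiers and swap them. The sketch below assumes such a list is available (for example from the HR system); the helper names `hmac_pseudonym`, `build_migration_map`, and `re_pseudonymize` are hypothetical.

```python
import hashlib
import hmac
import secrets
from typing import Dict, List


def hmac_pseudonym(key: bytes, identifier: str) -> str:
    """HMAC-SHA256 pseudonym, the same construction the engine uses."""
    return hmac.new(key, identifier.encode("utf-8"), hashlib.sha256).hexdigest()


def build_migration_map(
    identifiers: List[str], old_key: bytes, new_key: bytes
) -> Dict[str, str]:
    """Map each old pseudonym to its replacement under the new key."""
    return {
        hmac_pseudonym(old_key, ident): hmac_pseudonym(new_key, ident)
        for ident in identifiers
    }


def re_pseudonymize(records: List[Dict], mapping: Dict[str, str]) -> List[Dict]:
    """Return copies of the records with pseudonyms swapped to the new key.

    Records whose pseudonym is not in the mapping are dropped here; a real
    pipeline would log them, since they cannot be migrated afterwards.
    """
    return [
        {**rec, "pseudonym": mapping[rec["pseudonym"]]}
        for rec in records
        if rec["pseudonym"] in mapping
    ]


if __name__ == "__main__":
    old_key, new_key = secrets.token_bytes(32), secrets.token_bytes(32)
    ids = ["EMP-00142", "EMP-00143"]  # synthetic IDs from the demo above
    stored = [{"pseudonym": hmac_pseudonym(old_key, i), "events": 3} for i in ids]
    migrated = re_pseudonymize(stored, build_migration_map(ids, old_key, new_key))
    print(migrated[0]["pseudonym"] == hmac_pseudonym(new_key, "EMP-00142"))
```

The migration map links old and new pseudonyms, so it is as sensitive as the keys themselves and should be discarded once the old key is destroyed.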

Step 4.2: Encryption at Rest and in Transit

Implement encryption controls for all monitoring data:

"""
Encryption Controls for WorkSight AI Data Protection.
Demonstrates AES-256-GCM encryption at rest and TLS configuration.
All data is 100% synthetic.
"""

import os
import json
import base64
import hashlib
from dataclasses import dataclass
from typing import Dict, Optional, Tuple


@dataclass
class EncryptionConfig:
    """Encryption configuration for data at rest."""
    algorithm: str = "AES-256-GCM"
    key_length_bits: int = 256
    iv_length_bytes: int = 12  # 96 bits for GCM
    tag_length_bytes: int = 16  # 128 bits
    key_derivation: str = "HKDF-SHA256"
    key_vault_url: str = "https://keyvault.internal.example.com/v1/keys"


def simulate_encryption_at_rest(
    plaintext: str, config: EncryptionConfig
) -> Dict:
    """
    Simulate AES-256-GCM encryption at rest.

    NOTE: This is a simplified simulation for educational purposes.
    In production, use a properly vetted cryptographic library
    (e.g., the AESGCM class in Python's cryptography package;
    Fernet is also vetted but uses AES-128-CBC with HMAC, not
    AES-256-GCM).
    """
    # Generate synthetic key material
    key = os.urandom(config.key_length_bits // 8)
    iv = os.urandom(config.iv_length_bytes)

    # Simulate encryption (in production, use cryptography.hazmat).
    # The SHA-256 digest below is only a stand-in that shows the
    # envelope structure; it is not ciphertext and cannot be decrypted.
    simulated_ciphertext = hashlib.sha256(
        key + iv + plaintext.encode()
    ).digest()

    result = {
        "algorithm": config.algorithm,
        # Random identifier: never derive a key ID from the key bytes
        "key_id": f"KEY-{os.urandom(4).hex().upper()}",
        "iv": base64.b64encode(iv).decode(),
        "ciphertext": base64.b64encode(simulated_ciphertext).decode(),
        "tag": base64.b64encode(os.urandom(config.tag_length_bytes)).decode(),
        "plaintext_length": len(plaintext.encode()),  # size in bytes
        "encrypted_at": "2026-04-13T10:30:00Z",
        "key_vault_ref": f"{config.key_vault_url}/KEY-WORKSIGHT-DATA",
    }

    return result


def generate_tls_config() -> Dict:
    """
    Generate a TLS configuration for data in transit
    (TLS 1.3 preferred, TLS 1.2 as the minimum version).
    Returns configuration suitable for Nginx/HAProxy.
    """
    config = {
        "tls_version": "1.3",
        "min_version": "1.2",
        "cipher_suites_tls13": [
            "TLS_AES_256_GCM_SHA384",
            "TLS_CHACHA20_POLY1305_SHA256",
            "TLS_AES_128_GCM_SHA256",
        ],
        "cipher_suites_tls12": [
            "ECDHE-ECDSA-AES256-GCM-SHA384",
            "ECDHE-RSA-AES256-GCM-SHA384",
            "ECDHE-ECDSA-CHACHA20-POLY1305",
            "ECDHE-RSA-CHACHA20-POLY1305",
        ],
        "certificate": "/etc/tls/worksight.internal.example.com.pem",
        "private_key": "/etc/tls/worksight.internal.example.com-key.pem",
        "ca_bundle": "/etc/tls/helios-ca-bundle.pem",
        "hsts_max_age": 31536000,
        "hsts_include_subdomains": True,
        "ocsp_stapling": True,
        "session_tickets": False,  # Disabled for forward secrecy
        "session_timeout": 300,
    }

    # Nginx configuration
    nginx_config = f"""
# WorkSight AI TLS Configuration
# Server: worksight-ai.internal.example.com (10.60.1.110)
# Generated: 2026-04-13

server {{
    listen 443 ssl http2;
    server_name worksight-ai.internal.example.com;

    # TLS 1.3 preferred, TLS 1.2 minimum (ssl_ciphers applies to TLS 1.2 only)
    ssl_protocols TLSv1.3 TLSv1.2;
    ssl_ciphers '{":".join(config["cipher_suites_tls12"])}';
    ssl_prefer_server_ciphers on;

    ssl_certificate     {config["certificate"]};
    ssl_certificate_key {config["private_key"]};
    ssl_trusted_certificate {config["ca_bundle"]};

    # HSTS
    add_header Strict-Transport-Security
        "max-age={config["hsts_max_age"]}; includeSubDomains" always;

    # OCSP Stapling
    ssl_stapling on;
    ssl_stapling_verify on;

    # Session Configuration
    ssl_session_tickets off;
    ssl_session_timeout {config["session_timeout"]}s;
    ssl_session_cache shared:SSL:10m;

    # Security Headers
    add_header X-Content-Type-Options nosniff always;
    add_header X-Frame-Options DENY always;
    add_header Content-Security-Policy "default-src 'none'" always;
    add_header Referrer-Policy no-referrer always;
}}
"""

    return {
        "config": config,
        "nginx_config": nginx_config,
    }


def generate_encryption_report() -> None:
    """Generate encryption controls report."""
    config = EncryptionConfig()

    print("=" * 60)
    print("ENCRYPTION CONTROLS REPORT -- WorkSight AI")
    print("=" * 60)

    # Demonstrate encryption at rest
    sample_data = json.dumps({
        "employee_hash": "a1b2c3d4e5f6",
        "keystroke_intervals": [120, 95, 110, 130, 88],
        "app_name": "vscode",
        "timestamp": "2026-04-13T10:00:00Z",
    })

    print("\n--- Encryption at Rest ---")
    encrypted = simulate_encryption_at_rest(sample_data, config)
    print(f"  Algorithm:    {encrypted['algorithm']}")
    print(f"  Key ID:       {encrypted['key_id']}")
    print(f"  IV:           {encrypted['iv'][:20]}...")
    print(f"  Ciphertext:   {encrypted['ciphertext'][:20]}...")
    print(f"  Auth Tag:     {encrypted['tag'][:20]}...")
    print(f"  Key Vault:    {encrypted['key_vault_ref']}")
    print(f"  Original:     {encrypted['plaintext_length']} bytes")

    # TLS configuration
    tls = generate_tls_config()
    print("\n--- Encryption in Transit ---")
    print(f"  TLS Version:  {tls['config']['tls_version']}")
    print(f"  Min Version:  {tls['config']['min_version']}")
    print(f"  Cipher Suites (TLS 1.3):")
    for cs in tls['config']['cipher_suites_tls13']:
        print(f"    - {cs}")
    print(f"  HSTS:         {tls['config']['hsts_max_age']}s")
    print(f"  OCSP:         {'Enabled' if tls['config']['ocsp_stapling'] else 'Disabled'}")
    print(f"  Session Tickets: {'Enabled' if tls['config']['session_tickets'] else 'Disabled (forward secrecy)'}")

    # Data store encryption summary
    print("\n--- Data Store Encryption Summary ---")
    stores = [
        ("Raw Keystroke Store", "AES-256-GCM", "TLS 1.3",
         "10.60.1.110", "YES"),
        ("Email Metadata Store", "AES-256-GCM", "TLS 1.3",
         "10.60.1.130", "YES"),
        ("Badge Access Store", "AES-256-GCM", "TLS 1.3",
         "10.60.1.120", "YES"),
        ("ML Score Store", "AES-256-GCM", "TLS 1.3",
         "10.60.1.110", "YES"),
        ("Audit Log Store", "AES-256-GCM", "TLS 1.3",
         "10.60.1.150", "YES"),
        ("Bias Audit Store", "AES-256-GCM", "TLS 1.3",
         "10.60.1.110", "YES (DPO only)"),
    ]

    print(f"  {'Store':<25s} | {'At Rest':<12s} | {'Transit':<8s} | "
          f"{'Host':<15s} | Access Ctrl")
    print(f"  {'-'*25}-+-{'-'*12}-+-{'-'*8}-+-{'-'*15}-+-{'-'*15}")
    for name, rest, transit, host, access in stores:
        print(f"  {name:<25s} | {rest:<12s} | {transit:<8s} | "
              f"{host:<15s} | {access}")


if __name__ == "__main__":
    generate_encryption_report()
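The script above only simulates the envelope structure. For comparison, a real AES-256-GCM round trip might be sketched as follows. This assumes the third-party `cryptography` package is installed; the function names `encrypt_record` and `decrypt_record` and the envelope layout are hypothetical, and in production the key would come from the key vault rather than being generated locally.

```python
import json
import os

try:
    # Third-party dependency (assumed installed): pip install cryptography
    from cryptography.hazmat.primitives.ciphers.aead import AESGCM
except ImportError:
    AESGCM = None


def encrypt_record(record: dict, key: bytes, aad: bytes) -> dict:
    """Encrypt a record with AES-256-GCM.

    The 16-byte authentication tag is appended to the ciphertext.
    """
    if AESGCM is None:
        raise RuntimeError("the 'cryptography' package is required")
    nonce = os.urandom(12)  # 96-bit nonce, must never repeat for a given key
    plaintext = json.dumps(record).encode("utf-8")
    ciphertext = AESGCM(key).encrypt(nonce, plaintext, aad)
    return {"nonce": nonce, "ciphertext": ciphertext, "aad": aad}


def decrypt_record(envelope: dict, key: bytes) -> dict:
    """Decrypt and authenticate; raises InvalidTag if anything was altered."""
    if AESGCM is None:
        raise RuntimeError("the 'cryptography' package is required")
    plaintext = AESGCM(key).decrypt(
        envelope["nonce"], envelope["ciphertext"], envelope["aad"]
    )
    return json.loads(plaintext)


if __name__ == "__main__" and AESGCM is not None:
    key = AESGCM.generate_key(bit_length=256)  # demo only, not a vault key
    sample = {
        "employee_hash": "a1b2c3d4e5f6",
        "keystroke_intervals": [120, 95, 110, 130, 88],
    }
    envelope = encrypt_record(sample, key, aad=b"raw-keystroke-store")
    print(decrypt_record(envelope, key) == sample)
```

Binding the store name as associated data means a ciphertext copied into a different store fails authentication on decryption.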

Step 4.3: Access Control Implementation

Implement role-based access controls with least privilege:

"""
RBAC Implementation for WorkSight AI Data Protection.
Defines roles, permissions, and access audit capabilities.
All data is 100% synthetic.
"""

from dataclasses import dataclass, field
from typing import Dict, List, Set, Optional
from datetime import datetime
from enum import Enum


class Permission(Enum):
    """Granular permissions for WorkSight AI data."""
    READ_RAW_KEYSTROKE = "read:raw:keystroke"
    READ_RAW_EMAIL = "read:raw:email"
    READ_RAW_BADGE = "read:raw:badge"
    READ_AGG_TEAM = "read:aggregated:team"
    READ_AGG_ORG = "read:aggregated:org"
    READ_ML_SCORES = "read:ml:scores"
    READ_ML_MODEL = "read:ml:model"
    READ_BIAS_AUDIT = "read:bias:audit"
    READ_AUDIT_LOG = "read:audit:log"
    WRITE_CONFIG = "write:config"
    ADMIN_KEY_MGMT = "admin:key:management"
    ADMIN_USER_MGMT = "admin:user:management"
    ADMIN_RETENTION = "admin:retention:override"
    EXPORT_DATA = "export:data"
    DELETE_DATA = "delete:data"


@dataclass
class Role:
    """RBAC role definition."""
    role_id: str
    name: str
    description: str
    permissions: Set[Permission]
    max_query_results: int = 100
    requires_mfa: bool = True
    requires_justification: bool = False
    time_limited: bool = False
    time_limit_hours: int = 0


@dataclass
class AccessEvent:
    """Audit log entry for data access."""
    event_id: str
    timestamp: str
    user_id: str
    role: str
    permission_used: str
    resource_accessed: str
    query_details: str
    result_count: int
    source_ip: str
    justification: Optional[str] = None
    approved_by: Optional[str] = None


def define_roles() -> Dict[str, Role]:
    """Define RBAC roles for WorkSight AI."""

    roles = {}

    # Privacy Engineer -- primary operational role
    roles["privacy-engineer"] = Role(
        role_id="ROLE-001",
        name="Privacy Engineer",
        description="Manages pseudonymization, retention, and data "
                    "quality. Access to raw data for pipeline maintenance.",
        permissions={
            Permission.READ_RAW_KEYSTROKE,
            Permission.READ_RAW_EMAIL,
            Permission.READ_RAW_BADGE,
            Permission.READ_AGG_TEAM,
            Permission.READ_AUDIT_LOG,
            Permission.WRITE_CONFIG,
            Permission.ADMIN_RETENTION,
        },
        max_query_results=1000,
        requires_mfa=True,
        requires_justification=True,
    )

    # ML Engineer -- model training and monitoring
    roles["ml-engineer"] = Role(
        role_id="ROLE-002",
        name="ML Engineer",
        description="Trains and maintains productivity scoring models. "
                    "Access to pseudonymized features only.",
        permissions={
            Permission.READ_RAW_KEYSTROKE,  # pseudonymized
            Permission.READ_RAW_EMAIL,      # pseudonymized
            Permission.READ_ML_SCORES,
            Permission.READ_ML_MODEL,
        },
        max_query_results=500,
        requires_mfa=True,
        requires_justification=False,
    )

    # Team Manager -- aggregated reports only
    roles["team-manager"] = Role(
        role_id="ROLE-003",
        name="Team Manager",
        description="Views team-level aggregated reports. "
                    "NO access to raw or individual data.",
        permissions={
            Permission.READ_AGG_TEAM,
            Permission.READ_ML_SCORES,
        },
        max_query_results=50,
        requires_mfa=True,
        requires_justification=False,
    )

    # DPO -- oversight and audit
    roles["dpo"] = Role(
        role_id="ROLE-004",
        name="Data Protection Officer",
        description="Full audit access for compliance monitoring. "
                    "Cannot modify data or configuration.",
        permissions={
            Permission.READ_AGG_TEAM,
            Permission.READ_AGG_ORG,
            Permission.READ_ML_SCORES,
            Permission.READ_BIAS_AUDIT,
            Permission.READ_AUDIT_LOG,
        },
        max_query_results=5000,
        requires_mfa=True,
        requires_justification=False,
    )

    # Security Team -- incident response
    roles["security-team"] = Role(
        role_id="ROLE-005",
        name="Security Team",
        description="Access to security-relevant events only. "
                    "Time-limited access during investigations.",
        permissions={
            Permission.READ_RAW_BADGE,
            Permission.READ_AUDIT_LOG,
        },
        max_query_results=200,
        requires_mfa=True,
        requires_justification=True,
        time_limited=True,
        time_limit_hours=24,
    )

    # Works Council -- oversight
    roles["works-council"] = Role(
        role_id="ROLE-006",
        name="Works Council Representative",
        description="Quarterly access to aggregated reports and "
                    "bias audit results for oversight.",
        permissions={
            Permission.READ_AGG_ORG,
            Permission.READ_ML_SCORES,
            Permission.READ_BIAS_AUDIT,
        },
        max_query_results=50,
        requires_mfa=True,
        requires_justification=False,
    )

    # System Admin -- infrastructure only
    roles["sys-admin"] = Role(
        role_id="ROLE-007",
        name="System Administrator",
        description="Infrastructure management. Access to encryption "
                    "keys and system configuration. NO data access.",
        permissions={
            Permission.ADMIN_KEY_MGMT,
            Permission.ADMIN_USER_MGMT,
            Permission.WRITE_CONFIG,
        },
        max_query_results=0,  # No data queries
        requires_mfa=True,
        requires_justification=True,
    )

    return roles


def check_access(
    user_role: str,
    requested_permission: Permission,
    roles: Dict[str, Role],
) -> Dict:
    """Check if a role has a specific permission."""
    if user_role not in roles:
        return {
            "granted": False,
            "reason": f"Role '{user_role}' does not exist",
        }

    role = roles[user_role]
    granted = requested_permission in role.permissions

    result = {
        "granted": granted,
        "role": role.name,
        "permission": requested_permission.value,
        "requires_mfa": role.requires_mfa,
        "requires_justification": role.requires_justification,
    }

    if not granted:
        result["reason"] = (
            f"Permission '{requested_permission.value}' is not assigned "
            f"to role '{role.name}'"
        )
    if role.time_limited:
        result["time_limited"] = True
        result["time_limit_hours"] = role.time_limit_hours

    return result


def simulate_access_audit(roles: Dict[str, Role]) -> List[AccessEvent]:
    """Simulate access audit events."""
    events = [
        AccessEvent(
            event_id="EVT-001",
            timestamp="2026-04-13T09:00:00Z",
            user_id="testuser",
            role="team-manager",
            permission_used="read:aggregated:team",
            resource_accessed="team_scores_engineering_q2",
            query_details="SELECT team_id, score FROM team_scores "
                          "WHERE department='Engineering'",
            result_count=3,
            source_ip="10.60.2.42",
        ),
        AccessEvent(
            event_id="EVT-002",
            timestamp="2026-04-13T09:15:00Z",
            user_id="testuser",
            role="team-manager",
            permission_used="read:raw:keystroke",
            resource_accessed="keystroke_raw_data",
            query_details="SELECT * FROM keystroke_data WHERE "
                          "employee_hash='a1b2c3d4'",
            result_count=0,
            source_ip="10.60.2.42",
            justification="DENIED -- insufficient permissions",
        ),
        AccessEvent(
            event_id="EVT-003",
            timestamp="2026-04-13T10:00:00Z",
            user_id="testuser",
            role="dpo",
            permission_used="read:bias:audit",
            resource_accessed="bias_audit_q1_2026",
            query_details="SELECT * FROM bias_audits WHERE "
                          "quarter='2026-Q1'",
            result_count=1,
            source_ip="10.60.3.10",
        ),
    ]
    return events


def print_rbac_report(roles: Dict[str, Role]) -> None:
    """Print RBAC configuration report."""
    print("=" * 60)
    print("RBAC CONFIGURATION REPORT -- WorkSight AI")
    print(f"Total Roles: {len(roles)}")
    print("=" * 60)

    for role_name, role in roles.items():
        print(f"\n--- {role.name} ({role.role_id}) ---")
        print(f"  Description: {role.description}")
        print(f"  MFA Required: {role.requires_mfa}")
        print(f"  Justification Required: {role.requires_justification}")
        if role.time_limited:
            print(f"  Time Limited: {role.time_limit_hours}h per session")
        print(f"  Max Query Results: {role.max_query_results}")
        print(f"  Permissions:")
        for p in sorted(role.permissions, key=lambda x: x.value):
            print(f"    - {p.value}")

    # Permission matrix
    print(f"\n{'=' * 60}")
    print("PERMISSION MATRIX")
    print(f"{'=' * 60}")
    all_perms = sorted(Permission, key=lambda x: x.value)
    header = f"{'Permission':<30s}"
    for rn in roles:
        header += f" | {rn[:8]:8s}"
    print(header)
    print("-" * len(header))
    for p in all_perms:
        row = f"{p.value:<30s}"
        for rn, role in roles.items():
            has = "X" if p in role.permissions else "."
            row += f" | {has:^8s}"
        print(row)

    # Access audit
    print(f"\n{'=' * 60}")
    print("SAMPLE ACCESS AUDIT LOG")
    print(f"{'=' * 60}")
    events = simulate_access_audit(roles)
    for e in events:
        status = "DENIED" if e.justification and "DENIED" in e.justification else "GRANTED"
        print(f"  [{e.event_id}] {e.timestamp} | {e.user_id}@{e.role} | "
              f"{e.permission_used} | {status} | Results: {e.result_count}")


if __name__ == "__main__":
    roles = define_roles()
    print_rbac_report(roles)

    # Demo access checks
    print(f"\n{'=' * 60}")
    print("ACCESS CHECK EXAMPLES")
    print(f"{'=' * 60}")

    checks = [
        ("team-manager", Permission.READ_AGG_TEAM),
        ("team-manager", Permission.READ_RAW_KEYSTROKE),
        ("dpo", Permission.READ_BIAS_AUDIT),
        ("ml-engineer", Permission.EXPORT_DATA),
        ("works-council", Permission.READ_ML_SCORES),
    ]

    for role_name, perm in checks:
        result = check_access(role_name, perm, roles)
        status = "GRANTED" if result["granted"] else "DENIED"
        print(f"  {role_name} -> {perm.value}: {status}")
        if not result["granted"]:
            print(f"    Reason: {result.get('reason', 'N/A')}")
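`check_access` reports `requires_justification`, `time_limited`, and `max_query_results` but leaves enforcement to the caller. A minimal sketch of that enforcement step, run after the permission check has passed, could look like this. `RolePolicy` and `enforce_policy` are hypothetical names, and the policy fields simply mirror the matching `Role` attributes.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple


@dataclass
class RolePolicy:
    """Hypothetical subset of the Role fields needed for enforcement."""
    requires_justification: bool
    time_limited: bool
    time_limit_hours: int
    max_query_results: int


def enforce_policy(
    policy: RolePolicy,
    granted_at: datetime,
    now: datetime,
    justification: Optional[str],
    requested_rows: int,
) -> Tuple[bool, str]:
    """Return (allowed, reason) for a request that already holds the permission."""
    if policy.requires_justification and not (justification or "").strip():
        return False, "justification required"
    if policy.time_limited:
        window = timedelta(hours=policy.time_limit_hours)
        if now - granted_at > window:
            return False, "access window expired"
    if requested_rows > policy.max_query_results:
        return False, "result limit exceeded"
    return True, "ok"


if __name__ == "__main__":
    # Values copied from the security-team role defined above
    security = RolePolicy(True, True, 24, 200)
    start = datetime(2026, 4, 13, 9, 0, tzinfo=timezone.utc)
    print(enforce_policy(security, start, start + timedelta(hours=2),
                         "Synthetic incident triage", 50))
    print(enforce_policy(security, start, start + timedelta(hours=25),
                         "Synthetic incident triage", 50))
```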

Step 4.4: Data Minimization Pipeline

Implement automated data minimization controls:

"""
Data Minimization Pipeline for WorkSight AI.
Implements GDPR Article 5(1)(c) data minimization principle.
All data is 100% synthetic.
"""

import json
from dataclasses import dataclass
from typing import List, Dict, Any, Optional, Callable
from datetime import datetime, timedelta


@dataclass
class MinimizationRule:
    """Data minimization rule."""
    rule_id: str
    name: str
    description: str
    data_element: str
    action: str  # "remove", "aggregate", "generalize", "pseudonymize"
    threshold: Optional[str] = None
    implementation: Optional[str] = None


def define_minimization_rules() -> List[MinimizationRule]:
    """Define data minimization rules for WorkSight AI."""
    rules = []

    rules.append(MinimizationRule(
        rule_id="MIN-001",
        name="Remove keystroke content",
        description="Strip actual key values -- retain only timing "
                    "intervals between keystrokes",
        data_element="DI-001",
        action="remove",
        implementation=(
            "Endpoint agent captures only inter-key intervals in "
            "milliseconds. Key identity is never captured, transmitted, "
            "or stored. This is enforced at the agent level with no "
            "configuration option to enable content capture."
        ),
    ))

    rules.append(MinimizationRule(
        rule_id="MIN-002",
        name="Strip email content",
        description="Collect only email metadata (timestamps, sizes) -- "
                    "never access email body, subject, or attachments",
        data_element="DI-003",
        action="remove",
        implementation=(
            "Email gateway integration uses Exchange Web Services with "
            "scope limited to metadata only. The service account has "
            "NO permission to read message content. This is enforced "
            "at the API permission level."
        ),
    ))

    rules.append(MinimizationRule(
        rule_id="MIN-003",
        name="Aggregate to team level",
        description="All metrics aggregated to team level (min 10 "
                    "members) before any reporting or storage beyond "
                    "the processing buffer",
        data_element="DI-001, DI-002, DI-003",
        action="aggregate",
        threshold="Minimum 10 individuals per aggregation group",
        implementation=(
            "Aggregation service enforces k>=10 for all queries. "
            "Any query that would return results for fewer than 10 "
            "individuals is automatically blocked with an audit log "
            "entry. This is enforced in the query engine, not the "
            "application layer."
        ),
    ))

    rules.append(MinimizationRule(
        rule_id="MIN-004",
        name="Generalize badge location",
        description="Badge access locations generalized to floor level -- "
                    "no room, desk, or precise location tracking",
        data_element="DI-004",
        action="generalize",
        implementation=(
            "Badge reader identifiers are mapped to floor-level "
            "location labels at ingestion. The mapping table contains "
            "only building and floor -- never room or zone identifiers."
        ),
    ))

    rules.append(MinimizationRule(
        rule_id="MIN-005",
        name="Exclude sensitive application categories",
        description="Filter out applications in sensitive categories "
                    "from monitoring data",
        data_element="DI-002",
        action="remove",
        implementation=(
            "Application category exclusion list maintained by DPO. "
            "Excluded categories: health/medical, union/labor, "
            "legal/attorney, personal finance, dating, religious, "
            "political. Applications matching these categories are "
            "replaced with '[EXCLUDED-CATEGORY]' in the data stream."
        ),
    ))

    rules.append(MinimizationRule(
        rule_id="MIN-006",
        name="Automated retention enforcement",
        description="Automatically delete data beyond retention period "
                    "using cryptographic erasure",
        data_element="ALL",
        action="remove",
        implementation=(
            "Cron job runs daily at 02:00 UTC on dpia-server "
            "(10.60.1.100). Identifies data beyond retention threshold "
            "per data element. Executes cryptographic erasure by "
            "destroying encryption keys. Generates deletion "
            "certificate with hash of deleted records. Certificate "
            "stored in audit log (730-day retention)."
        ),
    ))

    return rules


def apply_minimization(
    raw_record: Dict[str, Any],
    rules: List[MinimizationRule],
) -> Dict[str, Any]:
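    """
    Apply minimization rules to a raw data record.
    Returns the minimized record (a shallow copy of the input).

    Note: this demo hard-codes the record-level rules (MIN-001, MIN-002,
    MIN-004, MIN-005). The ``rules`` argument is accepted for interface
    consistency but is not consulted here.
    """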
    minimized = raw_record.copy()

    # Sensitive application names to exclude (illustrative subset; the
    # MIN-005 category list also covers religious and political apps)
    excluded_apps = {
        "health-tracker", "medical-portal", "union-portal",
        "legal-advisor", "personal-finance", "dating-app",
    }

    # MIN-001: Remove keystroke content (keep only intervals)
    if "key_value" in minimized:
        del minimized["key_value"]
        minimized["_minimized"] = minimized.get("_minimized", [])
        minimized["_minimized"].append("MIN-001: key_value removed")

    # MIN-002: Remove email content
    for field in ["email_body", "email_subject", "attachments"]:
        if field in minimized:
            del minimized[field]
            minimized["_minimized"] = minimized.get("_minimized", [])
            minimized["_minimized"].append(
                f"MIN-002: {field} removed"
            )

    # MIN-005: Exclude sensitive apps
    if "app_name" in minimized:
        if minimized["app_name"].lower() in excluded_apps:
            minimized["app_name"] = "[EXCLUDED-CATEGORY]"
            minimized["_minimized"] = minimized.get("_minimized", [])
            minimized["_minimized"].append(
                "MIN-005: sensitive app excluded"
            )

    # MIN-004: Generalize location
    if "room_number" in minimized:
        del minimized["room_number"]
        minimized["_minimized"] = minimized.get("_minimized", [])
        minimized["_minimized"].append(
            "MIN-004: room_number removed (floor-level only)"
        )

    return minimized


def demo_minimization():
    """Demonstrate data minimization pipeline."""
    rules = define_minimization_rules()

    print("=" * 60)
    print("DATA MINIMIZATION PIPELINE DEMO")
    print("=" * 60)

    # Print rules
    print("\n--- Active Minimization Rules ---")
    for r in rules:
        print(f"\n  [{r.rule_id}] {r.name}")
        print(f"    Action:     {r.action}")
        print(f"    Element:    {r.data_element}")
        if r.threshold:
            print(f"    Threshold:  {r.threshold}")

    # Demo: Apply minimization to sample records
    sample_records = [
        {
            "timestamp": "2026-04-13T10:00:00Z",
            "employee_hash": "a1b2c3d4",
            "key_value": "a",  # This should be removed
            "interval_ms": 120,
            "app_name": "vscode",
        },
        {
            "timestamp": "2026-04-13T10:05:00Z",
            "employee_hash": "e5f6a7b8",
            "key_value": "b",  # This should be removed
            "interval_ms": 95,
            "app_name": "health-tracker",  # Sensitive -- exclude
        },
        {
            "timestamp": "2026-04-13T10:10:00Z",
            "employee_hash": "c9d0e1f2",
            "email_body": "Hello, meeting at 3pm...",  # Remove
            "email_subject": "Team meeting",  # Remove
            "send_timestamp": "2026-04-13T10:10:00Z",
            "size_bytes": 1500,
        },
        {
            "timestamp": "2026-04-13T08:00:00Z",
            "badge_id": "EMP-00142",
            "building": "BUILDING-A",
            "floor": "FLOOR-3",
            "room_number": "305A",  # Generalize -- remove
        },
    ]

    print("\n--- Minimization Results ---")
    for i, record in enumerate(sample_records):
        minimized = apply_minimization(record, rules)
        print(f"\n  Record {i+1}:")
        print(f"    Before: {json.dumps(record, indent=None)}")
        print(f"    After:  {json.dumps(minimized, indent=None)}")
        if "_minimized" in minimized:
            print(f"    Rules:  {', '.join(minimized['_minimized'])}")


if __name__ == "__main__":
    demo_minimization()

Expected output (truncated to the results section):

--- Minimization Results ---

  Record 1:
    Before: {"timestamp": "...", "key_value": "a", "interval_ms": 120, ...}
    After:  {"timestamp": "...", "interval_ms": 120, "app_name": "vscode", ...}
    Rules:  MIN-001: key_value removed

  Record 2:
    Before: {"timestamp": "...", "key_value": "b", "app_name": "health-tracker", ...}
    After:  {"timestamp": "...", "interval_ms": 95, "app_name": "[EXCLUDED-CATEGORY]", ...}
    Rules:  MIN-001: key_value removed, MIN-005: sensitive app excluded
  ...
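
The demo above exercises only the record-level rules. MIN-006 (automated retention enforcement) is described in prose only, so the following is a minimal sketch of how the daily job might select expired records and build a deletion certificate. The retention periods, the record layout (`record_id`, `data_element`, `collected_at`), and both function names are assumptions made for illustration. The key-destruction step itself is not shown.

```python
import hashlib
import json
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Tuple

# Hypothetical retention periods in days per data element (illustrative
# values only; take the real ones from your data inventory).
RETENTION_DAYS: Dict[str, int] = {"DI-001": 30, "DI-004": 90}


def split_expired(
    records: List[Dict[str, Any]], now: datetime
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Return (kept, expired) according to each record's data element."""
    kept: List[Dict[str, Any]] = []
    expired: List[Dict[str, Any]] = []
    for rec in records:
        limit = timedelta(days=RETENTION_DAYS[rec["data_element"]])
        collected = datetime.fromisoformat(rec["collected_at"])
        if now - collected > limit:
            expired.append(rec)
        else:
            kept.append(rec)
    return kept, expired


def deletion_certificate(
    expired: List[Dict[str, Any]], now: datetime
) -> Dict[str, Any]:
    """Summarize a deletion run: count plus SHA-256 over the sorted record IDs."""
    ids = sorted(rec["record_id"] for rec in expired)
    digest = hashlib.sha256(json.dumps(ids).encode("utf-8")).hexdigest()
    return {
        "deleted_count": len(ids),
        "record_ids_sha256": digest,
        "issued_at": now.isoformat(),
    }


# Synthetic example: one record inside the window, one beyond it
now = datetime(2026, 4, 13, tzinfo=timezone.utc)
records = [
    {"record_id": "r1", "data_element": "DI-001",
     "collected_at": "2026-04-01T00:00:00+00:00"},
    {"record_id": "r2", "data_element": "DI-001",
     "collected_at": "2026-01-01T00:00:00+00:00"},
]
kept, expired = split_expired(records, now)
print(deletion_certificate(expired, now)["deleted_count"])
```

Hashing the sorted record IDs rather than the record contents is a design choice in this sketch: the certificate can then be stored in the audit log without retaining any of the deleted data.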

Data Minimization Best Practices

The data minimization principle (GDPR Article 5(1)(c)) requires that personal data be:

  1. Adequate -- sufficient for the stated purpose
  2. Relevant -- directly related to the processing purpose
  3. Limited -- no more than necessary for the purpose

Technical enforcement is generally more reliable than policy-only controls, because it does not depend on people following a procedure. In this lab, minimization is enforced at the collection point (endpoint agent), the API permission level (email gateway), the query engine (minimum group size, k>=10), and the data pipeline (category exclusion).
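
The query-engine control (MIN-003) is not part of the demo script. As a rough illustration of the idea, the sketch below blocks any team-level aggregate over fewer than 10 individuals and writes an audit entry instead. The function name, the in-memory audit list, and the use of a plain mean are assumptions; a real query engine would enforce this inside the database layer.

```python
from statistics import mean
from typing import Dict, List, Optional

MIN_GROUP_SIZE = 10  # threshold stated in MIN-003


def team_average(
    scores_by_employee: Dict[str, float],
    audit_log: List[str],
    team_id: str,
) -> Optional[float]:
    """Return the team mean, or None (plus an audit entry) if the group is too small."""
    group_size = len(scores_by_employee)
    if group_size < MIN_GROUP_SIZE:
        audit_log.append(
            f"BLOCKED team={team_id} group_size={group_size} "
            f"min={MIN_GROUP_SIZE}"
        )
        return None
    return mean(scores_by_employee.values())


# Synthetic example: an 8-person team is blocked, a 10-person team is not
audit: List[str] = []
small_team = {f"emp-{i}": 70.0 for i in range(8)}
large_team = {f"emp-{i}": float(i) for i in range(10)}
print(team_average(small_team, audit, "TEAM-A"))
print(team_average(large_team, audit, "TEAM-B"))
print(audit)
```

Returning `None` rather than raising keeps the caller's control flow simple; either way, the important property is that the blocked query leaves an audit trail.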


Phase 5: LINDDUN Threat Analysis

Step 5.1: LINDDUN Framework Overview

LINDDUN is a privacy threat modeling framework that identifies seven categories of privacy threats. Apply each category to the WorkSight AI system.

LINDDUN Categories Applied to WorkSight AI

mindmap
  root((LINDDUN<br/>Threat Analysis))
    L["**Linkability**<br/>Can data from different<br/>sources be linked to<br/>the same employee?"]
    I["**Identifiability**<br/>Can a specific employee<br/>be identified from<br/>pseudonymized data?"]
    Nr["**Non-repudiation**<br/>Can employees deny<br/>actions that the system<br/>attributes to them?"]
    D["**Detectability**<br/>Can an observer detect<br/>that an employee is<br/>being monitored?"]
    Di["**Disclosure**<br/>Can monitoring data be<br/>disclosed to unauthorized<br/>parties?"]
    U["**Unawareness**<br/>Are employees fully<br/>aware of all monitoring<br/>and its implications?"]
    Nc["**Non-compliance**<br/>Does the system comply<br/>with all applicable<br/>privacy regulations?"]

Step 5.2: Detailed LINDDUN Analysis

"""
LINDDUN Privacy Threat Analysis for WorkSight AI.
Applies all 7 LINDDUN categories to identify privacy threats.
"""

from dataclasses import dataclass, field
from typing import List, Dict, Optional
from enum import Enum


class LINDDUNCategory(Enum):
    """LINDDUN threat categories."""
    LINKABILITY = "L - Linkability"
    IDENTIFIABILITY = "I - Identifiability"
    NON_REPUDIATION = "Nr - Non-repudiation"
    DETECTABILITY = "D - Detectability"
    DISCLOSURE = "Di - Disclosure of information"
    UNAWARENESS = "U - Unawareness"
    NON_COMPLIANCE = "Nc - Non-compliance"


class ThreatSeverity(Enum):
    """Threat severity levels."""
    LOW = "Low"
    MEDIUM = "Medium"
    HIGH = "High"
    CRITICAL = "Critical"


@dataclass
class LINDDUNThreat:
    """Individual LINDDUN privacy threat."""
    threat_id: str
    category: LINDDUNCategory
    title: str
    description: str
    attack_scenario: str
    affected_components: List[str]
    severity: ThreatSeverity
    privacy_pattern: str  # recommended countermeasure pattern
    current_controls: List[str]
    residual_risk: str
    dpia_risk_ref: Optional[str] = None


def analyze_linddun_threats() -> List[LINDDUNThreat]:
    """Perform LINDDUN threat analysis for WorkSight AI."""

    threats = []

    # === LINKABILITY ===
    threats.append(LINDDUNThreat(
        threat_id="LT-L01",
        category=LINDDUNCategory.LINKABILITY,
        title="Cross-stream behavioral linkage",
        description=(
            "An analyst with access to multiple data streams (keystroke "
            "timing + email metadata + badge access) can link records "
            "across streams using temporal correlation, even when each "
            "stream is individually pseudonymized."
        ),
        attack_scenario=(
            "Attacker observes that pseudonym X in the keystroke system "
            "has a typing pattern that slows at 12:15 PM daily. Badge "
            "data shows employee Y exits the building at 12:15 PM daily "
            "for lunch. Temporal correlation links X to Y."
        ),
        affected_components=[
            "Pseudonymization Engine",
            "Keystroke data store (10.60.1.110)",
            "Badge data store (10.60.1.120)",
        ],
        severity=ThreatSeverity.HIGH,
        privacy_pattern="Mix Zone / Unlinkability Pattern -- use "
                        "different pseudonyms per data stream with no "
                        "cross-reference table",
        current_controls=[
            "Separate pseudonymization keys per data stream",
            "No cross-stream queries permitted",
        ],
        residual_risk="MEDIUM -- temporal correlation still possible",
        dpia_risk_ref="R-004",
    ))

    threats.append(LINDDUNThreat(
        threat_id="LT-L02",
        category=LINDDUNCategory.LINKABILITY,
        title="Small team re-linkage",
        description=(
            "In teams with fewer than 10 members, aggregated team "
            "scores can be reverse-engineered to identify individual "
            "contributions by comparing scores before and after "
            "team membership changes."
        ),
        attack_scenario=(
            "Team A has 8 members. One member goes on leave. The team "
            "score changes from 72 to 68. A manager infers that the "
            "absent member contributed approximately +4 to the score."
        ),
        affected_components=["ML Scoring Engine (10.60.1.110)"],
        severity=ThreatSeverity.HIGH,
        privacy_pattern="k-Anonymity enforcement -- minimum group "
                        "size of 10 for any aggregated output",
        current_controls=[
            "Minimum team size of 10 enforced in query engine",
        ],
        residual_risk="LOW -- if minimum size strictly enforced",
    ))

    # === IDENTIFIABILITY ===
    threats.append(LINDDUNThreat(
        threat_id="LT-I01",
        category=LINDDUNCategory.IDENTIFIABILITY,
        title="Keystroke biometric identification",
        description=(
            "Keystroke timing patterns are unique enough to serve as "
            "biometric identifiers. Even pseudonymized timing data may "
            "enable identification if an attacker has a reference "
            "sample of the target's typing pattern."
        ),
        attack_scenario=(
            "An attacker obtains a reference keystroke sample from a "
            "target (e.g., from a public presentation where the target "
            "typed on screen). The attacker compares this reference "
            "against pseudonymized patterns in the WorkSight database "
            "to identify the target's records."
        ),
        affected_components=[
            "Keystroke data store (10.60.1.110)",
            "Endpoint agents (10.60.2.x)",
        ],
        severity=ThreatSeverity.HIGH,
        privacy_pattern="Differential Privacy -- add calibrated noise "
                        "to keystroke intervals before storage to prevent "
                        "biometric identification while preserving "
                        "aggregate utility",
        current_controls=[
            "Pseudonymization of employee ID",
            "No raw pattern matching capability exposed",
        ],
        residual_risk="MEDIUM -- patterns inherently identifying",
    ))

    threats.append(LINDDUNThreat(
        threat_id="LT-I02",
        category=LINDDUNCategory.IDENTIFIABILITY,
        title="Application fingerprinting identification",
        description=(
            "Unique combinations of applications used by an individual "
            "can serve as a fingerprint. Even pseudonymized, if "
            "someone uses a rare application combination, they are "
            "identifiable."
        ),
        attack_scenario=(
            "Only one employee in the organization uses both "
            "specialized-tool-x and specialized-tool-y. Application "
            "usage logs (even pseudonymized) make this employee "
            "trivially identifiable."
        ),
        affected_components=["Application usage data store (10.60.1.110)"],
        severity=ThreatSeverity.MEDIUM,
        privacy_pattern="Generalization -- replace specific application "
                        "names with categories (e.g., 'IDE', 'Browser', "
                        "'Communication') to prevent fingerprinting",
        current_controls=[
            "Pseudonymization of employee ID",
            "Category-based filtering for sensitive apps",
        ],
        residual_risk="MEDIUM -- rare app combinations still identifying",
    ))

    # === NON-REPUDIATION ===
    threats.append(LINDDUNThreat(
        threat_id="LT-Nr01",
        category=LINDDUNCategory.NON_REPUDIATION,
        title="Undeniable activity attribution",
        description=(
            "The monitoring system creates an undeniable record of "
            "employee activities. Employees cannot plausibly deny "
            "their work patterns, application usage, or physical "
            "location, even for legitimate privacy reasons."
        ),
        attack_scenario=(
            "An employee accesses a whistleblower portal during work "
            "hours. Even though the app is in the excluded category "
            "list, the keystroke timing gap (no typing during that "
            "period) and badge data (present at desk) create an "
            "undeniable record that the employee was doing 'something "
            "else' during that time."
        ),
        affected_components=[
            "All data streams",
            "Audit log (10.60.1.150)",
        ],
        severity=ThreatSeverity.MEDIUM,
        privacy_pattern="Plausible Deniability Pattern -- introduce "
                        "intentional noise in activity records so that "
                        "gaps and anomalies are expected and normal",
        current_controls=[
            "Sensitive application category exclusion",
            "Team-level aggregation (individual patterns not reported)",
        ],
        residual_risk="MEDIUM -- raw data still contains individual gaps",
    ))

    # === DETECTABILITY ===
    threats.append(LINDDUNThreat(
        threat_id="LT-D01",
        category=LINDDUNCategory.DETECTABILITY,
        title="Monitoring agent detection by employees",
        description=(
            "Employees can detect that monitoring software is active "
            "on their workstations, leading to behavioral modification "
            "(chilling effect) or attempts to circumvent monitoring."
        ),
        attack_scenario=(
            "An employee discovers the WorkSight endpoint agent in "
            "Task Manager. They begin using their personal phone for "
            "sensitive communications instead of their workstation, "
            "or they install counter-monitoring tools."
        ),
        affected_components=["Endpoint agents (10.60.2.x)"],
        severity=ThreatSeverity.LOW,
        privacy_pattern="Transparency Pattern -- this is actually "
                        "DESIRED. GDPR requires transparency. Employees "
                        "SHOULD know they are being monitored. The threat "
                        "is only if transparency leads to chilling effect.",
        current_controls=[
            "Employee privacy notice (transparency)",
            "Works Council communication",
            "Clear statement of monitoring scope",
        ],
        residual_risk="LOW -- transparency is a GDPR requirement, "
                      "not a threat. Chilling effect is addressed "
                      "separately (R-006).",
    ))

    # === DISCLOSURE OF INFORMATION ===
    threats.append(LINDDUNThreat(
        threat_id="LT-Di01",
        category=LINDDUNCategory.DISCLOSURE,
        title="Insider disclosure of raw monitoring data",
        description=(
            "A privileged insider (privacy engineer, sys admin) "
            "exfiltrates raw monitoring data and discloses it to "
            "unauthorized parties (media, competitors, blackmailers)."
        ),
        attack_scenario=(
            "A disgruntled privacy engineer with access to the "
            "pseudonymization key vault exports the key mapping "
            "table and raw data, enabling full re-identification "
            "of all 2,400 employees' monitoring data."
        ),
        affected_components=[
            "Key Vault (10.60.1.140)",
            "Raw data stores",
            "Pseudonymization Engine",
        ],
        severity=ThreatSeverity.CRITICAL,
        privacy_pattern="Separation of Duties Pattern -- no single "
                        "person should have access to both pseudonymized "
                        "data AND the pseudonymization keys",
        current_controls=[
            "RBAC with least privilege",
            "Audit logging",
            "Network segmentation",
        ],
        residual_risk="MEDIUM -- separation of duties not fully "
                      "implemented in current design",
        dpia_risk_ref="R-001",
    ))

    threats.append(LINDDUNThreat(
        threat_id="LT-Di02",
        category=LINDDUNCategory.DISCLOSURE,
        title="ML model inversion attack",
        description=(
            "An attacker with access to the ML productivity model "
            "performs a model inversion attack to extract individual "
            "training data points from the model parameters."
        ),
        attack_scenario=(
            "A team manager with access to the ML scoring API sends "
            "crafted queries to the model. By observing how the "
            "model's score changes with different input combinations, "
            "the manager reverse-engineers individual employee "
            "behavioral patterns that were used in training."
        ),
        affected_components=["ML Scoring Engine (10.60.1.110)"],
        severity=ThreatSeverity.MEDIUM,
        privacy_pattern="Differential Privacy in ML Training -- add "
                        "noise during model training (DP-SGD) and limit "
                        "API query rate to prevent model inversion",
        current_controls=[
            "Team-level scoring only",
            "API rate limiting",
            "Query audit logging",
        ],
        residual_risk="MEDIUM -- model inversion defenses not yet "
                      "implemented",
    ))

    # === UNAWARENESS ===
    threats.append(LINDDUNThreat(
        threat_id="LT-U01",
        category=LINDDUNCategory.UNAWARENESS,
        title="Incomplete transparency about ML scoring logic",
        description=(
            "Employees are not fully aware of how the ML productivity "
            "model works, what features it uses, how scores are "
            "calculated, or how scores might indirectly affect them "
            "through team management decisions."
        ),
        attack_scenario=(
            "Employees receive a privacy notice stating 'team-level "
            "productivity analysis' but do not understand that their "
            "keystroke rhythms, email patterns, and physical movements "
            "are combined into a composite score. A manager uses "
            "consistently low team scores to justify restructuring, "
            "effectively punishing individual employees."
        ),
        affected_components=[
            "Privacy notice",
            "ML Scoring Engine (10.60.1.110)",
            "Management reporting",
        ],
        severity=ThreatSeverity.HIGH,
        privacy_pattern="Informed Consent Pattern / Transparency "
                        "Dashboard -- provide clear, layered privacy "
                        "information and enable employees to view their "
                        "own data and understand the scoring model",
        current_controls=[
            "Privacy notice provided at onboarding",
            "Works Council communication",
            "SHAP values for model explainability",
        ],
        residual_risk="MEDIUM -- technical explainability exists but "
                      "communication to employees is insufficient",
    ))

    # === NON-COMPLIANCE ===
    threats.append(LINDDUNThreat(
        threat_id="LT-Nc01",
        category=LINDDUNCategory.NON_COMPLIANCE,
        title="Failure to honor right to object (Article 21)",
        description=(
            "The system lacks a technical mechanism for employees to "
            "exercise their right to object to processing based on "
            "legitimate interest (Article 21(1)). Even if the right "
            "is acknowledged in policy, there is no automated way "
            "to exclude an objecting employee's data."
        ),
        attack_scenario=(
            "An employee submits an Article 21 objection to the DPO. "
            "Due to the pseudonymized architecture, it takes 2 weeks "
            "to identify and exclude the employee's data from all "
            "processing streams. During this time, the employee's "
            "rights are violated."
        ),
        affected_components=[
            "All data collection agents",
            "Pseudonymization Engine",
            "Data processing pipeline",
        ],
        severity=ThreatSeverity.HIGH,
        privacy_pattern="Data Subject Rights Automation -- implement "
                        "automated opt-out mechanism that propagates "
                        "across all data streams within 24 hours",
        current_controls=[
            "DPO handles objections manually",
            "Policy acknowledges Article 21 right",
        ],
        residual_risk="HIGH -- no automated mechanism exists",
    ))

    threats.append(LINDDUNThreat(
        threat_id="LT-Nc02",
        category=LINDDUNCategory.NON_COMPLIANCE,
        title="Inadequate Article 13/14 information provision",
        description=(
            "The privacy notice provided to employees does not meet "
            "the full requirements of Article 13 (information to be "
            "provided where data is collected from the data subject), "
            "particularly regarding the logic of automated processing "
            "and profiling, and the right to object."
        ),
        attack_scenario=(
            "A supervisory authority audit reveals that the employee "
            "privacy notice omits: (1) details of profiling logic, "
            "(2) the legitimate interest balancing test results, "
            "(3) the existence of automated decision-making safeguards, "
            "and (4) the right to lodge a complaint. The authority "
            "issues a corrective order."
        ),
        affected_components=["Privacy notice", "HR onboarding process"],
        severity=ThreatSeverity.MEDIUM,
        privacy_pattern="Layered Privacy Notice Pattern -- provide "
                        "short-form notice at collection point with "
                        "link to full notice covering all Art 13 items",
        current_controls=[
            "Basic privacy notice at onboarding",
        ],
        residual_risk="MEDIUM -- notice exists but is incomplete",
    ))

    return threats


def build_threat_control_matrix(
    threats: List[LINDDUNThreat],
) -> Dict[str, Dict[str, object]]:
    """Build threat-to-control mapping matrix keyed by threat ID."""
    matrix: Dict[str, Dict[str, object]] = {}
    for t in threats:
        matrix[t.threat_id] = {
            "category": t.category.value,
            "title": t.title,
            "severity": t.severity.value,
            "privacy_pattern": t.privacy_pattern,
            "controls": t.current_controls,
        }
    return matrix


def print_linddun_report(threats: List[LINDDUNThreat]) -> None:
    """Print LINDDUN analysis report."""
    print("=" * 60)
    print("LINDDUN PRIVACY THREAT ANALYSIS -- WorkSight AI")
    print(f"Total Threats Identified: {len(threats)}")
    print("=" * 60)

    # Group by category
    by_category = {}
    for t in threats:
        cat = t.category.value
        if cat not in by_category:
            by_category[cat] = []
        by_category[cat].append(t)

    for cat, cat_threats in by_category.items():
        print(f"\n{'=' * 60}")
        print(f"  {cat}")
        print(f"{'=' * 60}")
        for t in cat_threats:
            print(f"\n  [{t.threat_id}] {t.title}")
            print(f"    Severity:       {t.severity.value}")
            print(f"    Description:    {t.description[:100]}...")
            print(f"    Attack:         {t.attack_scenario[:100]}...")
            print(f"    Pattern:        {t.privacy_pattern[:80]}...")
            print(f"    Residual Risk:  {t.residual_risk}")
            if t.dpia_risk_ref:
                print(f"    DPIA Risk Ref:  {t.dpia_risk_ref}")

    # Summary
    print(f"\n{'=' * 60}")
    print("THREAT SUMMARY BY SEVERITY")
    print(f"{'=' * 60}")
    for sev in ThreatSeverity:
        count = sum(1 for t in threats if t.severity == sev)
        print(f"  {sev.value:10s}: {count}")

    print(f"\n{'=' * 60}")
    print("THREAT-TO-CONTROL MATRIX")
    print(f"{'=' * 60}")
    print(f"  {'Threat':<10s} | {'Category':<25s} | {'Severity':<10s} | "
          f"{'Pattern':<40s}")
    print(f"  {'-'*10}-+-{'-'*25}-+-{'-'*10}-+-{'-'*40}")
    for t in threats:
        cat_short = t.category.value.split(" - ")[1]
        pattern_short = t.privacy_pattern[:40]
        print(f"  {t.threat_id:<10s} | {cat_short:<25s} | "
              f"{t.severity.value:<10s} | {pattern_short}")


if __name__ == "__main__":
    threats = analyze_linddun_threats()
    print_linddun_report(threats)

Expected output (truncated):

============================================================
LINDDUN PRIVACY THREAT ANALYSIS -- WorkSight AI
Total Threats Identified: 11
============================================================

============================================================
  L - Linkability
============================================================

  [LT-L01] Cross-stream behavioral linkage
    Severity:       High
    Description:    An analyst with access to multiple data streams ...
    Attack:         Attacker observes that pseudonym X in the keystroke ...
    Pattern:        Mix Zone / Unlinkability Pattern -- use different ...
    Residual Risk:  MEDIUM -- temporal correlation still possible

  [LT-L02] Small team re-linkage
    Severity:       High
    ...

THREAT SUMMARY BY SEVERITY
  Low       : 1
  Medium    : 4
  High      : 5
  Critical  : 1

THREAT-TO-CONTROL MATRIX
  Threat     | Category                  | Severity   | Pattern
  -----------+---------------------------+------------+---...
  LT-L01     | Linkability               | High       | Mix Zone / Unlinkability Pattern ...
  LT-L02     | Linkability               | High       | k-Anonymity enforcement ...
  LT-I01     | Identifiability           | High       | Differential Privacy ...
  ...
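
The LT-L02 scenario can be made concrete with a little arithmetic. The lab does not specify how team scores are aggregated, so the sketch below assumes the team score is an arithmetic mean of individual scores. Under that assumption, the two published team scores determine the absent member's individual score exactly. The function name is hypothetical.

```python
def infer_absent_score(
    mean_before: float, n_before: int, mean_after: float
) -> float:
    """Differencing attack: recover one member's score from two team means."""
    total_before = mean_before * n_before
    total_after = mean_after * (n_before - 1)
    return total_before - total_after


# LT-L02 numbers: 8 members with a team score of 72, then 7 members at 68
print(infer_absent_score(72, 8, 68))  # 8*72 - 7*68 = 576 - 476 = 100
```

The arithmetic does not depend on the group being small: the same subtraction works when a team shrinks from 11 to 10 members. A size threshold is therefore worth pairing with controls on before/after comparisons.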

Step 5.3: LINDDUN Threat-to-Control Mapping

Map each LINDDUN threat to specific privacy-enhancing technologies (PETs) and controls:

| Threat ID | LINDDUN Category | Privacy Pattern | Technical Control | Implementation Priority |
|-----------|------------------|-----------------|-------------------|-------------------------|
| LT-L01 | Linkability | Mix Zone | Separate pseudonym keys per stream | P1 -- Before launch |
| LT-L02 | Linkability | k-Anonymity | Query engine minimum group size=10 | P1 -- Before launch |
| LT-I01 | Identifiability | Differential Privacy | Add calibrated noise to keystroke data | P2 -- Within 3 months |
| LT-I02 | Identifiability | Generalization | Replace app names with categories | P2 -- Within 3 months |
| LT-Nr01 | Non-repudiation | Plausible Deniability | Random noise in activity logs | P3 -- Within 6 months |
| LT-D01 | Detectability | Transparency | Employee privacy dashboard | P1 -- Before launch |
| LT-Di01 | Disclosure | Separation of Duties | Split key access from data access | P1 -- Before launch |
| LT-Di02 | Disclosure | DP-SGD in ML | Differentially private model training | P2 -- Within 3 months |
| LT-U01 | Unawareness | Layered Notice | Interactive privacy explainer for employees | P1 -- Before launch |
| LT-Nc01 | Non-compliance | Rights Automation | Automated opt-out pipeline | P1 -- Before launch |
| LT-Nc02 | Non-compliance | Layered Notice | Complete Art 13 privacy notice | P1 -- Before launch |
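
For LT-I01, "calibrated noise" usually means noise scaled to how much a single value can vary. The following is a minimal sketch of one option, the Laplace mechanism applied to each clipped interval. The clipping bounds (30 to 500 ms), the epsilon value, and the function names are assumptions chosen for illustration, not parameters from the WorkSight AI design.

```python
import random
from typing import List, Optional


def laplace_noise(scale: float, rng: random.Random) -> float:
    """Draw Laplace(0, scale) noise as the difference of two exponentials."""
    return rng.expovariate(1.0 / scale) - rng.expovariate(1.0 / scale)


def privatize_intervals(
    intervals_ms: List[float],
    epsilon: float,
    lo: float = 30.0,   # assumed lower clipping bound in ms
    hi: float = 500.0,  # assumed upper clipping bound in ms
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Clip each interval to [lo, hi], then add Laplace noise.

    The noise scale is (hi - lo) / epsilon, i.e. the range of a single
    clipped value divided by the per-value privacy parameter.
    """
    if rng is None:
        rng = random.Random()
    scale = (hi - lo) / epsilon
    return [
        min(max(x, lo), hi) + laplace_noise(scale, rng)
        for x in intervals_ms
    ]


# Synthetic example: individual values are heavily perturbed, but the
# mean over many values stays close to the true mean
rng = random.Random(0)
noisy = privatize_intervals([120.0] * 20000, epsilon=1.0, rng=rng)
print(round(sum(noisy) / len(noisy), 1))
```

The epsilon here applies to each stored value separately. Storing many intervals for the same employee consumes privacy budget cumulatively, so a production design would need to account for composition or add noise to aggregates instead.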

Critical LINDDUN Finding

The most severe threat identified by the LINDDUN analysis is LT-Di01 (insider disclosure), the only threat rated CRITICAL. The current design does not adequately separate access to pseudonymization keys from access to pseudonymized data, so a single privileged insider could re-identify all employees. Separation of duties must be implemented before launch.
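
A separation-of-duties requirement is easier to keep if it is checked automatically. The sketch below flags any principal that holds both a key-management role and a data-access role. The role names and the shape of the assignment mapping are hypothetical; adapt them to whatever RBAC export your environment provides.

```python
from typing import Dict, List, Set

# Hypothetical role names for illustration only
KEY_ROLES: Set[str] = {"key-vault-admin"}
DATA_ROLES: Set[str] = {"pseudonymized-data-reader"}


def find_sod_violations(assignments: Dict[str, Set[str]]) -> List[str]:
    """Return principals holding at least one key role AND one data role."""
    return sorted(
        user
        for user, roles in assignments.items()
        if roles & KEY_ROLES and roles & DATA_ROLES
    )


# Synthetic role assignments
assignments = {
    "testuser-a": {"key-vault-admin"},
    "testuser-b": {"pseudonymized-data-reader"},
    "testuser-c": {"key-vault-admin", "pseudonymized-data-reader"},
}
print(find_sod_violations(assignments))  # ['testuser-c']
```

Running a check like this on every role change, and failing the change when the list is non-empty, turns the finding above into an enforced control rather than a periodic review item.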


Phase 6: Compliance Validation

Step 6.1: GDPR Compliance Checker

Build an automated compliance checker that validates key GDPR requirements:

"""
GDPR Compliance Validation Engine for WorkSight AI DPIA.
Checks compliance with Articles 25, 30, 35, and related provisions.
All data is 100% synthetic.
"""

import json
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from datetime import datetime
from enum import Enum


class ComplianceStatus(Enum):
    """Compliance check status."""
    COMPLIANT = "Compliant"
    PARTIALLY_COMPLIANT = "Partially Compliant"
    NON_COMPLIANT = "Non-Compliant"
    NOT_APPLICABLE = "Not Applicable"


class CompliancePriority(Enum):
    """Remediation priority."""
    CRITICAL = "Critical -- must fix before launch"
    HIGH = "High -- fix within 30 days"
    MEDIUM = "Medium -- fix within 90 days"
    LOW = "Low -- fix within 180 days"


@dataclass
class ComplianceCheck:
    """Individual compliance requirement check."""
    check_id: str
    article: str
    requirement: str
    description: str
    status: ComplianceStatus
    evidence: str
    gaps: List[str] = field(default_factory=list)
    remediation: List[str] = field(default_factory=list)
    priority: Optional[CompliancePriority] = None
    responsible: str = ""
    target_date: str = ""


def run_compliance_checks() -> List[ComplianceCheck]:
    """Run all GDPR compliance checks for WorkSight AI."""

    checks = []

    # === ARTICLE 25: Data Protection by Design and by Default ===
    checks.append(ComplianceCheck(
        check_id="CC-001",
        article="Article 25(1) -- Data Protection by Design",
        requirement="Implement appropriate technical and organisational "
                    "measures designed to implement data-protection "
                    "principles effectively",
        description="Verify that privacy controls are built into the "
                    "system architecture, not bolted on afterward",
        status=ComplianceStatus.PARTIALLY_COMPLIANT,
        evidence=(
            "Pseudonymization engine integrated into data pipeline. "
            "Encryption at rest (AES-256-GCM) and in transit (TLS 1.3). "
            "Minimization rules enforced at collection point. "
            "However: separation of duties incomplete, differential "
            "privacy not yet implemented, automated rights mechanism "
            "missing."
        ),
        gaps=[
            "Separation of duties between key management and data access",
            "Differential privacy for keystroke biometric risk",
            "Automated data subject rights (Art 21 objection) pipeline",
            "Application category generalization not yet implemented",
        ],
        remediation=[
            "Implement HSM-based key management with split access",
            "Deploy differential privacy noise injection in keystroke pipeline",
            "Build automated opt-out API endpoint",
            "Replace app names with category labels in collection agent",
        ],
        priority=CompliancePriority.CRITICAL,
        responsible="Marcus Brandt (CISO) + Raj Patel (ML Engineer)",
        target_date="2026-05-15",
    ))

    checks.append(ComplianceCheck(
        check_id="CC-002",
        article="Article 25(2) -- Data Protection by Default",
        requirement="Ensure that by default only personal data necessary "
                    "for each specific purpose is processed",
        description="Verify that the system collects the minimum data "
                    "needed and does not process beyond stated purposes",
        status=ComplianceStatus.PARTIALLY_COMPLIANT,
        evidence=(
            "Keystroke agent collects timing only (not content). "
            "Email integration metadata-only (no content access). "
            "Badge data limited to floor level. "
            "However: screen capture proposed (REJECTED by DPO but "
            "not yet removed from codebase). Application names "
            "collected in full (should be generalized to categories)."
        ),
        gaps=[
            "Screen capture code still present (must be removed entirely)",
            "Application names should be generalized to categories",
        ],
        remediation=[
            "Remove all screen capture code from endpoint agent",
            "Implement app name -> category mapping at collection point",
        ],
        priority=CompliancePriority.CRITICAL,
        responsible="Development team",
        target_date="2026-05-01",
    ))

    # === ARTICLE 30: Records of Processing Activities ===
    checks.append(ComplianceCheck(
        check_id="CC-003",
        article="Article 30(1) -- Records of Processing Activities",
        requirement="Maintain records of processing activities containing "
                    "all information specified in Article 30(1)(a-g)",
        description="Verify completeness of the processing activity register",
        status=ComplianceStatus.COMPLIANT,
        evidence=(
            "Processing Activity Register (PA-001 through PA-005) "
            "documents: controller identity, DPO contact, purposes, "
            "categories of data subjects and personal data, recipients, "
            "third-country transfers, retention periods, and security "
            "measures. Register generated by processing_activities.py "
            "and maintained in version control."
        ),
        gaps=[],
        remediation=[],
        responsible="Dr. Elke Krause (DPO)",
        target_date="N/A -- currently compliant",
    ))

    # === ARTICLE 35: DPIA ===
    checks.append(ComplianceCheck(
        check_id="CC-004",
        article="Article 35(1) -- DPIA Execution",
        requirement="Carry out a DPIA where processing is likely to "
                    "result in a high risk to the rights and freedoms "
                    "of natural persons",
        description="Verify that a DPIA has been conducted with all "
                    "required elements",
        status=ComplianceStatus.COMPLIANT,
        evidence=(
            "DPIA (DPIA-WS-2026-001) conducted covering: systematic "
            "description of processing (Section 1), necessity and "
            "proportionality assessment (Section 2), risk assessment "
            "(Section 3, 7 risks identified and scored), and measures "
            "to address risks (Section 4). DPIA reviewed by DPO."
        ),
        gaps=[],
        remediation=[],
        responsible="Dr. Elke Krause (DPO)",
        target_date="N/A -- currently compliant",
    ))

    checks.append(ComplianceCheck(
        check_id="CC-005",
        article="Article 35(7) -- DPIA Content Requirements",
        requirement="DPIA must contain: systematic description, necessity "
                    "assessment, risk assessment, and measures to address "
                    "risks",
        description="Verify DPIA document completeness against Art 35(7)",
        status=ComplianceStatus.COMPLIANT,
        evidence=(
            "DPIA-WS-2026-001 contains all four required elements: "
            "(a) systematic description of processing and purposes, "
            "(b) necessity and proportionality assessment, "
            "(c) risk assessment per Article 35(7)(c), "
            "(d) measures to address risks including safeguards."
        ),
        gaps=[],
        remediation=[],
        responsible="Dr. Elke Krause (DPO)",
        target_date="N/A -- currently compliant",
    ))

    # === ARTICLE 5: Data Protection Principles ===
    checks.append(ComplianceCheck(
        check_id="CC-006",
        article="Article 5(1)(a) -- Lawfulness, Fairness, Transparency",
        requirement="Personal data must be processed lawfully, fairly, "
                    "and in a transparent manner",
        description="Verify legal basis, fairness assessment, and "
                    "transparency measures",
        status=ComplianceStatus.PARTIALLY_COMPLIANT,
        evidence=(
            "Legal basis (legitimate interest) documented with LIA "
            "for each processing activity. Fairness assessed through "
            "Works Council consultation. Privacy notice drafted. "
            "However: privacy notice incomplete per Art 13 requirements, "
            "no employee privacy dashboard, and transparency about "
            "ML scoring logic is insufficient."
        ),
        gaps=[
            "Privacy notice missing: profiling logic, LIA results, "
            "Art 22 safeguards, complaint right details",
            "No interactive transparency dashboard for employees",
            "ML model explainability not communicated to data subjects",
        ],
        remediation=[
            "Complete Art 13 compliant privacy notice",
            "Build employee privacy dashboard showing own data",
            "Create plain-language ML explainer document",
        ],
        priority=CompliancePriority.HIGH,
        responsible="Dr. Elke Krause (DPO) + Legal Counsel",
        target_date="2026-05-15",
    ))

    checks.append(ComplianceCheck(
        check_id="CC-007",
        article="Article 5(1)(b) -- Purpose Limitation",
        requirement="Personal data collected for specified, explicit, "
                    "and legitimate purposes and not further processed "
                    "in a manner incompatible",
        description="Verify purpose limitation controls",
        status=ComplianceStatus.PARTIALLY_COMPLIANT,
        evidence=(
            "Purposes documented in ROPA. Technical controls prevent "
            "some purpose deviation (e.g., team-level only queries). "
            "However: no technical enforcement prevents purpose "
            "creep in management use of aggregated scores. Function "
            "creep risk (R-002) identified as HIGH."
        ),
        gaps=[
            "No technical enforcement of purpose limitation for "
            "management use of scores",
            "No automated detection of purpose deviation",
        ],
        remediation=[
            "Implement purpose tags on all data elements",
            "Automated alert when data accessed outside stated purpose",
            "Quarterly purpose limitation audit",
        ],
        priority=CompliancePriority.HIGH,
        responsible="Marcus Brandt (CISO)",
        target_date="2026-06-01",
    ))

    checks.append(ComplianceCheck(
        check_id="CC-008",
        article="Article 5(1)(e) -- Storage Limitation",
        requirement="Personal data kept in identifiable form only as "
                    "long as necessary for the purposes",
        description="Verify retention policies and deletion mechanisms",
        status=ComplianceStatus.COMPLIANT,
        evidence=(
            "Retention periods defined per data element (7-730 days). "
            "Automated deletion cron job runs daily at 02:00 UTC. "
            "Cryptographic erasure for sensitive elements. "
            "Deletion certificates generated and stored in audit log. "
            "Backup retention aligned with data retention."
        ),
        gaps=[],
        remediation=[],
        responsible="Marcus Brandt (CISO)",
        target_date="N/A -- currently compliant",
    ))

    # === ARTICLE 21: Right to Object ===
    checks.append(ComplianceCheck(
        check_id="CC-009",
        article="Article 21(1) -- Right to Object",
        requirement="Data subject has right to object to processing "
                    "based on legitimate interest, including profiling",
        description="Verify mechanism for employees to exercise right "
                    "to object",
        status=ComplianceStatus.NON_COMPLIANT,
        evidence=(
            "Policy acknowledges right to object. DPO handles requests "
            "manually. However: NO automated mechanism exists. "
            "Current manual process takes ~2 weeks to fully implement "
            "an objection across all data streams. This is too slow "
            "for meaningful exercise of the right."
        ),
        gaps=[
            "No automated opt-out mechanism",
            "Manual process takes 2 weeks (too slow)",
            "No employee-facing self-service portal",
        ],
        remediation=[
            "Build automated opt-out API with propagation to all streams",
            "Target: 24-hour objection implementation",
            "Employee self-service portal for rights management",
        ],
        priority=CompliancePriority.CRITICAL,
        responsible="Development team + Dr. Elke Krause (DPO)",
        target_date="2026-05-01",
    ))

    # === ARTICLE 22: Automated Decision-Making ===
    checks.append(ComplianceCheck(
        check_id="CC-010",
        article="Article 22 -- Automated Individual Decision-Making",
        requirement="Right not to be subject to a decision based solely "
                    "on automated processing which produces legal or "
                    "similarly significant effects",
        description="Verify that no automated individual decisions "
                    "are made based on WorkSight data",
        status=ComplianceStatus.COMPLIANT,
        evidence=(
            "Team-level scoring only -- no individual scores generated. "
            "Human review required before any management action. "
            "No employment decisions (hiring, firing, promotion, "
            "compensation) linked to WorkSight scores. "
            "Technical enforcement: minimum team size 10, query engine "
            "blocks individual-level queries."
        ),
        gaps=[],
        remediation=[],
        responsible="Dr. Elke Krause (DPO)",
        target_date="N/A -- currently compliant",
    ))

    return checks


def generate_compliance_report(checks: List[ComplianceCheck]) -> str:
    """Generate compliance report."""
    lines = []
    lines.append("=" * 70)
    lines.append("GDPR COMPLIANCE VALIDATION REPORT -- WorkSight AI")
    lines.append(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    lines.append(f"DPIA Reference: DPIA-WS-2026-001")
    lines.append(f"Total Checks: {len(checks)}")
    lines.append("=" * 70)

    # Summary counts
    compliant = sum(
        1 for c in checks if c.status == ComplianceStatus.COMPLIANT
    )
    partial = sum(
        1 for c in checks
        if c.status == ComplianceStatus.PARTIALLY_COMPLIANT
    )
    non_compliant = sum(
        1 for c in checks if c.status == ComplianceStatus.NON_COMPLIANT
    )

    lines.append(f"\n--- Compliance Summary ---")
    lines.append(f"  Compliant:           {compliant} / {len(checks)}")
    lines.append(f"  Partially Compliant: {partial} / {len(checks)}")
    lines.append(f"  Non-Compliant:       {non_compliant} / {len(checks)}")

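    # Weighted score: full credit for compliant checks, half credit for
    # partially compliant ones. Not-applicable checks are left out of the
    # denominator, and an empty check list scores 0 instead of raising.
    applicable = sum(
        1 for c in checks if c.status != ComplianceStatus.NOT_APPLICABLE
    )
    overall_pct = (
        (compliant + 0.5 * partial) / applicable * 100 if applicable else 0.0
    )
    lines.append(f"  Overall Score:       {overall_pct:.0f}%")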

    if non_compliant > 0:
        lines.append(f"\n  *** LAUNCH BLOCKED until non-compliant items "
                     f"are resolved ***")

    # Detail per check
    for c in checks:
        lines.append(f"\n{'~' * 70}")
        lines.append(f"[{c.check_id}] {c.article}")
        lines.append(f"{'~' * 70}")
        lines.append(f"  Status:       {c.status.value}")
        lines.append(f"  Requirement:  {c.requirement}")
        lines.append(f"  Evidence:     {c.evidence}")
        if c.gaps:
            lines.append(f"  Gaps:")
            for g in c.gaps:
                lines.append(f"    - {g}")
        if c.remediation:
            lines.append(f"  Remediation:")
            for r in c.remediation:
                lines.append(f"    + {r}")
        if c.priority:
            lines.append(f"  Priority:     {c.priority.value}")
        lines.append(f"  Responsible:  {c.responsible}")
        lines.append(f"  Target Date:  {c.target_date}")

    # Remediation roadmap
    lines.append(f"\n{'=' * 70}")
    lines.append("REMEDIATION ROADMAP")
    lines.append(f"{'=' * 70}")

    critical_items = [
        c for c in checks if c.priority == CompliancePriority.CRITICAL
    ]
    high_items = [
        c for c in checks if c.priority == CompliancePriority.HIGH
    ]

    if critical_items:
        lines.append(f"\n  CRITICAL (must fix before launch):")
        for c in critical_items:
            lines.append(f"    [{c.check_id}] {c.article}")
            for r in c.remediation:
                lines.append(f"      + {r}")
            lines.append(f"      Target: {c.target_date}")

    if high_items:
        # Heading wording matches CompliancePriority.HIGH
        lines.append("\n  HIGH (fix within 30 days):")
        for c in high_items:
            lines.append(f"    [{c.check_id}] {c.article}")
            for r in c.remediation:
                lines.append(f"      + {r}")
            lines.append(f"      Target: {c.target_date}")

    lines.append(f"\n{'=' * 70}")
    lines.append("END OF COMPLIANCE REPORT")
    lines.append(f"{'=' * 70}")

    return "\n".join(lines)


if __name__ == "__main__":
    checks = run_compliance_checks()
    report = generate_compliance_report(checks)
    print(report)

Expected output (truncated):

======================================================================
GDPR COMPLIANCE VALIDATION REPORT -- WorkSight AI
Generated: 2026-04-13 11:30:00
DPIA Reference: DPIA-WS-2026-001
Total Checks: 10
======================================================================

--- Compliance Summary ---
  Compliant:           5 / 10
  Partially Compliant: 4 / 10
  Non-Compliant:       1 / 10
  Overall Score:       70%

  *** LAUNCH BLOCKED until non-compliant items are resolved ***

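...

======================================================================
REMEDIATION ROADMAP
======================================================================

  CRITICAL (must fix before launch):
    [CC-001] Article 25(1) -- Data Protection by Design
      + Implement HSM-based key management with split access
      + Deploy differential privacy noise injection in keystroke pipeline
      + Build automated opt-out API endpoint
      + Replace app names with category labels in collection agent
      Target: 2026-05-15
    [CC-002] Article 25(2) -- Data Protection by Default
      + Remove all screen capture code from endpoint agent
      + Implement app name -> category mapping at collection point
      Target: 2026-05-01
    [CC-009] Article 21(1) -- Right to Object
      + Build automated opt-out API with propagation to all streams
      + Target: 24-hour objection implementation
      + Employee self-service portal for rights management
      Target: 2026-05-01
...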
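
The checker imports `json` but only prints a text report, while the teardown step removes a `compliance_report.json` file. A machine-readable export could be sketched as follows. The trimmed `Check` and `Status` classes are stand-ins for `ComplianceCheck` and `ComplianceStatus`, and `checks_to_json` is a hypothetical helper, not part of the lab code.

```python
import json
from dataclasses import asdict, dataclass, field
from enum import Enum
from typing import List


class Status(Enum):
    """Stand-in for ComplianceStatus (two members are enough here)."""
    COMPLIANT = "Compliant"
    NON_COMPLIANT = "Non-Compliant"


@dataclass
class Check:
    """Trimmed stand-in for ComplianceCheck, for illustration only."""
    check_id: str
    article: str
    status: Status
    gaps: List[str] = field(default_factory=list)


def checks_to_json(checks: List[Check]) -> str:
    """Serialize checks to JSON, writing each Enum member as its value."""
    def encode(obj):
        if isinstance(obj, Enum):
            return obj.value
        raise TypeError(f"Cannot serialize {type(obj).__name__}")

    return json.dumps([asdict(c) for c in checks], default=encode, indent=2)


sample = [
    Check("CC-009", "Article 21(1) -- Right to Object",
          Status.NON_COMPLIANT, ["No automated opt-out mechanism"]),
]
payload = checks_to_json(sample)
print(payload)
```

Writing `payload` to disk (for example with `pathlib.Path("compliance_report.json").write_text(payload)`) would produce the file that the teardown step expects. The `default=` hook is needed because `json` cannot serialize `Enum` members on its own.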

Step 6.2: Monitoring Metrics Dashboard

Define ongoing privacy monitoring metrics for post-deployment:

"""
Privacy Monitoring Metrics for WorkSight AI.
Defines KPIs for ongoing DPIA compliance monitoring.
"""

from dataclasses import dataclass
from typing import List
from enum import Enum


class MetricCategory(Enum):
    """Privacy metric categories."""
    DATA_MINIMIZATION = "Data Minimization"
    ACCESS_CONTROL = "Access Control"
    RETENTION = "Retention Compliance"
    RIGHTS_MANAGEMENT = "Data Subject Rights"
    INCIDENT = "Incident & Breach"
    TRANSPARENCY = "Transparency"
    BIAS = "Algorithmic Fairness"


@dataclass
class PrivacyMetric:
    """Privacy monitoring metric definition."""
    metric_id: str
    name: str
    category: MetricCategory
    description: str
    target: str
    measurement: str
    frequency: str
    alert_threshold: str
    responsible: str


def define_monitoring_metrics() -> List[PrivacyMetric]:
    """Define privacy monitoring metrics."""

    metrics = []

    metrics.append(PrivacyMetric(
        metric_id="PM-001",
        name="Data Minimization Compliance Rate",
        category=MetricCategory.DATA_MINIMIZATION,
        description="Percentage of data records that pass all "
                    "minimization rules (MIN-001 through MIN-006)",
        target=">= 99.9%",
        measurement="(records passing all rules / total records) x 100",
        frequency="Daily",
        alert_threshold="< 99.5% triggers immediate investigation",
        responsible="Privacy Engineer",
    ))

    metrics.append(PrivacyMetric(
        metric_id="PM-002",
        name="Unauthorized Access Attempts",
        category=MetricCategory.ACCESS_CONTROL,
        description="Number of blocked access attempts where a user "
                    "tried to access data outside their RBAC permissions",
        target="< 5 per week (indicates adequate training)",
        measurement="Count of DENIED access events in audit log",
        frequency="Daily",
        alert_threshold="> 10/day triggers security review",
        responsible="CISO",
    ))

    metrics.append(PrivacyMetric(
        metric_id="PM-003",
        name="Retention Policy Compliance",
        category=MetricCategory.RETENTION,
        description="Percentage of data elements deleted within 24 "
                    "hours of reaching retention expiry",
        target="100%",
        measurement="(on-time deletions / total due deletions) x 100",
        frequency="Daily",
        alert_threshold="Any non-deletion triggers CRITICAL alert",
        responsible="Privacy Engineer",
    ))

    metrics.append(PrivacyMetric(
        metric_id="PM-004",
        name="Article 21 Objection Response Time",
        category=MetricCategory.RIGHTS_MANAGEMENT,
        description="Time from employee objection submission to "
                    "complete data exclusion across all streams",
        target="< 24 hours",
        measurement="Hours from submission to confirmed exclusion",
        frequency="Per event",
        alert_threshold="> 48 hours triggers DPO escalation",
        responsible="DPO",
    ))

    metrics.append(PrivacyMetric(
        metric_id="PM-005",
        name="Privacy Incidents",
        category=MetricCategory.INCIDENT,
        description="Number of privacy incidents (unauthorized access, "
                    "data leaks, minimization failures) per month",
        target="0",
        measurement="Count of confirmed privacy incidents",
        frequency="Monthly",
        alert_threshold="Any incident triggers DPIA review",
        responsible="DPO + CISO",
    ))

    metrics.append(PrivacyMetric(
        metric_id="PM-006",
        name="Employee Privacy Notice Acknowledgment",
        category=MetricCategory.TRANSPARENCY,
        description="Percentage of monitored employees who have "
                    "acknowledged the privacy notice",
        target="100%",
        measurement="(acknowledged / total monitored) x 100",
        frequency="Monthly",
        alert_threshold="< 95% triggers HR follow-up",
        responsible="HR Director",
    ))

    metrics.append(PrivacyMetric(
        metric_id="PM-007",
        name="ML Bias Variance",
        category=MetricCategory.BIAS,
        description="Maximum variance in ML scores across teams with "
                    "different demographic compositions",
        target="< 5% variance",
        measurement="Max score variance across demographic groups",
        frequency="Quarterly",
        alert_threshold="> 10% variance triggers model retraining",
        responsible="ML Engineer + DPO",
    ))

    metrics.append(PrivacyMetric(
        metric_id="PM-008",
        name="Pseudonymization Key Rotation Compliance",
        category=MetricCategory.DATA_MINIMIZATION,
        description="Percentage of pseudonymization keys rotated "
                    "within the 30-day rotation schedule",
        target="100%",
        measurement="(on-time rotations / total due rotations) x 100",
        frequency="Monthly",
        alert_threshold="Any missed rotation triggers CRITICAL alert",
        responsible="Privacy Engineer",
    ))

    metrics.append(PrivacyMetric(
        metric_id="PM-009",
        name="k-Anonymity Enforcement Rate",
        category=MetricCategory.DATA_MINIMIZATION,
        description="Percentage of queries that correctly enforce the "
                    "minimum group size (k>=10) requirement",
        target="100%",
        measurement="(enforced queries / total queries) x 100",
        frequency="Daily",
        alert_threshold="Any bypass triggers CRITICAL alert + "
                        "automatic query kill",
        responsible="Privacy Engineer + CISO",
    ))

    metrics.append(PrivacyMetric(
        metric_id="PM-010",
        name="DPIA Review Currency",
        category=MetricCategory.TRANSPARENCY,
        description="Days since last DPIA review",
        target="< 90 days (quarterly review cycle)",
        measurement="Days since last review completion",
        frequency="Monthly check",
        alert_threshold="> 100 days triggers DPO escalation",
        responsible="DPO",
    ))

    return metrics


def print_metrics_dashboard(metrics: List[PrivacyMetric]) -> None:
    """Print metrics dashboard."""
    print("=" * 65)
    print("PRIVACY MONITORING DASHBOARD -- WorkSight AI")
    print(f"Metrics Defined: {len(metrics)}")
    print("=" * 65)

    # Group by category
    by_cat = {}
    for m in metrics:
        cat = m.category.value
        if cat not in by_cat:
            by_cat[cat] = []
        by_cat[cat].append(m)

    for cat, cat_metrics in by_cat.items():
        print(f"\n--- {cat} ---")
        for m in cat_metrics:
            print(f"\n  [{m.metric_id}] {m.name}")
            print(f"    Target:    {m.target}")
            print(f"    Frequency: {m.frequency}")
            print(f"    Alert:     {m.alert_threshold}")
            print(f"    Owner:     {m.responsible}")

    # Summary table
    print(f"\n{'=' * 65}")
    print("METRICS SUMMARY")
    print(f"{'=' * 65}")
    # Frequency column is 13 wide so the longest label ("Monthly check")
    # does not push the Target column out of alignment.
    print(f"  {'ID':<8s} | {'Name':<40s} | {'Frequency':<13s} | "
          f"{'Target':<15s}")
    print(f"  {'-'*8}-+-{'-'*40}-+-{'-'*13}-+-{'-'*15}")
    for m in metrics:
        print(f"  {m.metric_id:<8s} | {m.name[:40]:<40s} | "
              f"{m.frequency:<13s} | {m.target[:15]}")


if __name__ == "__main__":
    metrics = define_monitoring_metrics()
    print_metrics_dashboard(metrics)
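
The metric definitions store targets and alert thresholds as descriptive strings, so they cannot be evaluated automatically as written. The sketch below shows one possible numeric counterpart for PM-001 and PM-004. `MetricRule` and `evaluate` are hypothetical names, and the three-level OK/WARN/ALERT outcome is an assumption layered on top of the target and alert values given above.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class MetricRule:
    """Hypothetical numeric counterpart to a PrivacyMetric definition."""
    metric_id: str
    meets_target: Callable[[float], bool]
    breaches_alert: Callable[[float], bool]


def evaluate(rule: MetricRule, observed: float) -> str:
    """Classify an observed value as OK, WARN (off target), or ALERT."""
    if rule.breaches_alert(observed):
        return "ALERT"
    return "OK" if rule.meets_target(observed) else "WARN"


# PM-004: target < 24 hours, DPO escalation above 48 hours.
pm_004 = MetricRule("PM-004", lambda h: h < 24, lambda h: h > 48)
# PM-001: target >= 99.9%, investigation below 99.5%.
pm_001 = MetricRule("PM-001", lambda p: p >= 99.9, lambda p: p < 99.5)

print(evaluate(pm_004, 6.0))    # within target
print(evaluate(pm_004, 30.0))   # off target, below the escalation threshold
print(evaluate(pm_004, 72.0))   # escalation threshold exceeded
print(evaluate(pm_001, 99.7))   # between target and alert threshold
```

Keeping the human-readable strings in `PrivacyMetric` and the predicates in a separate rule object lets the dashboard text and the alerting logic evolve independently, at the cost of having to keep the two in sync.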

Step 6.3: Final Compliance Summary

Compliance Validation Summary

| Article | Requirement | Status | Action Required |
|---|---|---|---|
| Art 25(1) | Data Protection by Design | Partially Compliant | Implement separation of duties, DP noise, rights API |
| Art 25(2) | Data Protection by Default | Partially Compliant | Remove screen capture code, generalize app names |
| Art 30(1) | Records of Processing Activities | Compliant | Maintain register |
| Art 35(1) | DPIA Execution | Compliant | Schedule quarterly review |
| Art 35(7) | DPIA Content | Compliant | Maintain completeness |
| Art 5(1)(a) | Lawfulness/Fairness/Transparency | Partially Compliant | Complete Art 13 notice, build dashboard |
| Art 5(1)(b) | Purpose Limitation | Partially Compliant | Technical purpose enforcement |
| Art 5(1)(e) | Storage Limitation | Compliant | Monitor retention compliance |
| Art 21(1) | Right to Object | Non-Compliant | Build automated opt-out (BLOCKER) |
| Art 22 | Automated Decision-Making | Compliant | Maintain team-level only |
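
The summary can be turned into a launch gate and a score with a few lines of code. The sketch below copies the statuses from the table and the CRITICAL priorities from Step 6.1. Treating every CRITICAL item as a blocker is an interpretation of "must fix before launch"; the Step 6.1 report itself only prints its banner for non-compliant checks. The tuple layout and function names are hypothetical.

```python
from typing import List, Tuple

Result = Tuple[str, str, bool]  # (article, status, has CRITICAL priority)

RESULTS: List[Result] = [
    ("Art 25(1)", "Partially Compliant", True),
    ("Art 25(2)", "Partially Compliant", True),
    ("Art 30(1)", "Compliant", False),
    ("Art 35(1)", "Compliant", False),
    ("Art 35(7)", "Compliant", False),
    ("Art 5(1)(a)", "Partially Compliant", False),
    ("Art 5(1)(b)", "Partially Compliant", False),
    ("Art 5(1)(e)", "Compliant", False),
    ("Art 21(1)", "Non-Compliant", True),
    ("Art 22", "Compliant", False),
]

WEIGHTS = {"Compliant": 1.0, "Partially Compliant": 0.5, "Non-Compliant": 0.0}


def launch_blockers(results: List[Result]) -> List[str]:
    """Return articles that are non-compliant or carry CRITICAL priority."""
    return [
        article for article, status, critical in results
        if status == "Non-Compliant" or critical
    ]


def weighted_score(results: List[Result]) -> float:
    """Full credit for compliant, half for partial, none for non-compliant."""
    return 100 * sum(WEIGHTS[status] for _, status, _ in results) / len(results)


print(launch_blockers(RESULTS))
print(f"{weighted_score(RESULTS):.0f}%")
```

Under this reading, three articles block launch even though only one is non-compliant, which is why a single percentage score should not be used on its own as a go/no-go signal.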

Lab Summary

Key Takeaways

What You Learned

  1. DPIAs are iterative -- the assessment revealed that the screen capture component should be rejected, demonstrating that DPIAs are not rubber-stamp exercises
  2. Legal basis analysis is nuanced -- in an employment context, consent is almost never appropriate due to power imbalance; legitimate interest requires rigorous balancing
  3. Technical controls matter more than policies -- pseudonymization, encryption, k-anonymity enforcement, and automated retention are more reliable than policy controls alone
  4. LINDDUN reveals privacy-specific threats that traditional threat modeling (STRIDE) misses -- linkability, identifiability, and unawareness are privacy-unique concerns
  5. Compliance is not binary -- the 70% compliance score shows that real-world systems often have gaps requiring remediation before launch

Cross-Reference Map

| Lab Topic | Related Chapter | Key Concepts |
|---|---|---|
| DPIA Methodology | Ch56 -- Privacy Engineering | DPIA lifecycle, Article 35, privacy controls |
| Risk Governance | Ch13 -- Security Governance, Privacy & Risk | Risk assessment, compliance frameworks, DPO role |
| LINDDUN Analysis | Ch55 -- Threat Modeling Operations | Threat modeling frameworks, LINDDUN, STRIDE comparison |

Artifacts Produced

| Artifact | Purpose | Format |
|---|---|---|
| Processing Activity Register | GDPR Art 30 ROPA | Python + JSON |
| Data Flow Diagram | System architecture visualization | Mermaid |
| Legal Basis Analysis | Art 6(1) justification | Python report |
| Data Inventory | Complete PII catalog | Python + classification |
| PII Discovery Scan | Automated PII detection | Python scanner |
| DPIA Threshold Assessment | Art 35 trigger evaluation | Python + 9 criteria |
| Risk Assessment | Likelihood x Impact scoring | Python + 7 risks |
| DPIA Template | Structured DPIA document | Template |
| Pseudonymization Engine | HMAC-SHA256 with key rotation | Python |
| Encryption Config | AES-256-GCM + TLS 1.3 | Python + Nginx |
| RBAC Model | 7 roles, 15 permissions | Python + matrix |
| Data Minimization Pipeline | 6 minimization rules | Python |
| LINDDUN Analysis | 11 privacy threats | Python + matrix |
| Compliance Report | 10 GDPR article checks | Python + roadmap |
| Monitoring Metrics | 10 privacy KPIs | Python + dashboard |

Challenge Exercises

Advanced Challenges

  1. Differential Privacy Implementation: Extend the pseudonymization engine to add calibrated Laplace noise to keystroke timing data. What epsilon value preserves utility while preventing biometric identification? Test with epsilon values of 0.1, 1.0, and 10.0.

  2. Article 36 Prior Consultation Simulation: Modify the risk assessment so that one residual risk remains CRITICAL. Draft the prior consultation submission to the supervisory authority per Article 36(3).

  3. Cross-Border Transfer Assessment: Add a scenario where WorkSight AI processing is outsourced to a cloud provider with data centers outside the EU. Update the DPIA to address Chapter V (Articles 44-49) requirements including Standard Contractual Clauses and Transfer Impact Assessment.

  4. Works Council Agreement Draft: Write a mock Works Council agreement (Betriebsvereinbarung) covering all aspects of the WorkSight AI deployment, incorporating the DPIA findings and DPO recommendations.

  5. Incident Response Integration: Design a privacy-specific incident response procedure for a scenario where the pseudonymization key vault is compromised. How quickly can you execute cryptographic erasure across all data stores? Build and test the automation.
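
As a starting point for Challenge 1, the Laplace mechanism can be sketched with the standard library alone. The clipping bounds, the synthetic interval values, and the function names are assumptions made for illustration. Each released value is protected at the stated epsilon only on its own; releasing many values for the same person composes, so the effective privacy budget is larger than a single epsilon suggests.

```python
import random
from statistics import mean
from typing import Dict, List

# Assumed clipping bounds for inter-key intervals, in milliseconds.
LOWER_MS, UPPER_MS = 50.0, 250.0


def laplace_noise(scale: float, rng: random.Random) -> float:
    """Draw one Laplace(0, scale) sample as a difference of two exponentials."""
    return rng.expovariate(1.0 / scale) - rng.expovariate(1.0 / scale)


def privatize(values_ms: List[float], epsilon: float,
              rng: random.Random) -> List[float]:
    """Clip each value, then add Laplace noise with scale = range / epsilon."""
    scale = (UPPER_MS - LOWER_MS) / epsilon
    return [
        min(max(v, LOWER_MS), UPPER_MS) + laplace_noise(scale, rng)
        for v in values_ms
    ]


rng = random.Random(42)  # fixed seed so runs are repeatable
intervals_ms = [120.0, 95.0, 140.0, 110.0, 105.0] * 200  # synthetic timings

errors: Dict[float, float] = {}
for epsilon in (0.1, 1.0, 10.0):
    noisy = privatize(intervals_ms, epsilon, rng)
    errors[epsilon] = mean(abs(n - v) for n, v in zip(noisy, intervals_ms))
    print(f"epsilon={epsilon:<4} mean absolute error={errors[epsilon]:.1f} ms")
```

Smaller epsilon values add more noise, so the mean absolute error grows as epsilon shrinks. Whether any of the three settings preserves enough utility for team-level scoring while defeating keystroke-dynamics identification is the open question the challenge asks you to investigate.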


Appendix A: GDPR Article Quick Reference

| Article | Topic | Relevance to Lab |
|---|---|---|
| Art 4(4) | Definition of profiling | WorkSight AI constitutes profiling |
| Art 5 | Data protection principles | All principles assessed |
| Art 6(1)(f) | Legitimate interest | Primary legal basis |
| Art 9 | Special categories | Biometric, health data risks |
| Art 13 | Information to data subjects | Transparency requirements |
| Art 21 | Right to object | Critical compliance gap |
| Art 22 | Automated decision-making | Team-level safeguards |
| Art 25 | Data protection by design | Architecture assessment |
| Art 30 | Processing activity records | ROPA maintained |
| Art 33 | Breach notification (72h) | Incident readiness |
| Art 35 | Data protection impact assessment | Core lab focus |
| Art 36 | Prior consultation | Triggered if high residual risk |

Appendix B: LINDDUN Quick Reference

| Category | Meaning | Key Question |
|---|---|---|
| L -- Linkability | Linking data items to same individual | Can records across systems be connected? |
| I -- Identifiability | Identifying a specific individual | Can pseudonymized data be re-identified? |
| Nr -- Non-repudiation | Inability to deny actions | Are employees unable to deny attributed actions? |
| D -- Detectability | Detecting that data exists | Can observers detect monitoring is active? |
| Di -- Disclosure | Unauthorized information release | Can data be exfiltrated or leaked? |
| U -- Unawareness | Lack of awareness about processing | Do employees understand all processing? |
| Nc -- Non-compliance | Regulatory non-compliance | Does the system meet all legal requirements? |

Appendix C: Environment Teardown

After completing the lab, clean up all synthetic data:

# Remove all generated files
rm -f processing_activities.json
rm -f dpia_report.json
rm -f compliance_report.json

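# Verify that none of the generated files remain before reporting success
remaining=0
for f in processing_activities.json dpia_report.json compliance_report.json; do
  if [ -e "$f" ]; then echo "Still present: $f" >&2; remaining=1; fi
done
[ "$remaining" -eq 0 ] && echo "Lab 32 cleanup complete. All synthetic data removed."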

Production Warning

This lab uses 100% synthetic data. In a real DPIA:

  • Engage qualified legal counsel for legal basis determination
  • Involve your actual DPO in the assessment process
  • Consult your supervisory authority's DPIA guidance and positive/negative lists
  • Follow your organization's DPIA methodology and templates
  • Maintain the DPIA as a living document with regular reviews
  • Never deploy high-risk processing without completing the DPIA and implementing all required controls