Lab 29: Zero Trust Architecture — Implementation & Validation¶
Chapter: 39 — Zero Trust Implementation | 33 — Identity & Access Security | 31 — Network Security Architecture Difficulty: ⭐⭐⭐⭐⭐ Expert Estimated Time: 6–8 hours Prerequisites: Chapter 39, Chapter 33, Chapter 31, identity management fundamentals, network segmentation basics, PKI/certificate concepts
Overview¶
In this lab you will:
- Design and implement Conditional Access policies — configure risk-based authentication, enforce MFA for privileged operations, build device compliance gates, and validate policy evaluation order in Azure AD/Entra ID environments
- Establish device trust and posture verification — implement certificate-based device authentication, configure endpoint health attestation checks, enforce compliance-based access decisions, and detect non-compliant device access attempts
- Build microsegmentation controls — design network segment policies, implement east-west traffic restrictions using software-defined perimeters, test segment isolation boundaries, and validate lateral movement prevention
- Deploy application-level zero trust — configure identity-aware proxy (IAP) for web applications, implement mutual TLS (mTLS) in service mesh architectures, validate JWT token claims for fine-grained authorization, and enforce application-layer access decisions
- Implement continuous verification and anomaly detection — build session re-evaluation triggers, configure behavioral analytics for anomalous access patterns, design step-up authentication workflows, and monitor for trust degradation events
- Detect zero trust bypass attempts — identify Conditional Access policy evasion, detect token replay and session hijacking, find device compliance spoofing, and build comprehensive detection engineering for zero trust environments
Synthetic Data Only
All data in this lab is 100% synthetic and fictional. All IP addresses use RFC 5737 (192.0.2.x, 198.51.100.x, 203.0.113.x) or RFC 1918 (10.x, 172.16.x, 192.168.x) reserved ranges. All domains use *.example or *.example.com. All credentials are testuser/REDACTED or admin/REDACTED. All JWT tokens, API keys, and secrets are completely fictitious. This lab is for defensive education only — never use these techniques against systems you do not own or without explicit written authorization.
Scenario¶
Engagement Brief — Apex Financial Services
Organization: Apex Financial Services (fictional) Domain: apex.example.com (SYNTHETIC) Identity Provider: login.apex.example.com — 192.0.2.10 (SYNTHETIC) Conditional Access Endpoint: ca.apex.example.com — 192.0.2.11 (SYNTHETIC) Device Management: mdm.apex.example.com — 192.0.2.12 (SYNTHETIC) Application Gateway: gateway.apex.example.com — 192.0.2.13 (SYNTHETIC) Service Mesh Control Plane: mesh.apex.example.com — 10.10.1.10 (SYNTHETIC) Internal App Tier: app.internal.apex.example.com — 10.10.10.0/24 (SYNTHETIC) Database Tier: db.internal.apex.example.com — 10.10.20.0/24 (SYNTHETIC) Monitoring/SIEM: siem.apex.example.com — 10.10.30.10 (SYNTHETIC) Engagement Type: Zero trust architecture assessment — implementation and validation Scope: Identity layer, device trust, network microsegmentation, application-layer controls Out of Scope: Physical security, social engineering, third-party SaaS Test Window: 2026-06-02 08:00 – 2026-06-06 20:00 UTC Emergency Contact: soc@apex.example.com (SYNTHETIC)
Summary: Apex Financial Services is a mid-size financial institution undergoing zero trust transformation. They have partially deployed Conditional Access policies, device compliance checks, and microsegmentation across their hybrid environment. The CISO has authorized a comprehensive zero trust validation exercise to verify policy enforcement, test bypass scenarios, identify gaps in continuous verification, and deliver detection engineering recommendations. The goal is to validate that "never trust, always verify" is enforced at every layer — identity, device, network, application, and data.
Certification Relevance¶
Certification Mapping
This lab maps to objectives in the following certifications:
| Certification | Relevant Domains |
|---|---|
| CompTIA Security+ (SY0-701) | Domain 1: General Security Concepts (12%), Domain 3: Security Architecture (18%) |
| CompTIA CySA+ (CS0-003) | Domain 1: Security Operations (33%), Domain 4: Reporting and Communication (23%) |
| CompTIA CASP+ (CAS-004) | Domain 1: Security Architecture (29%), Domain 2: Security Operations (30%) |
| SC-200 (Microsoft Security Operations Analyst) | KQL Detection, Conditional Access Monitoring, Sentinel Analytics |
| SC-300 (Microsoft Identity and Access Administrator) | Conditional Access, Identity Protection, Device Compliance |
| CCSP (Certified Cloud Security Professional) | Domain 3: Cloud Platform and Infrastructure Security, Domain 4: Cloud Application Security |
| CISSP | Domain 4: Communication and Network Security, Domain 5: Identity and Access Management |
Prerequisites¶
Required Tools¶
| Tool | Purpose | Version |
|---|---|---|
| PowerShell | Azure AD/Entra ID policy management | 7.4+ |
| Azure CLI | Cloud resource management | 2.60+ |
| Microsoft Graph PowerShell SDK | Identity and device management | 2.x |
| curl / httpie | HTTP request testing | Latest |
| Python 3 + requests | Automated validation scripts | 3.10+ |
| kubectl | Kubernetes network policy testing | 1.28+ |
| Istio CLI (istioctl) | Service mesh mTLS validation | 1.20+ |
| jq | JSON parsing | 1.7+ |
| nmap | Network segmentation validation | 7.94+ |
| Wireshark / tshark | Traffic analysis | 4.2+ |
Required Knowledge¶
- Chapter 39: Zero Trust Implementation
- Chapter 33: Identity & Access Security
- Chapter 31: Network Security Architecture
- Chapter 5: Detection Engineering at Scale
- Lab 27: Kubernetes Attack & Defense
- Lab 28: API Security Testing
Lab Environment Setup¶
# Clone the lab environment repository
git clone https://github.com/nexus-secops/lab29-zero-trust.example.git
cd lab29-zero-trust
# Install Python dependencies
pip install -r requirements.txt
# Verify Azure CLI authentication (use synthetic tenant)
az login --tenant apex-synthetic.example.com
# Install Microsoft Graph PowerShell module
Install-Module Microsoft.Graph -Scope CurrentUser -Force
# Verify kubectl context
kubectl config use-context lab29-zero-trust
# Verify Istio installation
istioctl verify-install
Lab Environment Options
This lab can be completed using:
- Azure AD/Entra ID Free Tier — for Conditional Access policy exercises
- Local Kubernetes cluster (minikube/kind) — for microsegmentation and service mesh exercises
- Simulated environment — all exercises include synthetic log data for detection query development without cloud access
Objectives¶
- Design Conditional Access policies that enforce risk-based authentication, MFA requirements, and location-based access restrictions for a financial services environment
- Implement device compliance verification using certificate-based authentication, endpoint health attestation, and compliance-driven access decisions
- Build and validate microsegmentation using Kubernetes NetworkPolicies, software-defined perimeters, and east-west traffic control to prevent lateral movement
- Deploy application-level zero trust with identity-aware proxy, mutual TLS in service mesh, and JWT-based fine-grained authorization
- Configure continuous verification including session re-evaluation, behavioral analytics, and step-up authentication for anomalous activity
- Detect zero trust bypass attempts by building KQL and SPL detection queries for policy evasion, token replay, device spoofing, and lateral movement
- Map findings to MITRE ATT&CK framework and produce actionable remediation recommendations
Phase 1: Identity & Conditional Access¶
1.1 Understanding Conditional Access Architecture¶
Conditional Access is the core policy engine of zero trust identity verification. Every access request is evaluated against a set of conditions — user identity, device state, location, application sensitivity, and real-time risk — before a trust decision is made.
The evaluation follows the zero trust decision flow:
┌─────────────────────────────────────────────────────────────────┐
│ ZERO TRUST POLICY ENGINE │
│ │
│ Access Request │
│ │ │
│ ▼ │
│ ┌─────────────┐ ┌──────────────┐ ┌───────────────────┐ │
│ │ Identity │───▶│ Device │───▶│ Application │ │
│ │ Verification│ │ Compliance │ │ Risk Assessment │ │
│ └──────┬──────┘ └──────┬───────┘ └────────┬──────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌──────────────┐ ┌───────────────────┐ │
│ │ Location │ │ Session │ │ Data │ │
│ │ Context │ │ Risk Score │ │ Classification │ │
│ └──────┬──────┘ └──────┬───────┘ └────────┬──────────┘ │
│ │ │ │ │
│ └──────────┬───────┘──────────────────────┘ │
│ ▼ │
│ ┌─────────────────────┐ │
│ │ TRUST DECISION │ │
│ │ Allow │ MFA │ Block│ │
│ └─────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
1.2 Exercise: Design Conditional Access Policies¶
Step 1: Define the Policy Framework¶
Create a Conditional Access policy set for Apex Financial Services. Each policy targets a specific zero trust signal.
# Connect to Microsoft Graph (synthetic tenant)
Connect-MgGraph -Scopes "Policy.ReadWrite.ConditionalAccess","Directory.Read.All"
# ============================================================
# Policy 1: Require MFA for All Users — Baseline Policy
# ============================================================
$params = @{
DisplayName = "ZT-CA-001: Require MFA for All Users"
State = "enabledForReportingButNotEnforced" # Start in report-only
Conditions = @{
Users = @{
IncludeUsers = @("All")
ExcludeUsers = @()
ExcludeGroups = @("BreakGlass-EmergencyAccess") # Emergency access accounts
}
Applications = @{
IncludeApplications = @("All")
}
ClientAppTypes = @("browser", "mobileAppsAndDesktopClients")
}
GrantControls = @{
Operator = "OR"
BuiltInControls = @("mfa")
}
}
New-MgIdentityConditionalAccessPolicy -BodyParameter $params
# ============================================================
# Policy 2: Block Legacy Authentication — Kill Insecure Protocols
# ============================================================
$legacyBlockPolicy = @{
DisplayName = "ZT-CA-002: Block Legacy Authentication"
State = "enabled"
Conditions = @{
Users = @{
IncludeUsers = @("All")
}
Applications = @{
IncludeApplications = @("All")
}
ClientAppTypes = @("exchangeActiveSync", "other")
}
GrantControls = @{
Operator = "OR"
BuiltInControls = @("block")
}
}
New-MgIdentityConditionalAccessPolicy -BodyParameter $legacyBlockPolicy
Legacy Authentication Risk
Legacy authentication protocols (IMAP, POP3, SMTP AUTH, Exchange ActiveSync basic auth) do not support MFA. Attackers specifically target these protocols because they bypass Conditional Access MFA requirements. Blocking legacy auth is a critical first step in zero trust — Microsoft reports that over 99% of password spray attacks use legacy protocols.
# ============================================================
# Policy 3: Risk-Based Conditional Access — Adaptive MFA
# ============================================================
$riskBasedPolicy = @{
DisplayName = "ZT-CA-003: Risk-Based Access Control"
State = "enabledForReportingButNotEnforced"
Conditions = @{
Users = @{
IncludeUsers = @("All")
ExcludeGroups = @("BreakGlass-EmergencyAccess")
}
Applications = @{
IncludeApplications = @("All")
}
SignInRiskLevels = @("high", "medium")
UserRiskLevels = @("high")
}
GrantControls = @{
Operator = "AND"
BuiltInControls = @("mfa", "passwordChange")
}
SessionControls = @{
SignInFrequency = @{
Value = 1
Type = "hours"
IsEnabled = $true
}
}
}
New-MgIdentityConditionalAccessPolicy -BodyParameter $riskBasedPolicy
# ============================================================
# Policy 4: Location-Based Access — Block Untrusted Locations
# ============================================================
# First, create a named location for trusted office networks
$trustedLocation = @{
"@odata.type" = "#microsoft.graph.ipNamedLocation"
DisplayName = "Apex-TrustedOffices"
IsTrusted = $true
IpRanges = @(
@{
"@odata.type" = "#microsoft.graph.iPv4CidrRange"
CidrAddress = "198.51.100.0/24" # SYNTHETIC — HQ office
}
@{
"@odata.type" = "#microsoft.graph.iPv4CidrRange"
CidrAddress = "203.0.113.0/24" # SYNTHETIC — Branch office
}
)
}
New-MgIdentityConditionalAccessNamedLocation -BodyParameter $trustedLocation
# Block access from non-trusted locations for privileged roles
$locationPolicy = @{
DisplayName = "ZT-CA-004: Block Untrusted Locations for Privileged Roles"
State = "enabled"
Conditions = @{
Users = @{
IncludeRoles = @(
"62e90394-69f5-4237-9190-012177145e10" # Global Administrator
"194ae4cb-b126-40b2-bd5b-6091b380977d" # Security Administrator
"f28a1f50-f6e7-4571-818b-6a12f2af6b6c" # SharePoint Administrator
)
}
Applications = @{
IncludeApplications = @("All")
}
Locations = @{
IncludeLocations = @("All")
ExcludeLocations = @("Apex-TrustedOffices")
}
}
GrantControls = @{
Operator = "OR"
BuiltInControls = @("block")
}
}
New-MgIdentityConditionalAccessPolicy -BodyParameter $locationPolicy
# ============================================================
# Policy 5: Require Compliant Device for Sensitive Applications
# ============================================================
$compliantDevicePolicy = @{
DisplayName = "ZT-CA-005: Require Compliant Device for Finance Apps"
State = "enabledForReportingButNotEnforced"
Conditions = @{
Users = @{
IncludeGroups = @("Finance-Department", "Executive-Team")
}
Applications = @{
IncludeApplications = @(
"00000003-0000-0ff1-ce00-000000000000" # SharePoint Online
"00000002-0000-0ff1-ce00-000000000000" # Exchange Online
)
}
Platforms = @{
IncludePlatforms = @("all")
}
}
GrantControls = @{
Operator = "AND"
BuiltInControls = @("mfa", "compliantDevice")
}
}
New-MgIdentityConditionalAccessPolicy -BodyParameter $compliantDevicePolicy
Step 2: Validate Policy Evaluation Order¶
Conditional Access policies are evaluated in a specific order. Understanding evaluation order is critical for preventing policy conflicts and bypass scenarios.
# ============================================================
# Enumerate all Conditional Access policies and their states
# ============================================================
$allPolicies = Get-MgIdentityConditionalAccessPolicy -All | Sort-Object DisplayName
$allPolicies | ForEach-Object {
[PSCustomObject]@{
Name = $_.DisplayName
State = $_.State
GrantAction = ($_.GrantControls.BuiltInControls -join ", ")
UserScope = if ($_.Conditions.Users.IncludeUsers -contains "All") { "All Users" }
elseif ($_.Conditions.Users.IncludeGroups) { "Groups: $($_.Conditions.Users.IncludeGroups -join ', ')" }
elseif ($_.Conditions.Users.IncludeRoles) { "Roles: $($_.Conditions.Users.IncludeRoles.Count) roles" }
else { "Custom" }
AppScope = if ($_.Conditions.Applications.IncludeApplications -contains "All") { "All Apps" }
else { "Specific Apps: $($_.Conditions.Applications.IncludeApplications.Count)" }
}
} | Format-Table -AutoSize
Policy Evaluation Logic
Conditional Access policies use additive evaluation:
- All applicable policies are evaluated (not just the first match)
- If ANY policy blocks → access is blocked (block wins)
- If multiple policies grant → ALL grant conditions must be satisfied (AND logic)
- Report-only policies log but do not enforce
- Exclusions (users/groups) take precedence within a single policy
# ============================================================
# Test policy evaluation with What-If analysis
# ============================================================
# Simulate: testuser accessing Exchange Online from untrusted location
$whatIfParams = @{
ConditionalAccessWhatIfConditions = @{
UserPrincipalName = "testuser@apex.example.com"
CloudAppId = "00000002-0000-0ff1-ce00-000000000000" # Exchange Online
IpAddress = "192.0.2.200" # SYNTHETIC — untrusted IP
ClientAppType = "browser"
DevicePlatform = "windows"
SignInRiskLevel = "low"
UserRiskLevel = "none"
DeviceInfo = @{
IsCompliant = $false
TrustType = "AzureAd"
}
}
}
# Invoke What-If evaluation
$whatIfResult = Invoke-MgGraphRequest -Method POST `
-Uri "https://graph.microsoft.com/beta/identity/conditionalAccess/evaluate" `
-Body $whatIfParams
# Display results
$whatIfResult.appliedPolicies | ForEach-Object {
Write-Host "Policy: $($_.displayName)" -ForegroundColor Yellow
Write-Host " Result: $($_.result)" -ForegroundColor $(if ($_.result -eq "block") { "Red" } else { "Green" })
Write-Host " Reasons: $($_.reasons -join ', ')"
Write-Host ""
}
1.3 Exercise: MFA Enforcement Validation¶
Step 3: Test MFA Bypass Scenarios¶
# ============================================================
# Scenario A: Attempt authentication without MFA
# ============================================================
# Simulate ROPC (Resource Owner Password Credential) flow
# This is a legacy flow that may bypass MFA if not properly blocked
$tokenEndpoint = "https://login.apex.example.com/oauth2/v2.0/token"
$ropcBody = @{
grant_type = "password"
client_id = "00000000-0000-0000-0000-000000000001" # SYNTHETIC
username = "testuser@apex.example.com" # SYNTHETIC
password = "REDACTED"
scope = "https://graph.example.com/.default"
}
# Expected: This should be BLOCKED by ZT-CA-002 (legacy auth block)
try {
$response = Invoke-RestMethod -Method POST -Uri $tokenEndpoint -Body $ropcBody
Write-Warning "FINDING: ROPC flow succeeded — legacy auth NOT blocked!"
Write-Host "Token acquired without MFA — Zero Trust violation detected"
} catch {
Write-Host "SUCCESS: ROPC flow blocked as expected" -ForegroundColor Green
Write-Host "Error: $($_.Exception.Message)"
}
# ============================================================
# Scenario B: Test MFA enforcement for interactive sign-in
# ============================================================
# Simulate OAuth2 Authorization Code flow (interactive)
$authUrl = "https://login.apex.example.com/oauth2/v2.0/authorize?" +
"client_id=00000000-0000-0000-0000-000000000001" +
"&response_type=code" +
"&redirect_uri=https://app.apex.example.com/callback" +
"&scope=openid profile email" +
"&state=$(New-Guid)" +
"&prompt=login"
Write-Host "Authorization URL (inspect redirect for MFA challenge):"
Write-Host $authUrl
# Expected behavior:
# 1. User authenticates with username/password
# 2. Conditional Access evaluates → triggers MFA challenge
# 3. User completes MFA (phone/authenticator app)
# 4. Authorization code returned only after MFA completion
1.4 Detection: Conditional Access Monitoring¶
KQL Detection Queries¶
// ============================================================
// KQL: Conditional Access Policy Failures — All Blocked Sign-ins
// ============================================================
SigninLogs
| where TimeGenerated > ago(24h)
| where ConditionalAccessStatus == "failure"
| extend CAPolicy = parse_json(ConditionalAccessPolicies)
| mv-expand CAPolicy
| where CAPolicy.result == "failure"
| summarize
BlockedAttempts = count(),
UniqueUsers = dcount(UserPrincipalName),
UniqueIPs = dcount(IPAddress)
by PolicyName = tostring(CAPolicy.displayName),
bin(TimeGenerated, 1h)
| order by BlockedAttempts desc
// ============================================================
// KQL: Legacy Authentication Attempts (should be zero in ZT)
// ============================================================
SigninLogs
| where TimeGenerated > ago(7d)
| where ClientAppUsed in ("Exchange ActiveSync", "IMAP4", "POP3",
"SMTP", "Authenticated SMTP", "Other clients",
"Exchange Online PowerShell")
| summarize
Attempts = count(),
SuccessCount = countif(ResultType == 0),
FailCount = countif(ResultType != 0)
by UserPrincipalName, ClientAppUsed, IPAddress,
ConditionalAccessStatus
| where SuccessCount > 0
| order by SuccessCount desc
| project UserPrincipalName, ClientAppUsed, IPAddress,
SuccessCount, FailCount, ConditionalAccessStatus
// ============================================================
// KQL: MFA Registration Anomalies — New MFA Method Added
// ============================================================
AuditLogs
| where TimeGenerated > ago(24h)
| where OperationName in (
"User registered security info",
"User changed default security info",
"Admin registered security info for user"
)
| extend InitiatedByUser = tostring(InitiatedBy.user.userPrincipalName)
| extend TargetUser = tostring(TargetResources[0].userPrincipalName)
| extend ModifiedProperties = TargetResources[0].modifiedProperties
| project TimeGenerated, InitiatedByUser, TargetUser,
OperationName, ModifiedProperties, CorrelationId
| order by TimeGenerated desc
// ============================================================
// KQL: Sign-ins Bypassing Conditional Access (report-only)
// ============================================================
SigninLogs
| where TimeGenerated > ago(24h)
| where ConditionalAccessStatus == "notApplied"
| where ResultType == 0 // Successful sign-in
| extend CAPolicy = parse_json(ConditionalAccessPolicies)
| mv-expand CAPolicy
| where CAPolicy.result == "reportOnlyNotApplied"
or CAPolicy.result == "notApplied"
| summarize
Count = count(),
Policies = make_set(CAPolicy.displayName)
by UserPrincipalName, AppDisplayName, IPAddress,
DeviceDetail_operatingSystem = tostring(DeviceDetail.operatingSystem)
| where Count > 5
| order by Count desc
SPL Detection Queries¶
// ============================================================
// SPL: Conditional Access Policy Failures
// ============================================================
index=azure sourcetype="azure:signinlogs"
| spath "conditionalAccessStatus"
| search conditionalAccessStatus="failure"
| spath output=policyName "conditionalAccessPolicies{}.displayName"
| spath output=policyResult "conditionalAccessPolicies{}.result"
| stats count as BlockedAttempts,
dc(userPrincipalName) as UniqueUsers,
dc(ipAddress) as UniqueIPs
by policyName, policyResult
| sort -BlockedAttempts
// ============================================================
// SPL: Legacy Authentication Attempts
// ============================================================
index=azure sourcetype="azure:signinlogs"
| spath "clientAppUsed"
| search clientAppUsed IN ("Exchange ActiveSync", "IMAP4", "POP3",
"SMTP", "Authenticated SMTP", "Other clients")
| stats count as Attempts,
count(eval(resultType=0)) as SuccessCount,
count(eval(resultType!=0)) as FailCount
by userPrincipalName, clientAppUsed, ipAddress
| where SuccessCount > 0
| sort -SuccessCount
// ============================================================
// SPL: MFA Bypass Detection — Successful Auth Without MFA Claim
// ============================================================
index=azure sourcetype="azure:signinlogs"
| spath "authenticationRequirement"
| spath "mfaDetail.authMethod"
| search resultType=0
| eval hasMFA=if(isnotnull('mfaDetail.authMethod') AND
'mfaDetail.authMethod'!="", 1, 0)
| where hasMFA=0 AND authenticationRequirement="multiFactorAuthentication"
| stats count as BypassCount,
values(clientAppUsed) as ClientApps,
values(ipAddress) as IPs
by userPrincipalName
| sort -BypassCount
Phase 2: Device Compliance & Posture¶
2.1 Understanding Device Trust in Zero Trust¶
In a zero trust architecture, the device is a critical trust signal. A legitimate user on a compromised or non-compliant device should be denied access just as quickly as an illegitimate user on a healthy device. Device trust verification ensures that endpoints meet security baselines before accessing corporate resources.
┌──────────────────────────────────────────────────────────┐
│ DEVICE TRUST EVALUATION │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Certificate │ │ Compliance │ │ Health │ │
│ │ Validation │ │ State │ │ Attestation │ │
│ │ │ │ │ │ │ │
│ │ ✓ Device cert│ │ ✓ OS patched │ │ ✓ SecureBoot │ │
│ │ ✓ CA chain │ │ ✓ AV active │ │ ✓ BitLocker │ │
│ │ ✓ Not revoked│ │ ✓ Firewall on│ │ ✓ TPM valid │ │
│ │ ✓ Not expired│ │ ✓ Encryption │ │ ✓ CodeInteg │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ └─────────┬────────┘───────────────────┘ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ DEVICE TRUST │ │
│ │ SCORE: 0-100 │ │
│ │ ──────────── │ │
│ │ Trusted ≥ 80 │ │
│ │ Limited 40-79 │ │
│ │ Blocked < 40 │ │
│ └─────────────────┘ │
└──────────────────────────────────────────────────────────┘
2.2 Exercise: Device Compliance Policy Configuration¶
Step 1: Create Device Compliance Policies¶
# ============================================================
# Windows 10/11 Device Compliance Policy
# ============================================================
$compliancePolicy = @{
"@odata.type" = "#microsoft.graph.windows10CompliancePolicy"
displayName = "ZT-DEVICE-001: Windows Endpoint Compliance"
description = "Zero trust device compliance baseline for Windows endpoints"
scheduledActionsForRule = @(
@{
ruleName = "PasswordRequired"
scheduledActionConfigurations = @(
@{
actionType = "block"
gracePeriodHours = 0
notificationTemplateId = ""
}
)
}
)
# Password requirements
passwordRequired = $true
passwordMinimumLength = 14
passwordRequiredType = "alphanumeric"
passwordMinutesOfInactivityBeforeLock = 5
passwordExpirationDays = 90
passwordPreviousPasswordBlockCount = 12
# Device security
secureBootEnabled = $true
codeIntegrityEnabled = $true
bitLockerEnabled = $true
tpmRequired = $true
# OS version
osMinimumVersion = "10.0.19045.0"
# Defender requirements
defenderEnabled = $true
defenderVersion = ""
signatureOutOfDate = $false
rtpEnabled = $true # Real-time protection
# Firewall
firewallEnabled = $true
# Device health attestation
deviceThreatProtectionEnabled = $true
deviceThreatProtectionRequiredSecurityLevel = "secured"
}
# Create the compliance policy via Graph API
Invoke-MgGraphRequest -Method POST `
-Uri "https://graph.microsoft.com/v1.0/deviceManagement/deviceCompliancePolicies" `
-Body ($compliancePolicy | ConvertTo-Json -Depth 10)
Grace Periods and Remediation
Setting gracePeriodHours = 0 means non-compliant devices are immediately blocked. In production, consider a short grace period (24-72 hours) for first deployment to allow users to remediate. However, for high-security environments like financial services, immediate enforcement is recommended.
Step 2: Certificate-Based Device Authentication¶
# ============================================================
# Configure certificate-based authentication for device trust
# ============================================================
# Step 2a: Create a certificate authority configuration
$certAuthConfig = @{
"@odata.type" = "#microsoft.graph.certificateBasedAuthConfiguration"
certificateAuthorities = @(
@{
isRootAuthority = $true
certificate = "MIIEYzCCA0ugAwIBAgIQAYL4..." # SYNTHETIC — Base64 root CA cert
issuer = "CN=Apex-Root-CA, O=Apex Financial Services, C=US"
issuerSki = "A1B2C3D4E5F6..." # SYNTHETIC
}
@{
isRootAuthority = $false
certificate = "MIIEczCCA1ugAwIBAgIQBZM5..." # SYNTHETIC — Base64 issuing CA cert
issuer = "CN=Apex-Issuing-CA, O=Apex Financial Services, C=US"
issuerSki = "F6E5D4C3B2A1..." # SYNTHETIC
}
)
}
# Upload CA certificates to Azure AD
Invoke-MgGraphRequest -Method POST `
-Uri "https://graph.microsoft.com/v1.0/organization/{tenant-id}/certificateBasedAuthConfiguration" `
-Body ($certAuthConfig | ConvertTo-Json -Depth 10)
# Step 2b: Map certificate fields to user attributes
# Configure certificate user mapping
$certUserMapping = @{
x509CertificateAuthenticationModeConfiguration = @{
x509CertificateAuthenticationDefaultMode = "x509CertificateSingleFactor"
rules = @(
@{
x509CertificateRuleType = "issuerSubject"
identifier = "CN=Apex-Issuing-CA"
x509CertificateAuthenticationMode = "x509CertificateMultiFactor"
}
)
}
certificateUserBindings = @(
@{
x509CertificateField = "PrincipalName"
userProperty = "onPremisesUserPrincipalName"
priority = 1
}
@{
x509CertificateField = "RFC822Name"
userProperty = "userPrincipalName"
priority = 2
}
)
}
Step 3: Endpoint Health Attestation Verification¶
# ============================================================
# Query device compliance state from Intune/Endpoint Manager
# ============================================================
# Get all managed devices and their compliance state
$devices = Get-MgDeviceManagementManagedDevice -All `
-Filter "operatingSystem eq 'Windows'" `
-Select "deviceName,complianceState,lastSyncDateTime,
osVersion,isEncrypted,managedDeviceOwnerType"
# Identify non-compliant devices
$nonCompliantDevices = $devices | Where-Object {
$_.ComplianceState -ne "compliant"
}
Write-Host "`n=== NON-COMPLIANT DEVICE REPORT ===" -ForegroundColor Red
Write-Host "Total Managed Devices: $($devices.Count)"
Write-Host "Non-Compliant: $($nonCompliantDevices.Count)"
Write-Host "Compliance Rate: $([math]::Round((($devices.Count - $nonCompliantDevices.Count) / $devices.Count) * 100, 1))%"
Write-Host ""
$nonCompliantDevices | ForEach-Object {
Write-Host "Device: $($_.DeviceName)" -ForegroundColor Yellow
Write-Host " OS Version: $($_.OsVersion)"
Write-Host " Last Sync: $($_.LastSyncDateTime)"
Write-Host " Encrypted: $($_.IsEncrypted)"
Write-Host " Compliance: $($_.ComplianceState)"
Write-Host ""
}
# ============================================================
# Device Health Attestation — Verify TPM and Secure Boot
# ============================================================
# Query Windows Security Health Attestation
$healthReport = @{
DeviceName = "APEX-WS-1042" # SYNTHETIC
SerialNumber = "SN-APEX-2026-1042" # SYNTHETIC
TPMVersion = "2.0"
SecureBoot = $true
BitLocker = $true
CodeIntegrity = $true
ELAM = $true # Early Launch Anti-Malware
VSM = $true # Virtual Secure Mode
BootDebugging = $false # Should be false in production
OSKernelDebugging = $false
TestSigning = $false
}
# Validate health attestation
function Test-DeviceHealthAttestation {
param([hashtable]$Report)
$failures = @()
if (-not $Report.SecureBoot) { $failures += "SecureBoot disabled" }
if (-not $Report.BitLocker) { $failures += "BitLocker not enabled" }
if (-not $Report.CodeIntegrity) { $failures += "Code Integrity disabled" }
if (-not $Report.ELAM) { $failures += "Early Launch Anti-Malware not present" }
if ($Report.BootDebugging) { $failures += "Boot debugging enabled (suspicious)" }
if ($Report.OSKernelDebugging) { $failures += "Kernel debugging enabled (suspicious)" }
if ($Report.TestSigning) { $failures += "Test signing enabled (suspicious)" }
if ($Report.TPMVersion -lt "2.0") { $failures += "TPM version below 2.0" }
if ($failures.Count -eq 0) {
Write-Host "DEVICE HEALTH: PASSED" -ForegroundColor Green
return $true
} else {
Write-Host "DEVICE HEALTH: FAILED" -ForegroundColor Red
$failures | ForEach-Object { Write-Host " FAIL: $_" -ForegroundColor Red }
return $false
}
}
Test-DeviceHealthAttestation -Report $healthReport
2.3 Detection: Device Compliance Monitoring¶
KQL Detection Queries¶
// ============================================================
// KQL: Non-Compliant Device Access Attempts
// ============================================================
SigninLogs
| where TimeGenerated > ago(24h)
| where ResultType == 0 // Successful sign-in
| where DeviceDetail.isCompliant == false
or isempty(DeviceDetail.isCompliant)
| project TimeGenerated, UserPrincipalName,
AppDisplayName, IPAddress,
DeviceName = tostring(DeviceDetail.displayName),
DeviceOS = tostring(DeviceDetail.operatingSystem),
DeviceTrustType = tostring(DeviceDetail.trustType),
IsCompliant = tostring(DeviceDetail.isCompliant),
IsManaged = tostring(DeviceDetail.isManaged),
ConditionalAccessStatus
| summarize
AccessCount = count(),
Apps = make_set(AppDisplayName),
IPs = make_set(IPAddress)
by UserPrincipalName, DeviceName, DeviceOS,
IsCompliant, IsManaged
| order by AccessCount desc
// ============================================================
// KQL: Device Compliance State Changes — Newly Non-Compliant
// ============================================================
IntuneDeviceComplianceOrg
| where TimeGenerated > ago(24h)
| where ComplianceState == "noncompliant"
| extend PreviousState = prev(ComplianceState, 1)
| where PreviousState == "compliant" or isempty(PreviousState)
| project TimeGenerated, DeviceName, OS, OSVersion,
ComplianceState, OwnerType,
InGracePeriodUntil, LastContact
| order by TimeGenerated desc
// ============================================================
// KQL: Unmanaged Device Access to Sensitive Applications
// ============================================================
SigninLogs
| where TimeGenerated > ago(7d)
| where ResultType == 0
| where DeviceDetail.isManaged == false
or isempty(DeviceDetail.isManaged)
| where AppDisplayName in (
"Microsoft SharePoint Online",
"Microsoft Exchange Online",
"Microsoft Teams",
"Apex Financial Portal" // SYNTHETIC app
)
| summarize
UnmanagedAccessCount = count(),
UniqueDevices = dcount(tostring(DeviceDetail.displayName)),
FirstSeen = min(TimeGenerated),
LastSeen = max(TimeGenerated)
by UserPrincipalName, AppDisplayName,
DeviceOS = tostring(DeviceDetail.operatingSystem)
| order by UnmanagedAccessCount desc
SPL Detection Queries¶
// ============================================================
// SPL: Non-Compliant Device Access Attempts
// ============================================================
index=azure sourcetype="azure:signinlogs"
| spath "deviceDetail.isCompliant"
| spath "deviceDetail.isManaged"
| spath "deviceDetail.operatingSystem"
| search resultType=0
("deviceDetail.isCompliant"="false"
OR NOT "deviceDetail.isCompliant"=*)
| stats count as AccessCount,
values(appDisplayName) as Apps,
values(ipAddress) as IPs
by userPrincipalName,
"deviceDetail.displayName",
"deviceDetail.operatingSystem",
"deviceDetail.isCompliant"
| sort -AccessCount
// ============================================================
// SPL: Device Compliance Drift Detection
// ============================================================
index=intune sourcetype="intune:compliance"
| spath "complianceState"
| spath "deviceName"
| streamstats current=f last(complianceState) as previousState
by deviceName
| where complianceState="noncompliant" AND
(previousState="compliant" OR isnull(previousState))
| table _time, deviceName, complianceState, previousState,
osVersion, ownerType, lastContactedDateTime
| sort -_time
Phase 3: Microsegmentation¶
3.1 Understanding Microsegmentation¶
Microsegmentation is the practice of creating fine-grained network security zones that isolate workloads from one another. Unlike traditional network segmentation (which separates broad zones like DMZ, internal, and management), microsegmentation operates at the workload level — each application, service, or pod gets its own security boundary.
┌─────────────────────────────────────────────────────────────────┐
│ TRADITIONAL vs. MICROSEGMENTATION │
│ │
│ TRADITIONAL SEGMENTATION MICROSEGMENTATION │
│ ┌───────────────────┐ ┌───────────────────┐ │
│ │ DMZ (VLAN 10) │ │ ┌───┐ ┌───┐ ┌───┐│ │
│ │ Web1 Web2 Web3 │ │ │W1 │ │W2 │ │W3 ││ │
│ │ (all can talk) │ │ │ ╳ │ │ ╳ │ │ ╳ ││ │
│ └────────┬──────────┘ │ └─┬─┘ └─┬─┘ └─┬─┘│ │
│ │ │ │ │ │ │ │
│ ┌────────┴──────────┐ │ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐│ │
│ │ App (VLAN 20) │ │ │A1 │ │A2 │ │A3 ││ │
│ │ App1 App2 App3 │ │ │ ╳ │ │ ╳ │ │ ╳ ││ │
│ │ (all can talk) │ │ └─┬─┘ └─┬─┘ └─┬─┘│ │
│ └────────┬──────────┘ │ │ │ │ │ │
│ │ │ ┌─┴─┐ ┌─┴─┐ ┌─┴─┐│ │
│ ┌────────┴──────────┐ │ │D1 │ │D2 │ │D3 ││ │
│ │ DB (VLAN 30) │ │ │ ╳ │ │ ╳ │ │ ╳ ││ │
│ │ DB1 DB2 DB3 │ │ └───┘ └───┘ └───┘│ │
│ │ (all can talk) │ │ (isolated per app)│ │
│ └───────────────────┘ └───────────────────┘ │
│ │
│ Lateral movement: FREE Lateral movement: BLOCKED │
└─────────────────────────────────────────────────────────────────┘
3.2 Exercise: Kubernetes Network Policy Design¶
For containerized environments, Kubernetes NetworkPolicies provide microsegmentation at the pod level. See also Lab 27: Kubernetes Attack & Defense and Chapter 51: Kubernetes Security.
Step 1: Default Deny All Traffic¶
# ============================================================
# default-deny-all.yaml
# Default deny all ingress AND egress traffic in the namespace
# This is the foundation of microsegmentation — deny by default
# ============================================================
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: default-deny-all
namespace: apex-financial
labels:
app.kubernetes.io/part-of: zero-trust
security.apex.example.com/policy-type: baseline
spec:
podSelector: {} # Applies to ALL pods in namespace
policyTypes:
- Ingress
- Egress
Default Deny Impact
Applying a default-deny policy immediately blocks all pod-to-pod communication in the namespace, including DNS resolution. Always deploy the DNS egress allowance policy simultaneously, or applications will fail to resolve service names.
# ============================================================
# allow-dns-egress.yaml
# Allow DNS resolution for all pods (required for service discovery)
# ============================================================
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: allow-dns-egress
namespace: apex-financial
labels:
app.kubernetes.io/part-of: zero-trust
spec:
podSelector: {} # All pods
policyTypes:
- Egress
egress:
- to:
- namespaceSelector:
matchLabels:
kubernetes.io/metadata.name: kube-system
ports:
- protocol: UDP
port: 53
- protocol: TCP
port: 53
Step 2: Application-Specific Microsegmentation Policies¶
# ============================================================
# frontend-policy.yaml
# Frontend web tier — accepts external traffic, can only talk to API tier
# ============================================================
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: frontend-allow
namespace: apex-financial
labels:
app.kubernetes.io/part-of: zero-trust
security.apex.example.com/tier: frontend
spec:
podSelector:
matchLabels:
app: apex-frontend
tier: web
policyTypes:
- Ingress
- Egress
ingress:
# Allow traffic from ingress controller only
- from:
- namespaceSelector:
matchLabels:
kubernetes.io/metadata.name: ingress-nginx
podSelector:
matchLabels:
app.kubernetes.io/component: controller
ports:
- protocol: TCP
port: 8080
egress:
# Can only talk to API tier on specific port
- to:
- podSelector:
matchLabels:
app: apex-api
tier: api
ports:
- protocol: TCP
port: 8443
# DNS resolution
- to:
- namespaceSelector:
matchLabels:
kubernetes.io/metadata.name: kube-system
ports:
- protocol: UDP
port: 53
# ============================================================
# api-tier-policy.yaml
# API tier — accepts traffic from frontend only, can talk to DB only
# ============================================================
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: api-tier-allow
namespace: apex-financial
labels:
app.kubernetes.io/part-of: zero-trust
security.apex.example.com/tier: api
spec:
podSelector:
matchLabels:
app: apex-api
tier: api
policyTypes:
- Ingress
- Egress
ingress:
# Accept traffic from frontend only
- from:
- podSelector:
matchLabels:
app: apex-frontend
tier: web
ports:
- protocol: TCP
port: 8443
# Accept traffic from service mesh sidecar (health checks)
- from:
- podSelector:
matchLabels:
app: apex-api
ports:
- protocol: TCP
port: 15021 # Istio health check port
egress:
# Can only talk to database tier
- to:
- podSelector:
matchLabels:
app: apex-database
tier: database
ports:
- protocol: TCP
port: 5432 # PostgreSQL
# Can talk to cache tier
- to:
- podSelector:
matchLabels:
app: apex-cache
tier: cache
ports:
- protocol: TCP
port: 6379 # Redis
# DNS
- to:
- namespaceSelector:
matchLabels:
kubernetes.io/metadata.name: kube-system
ports:
- protocol: UDP
port: 53
# ============================================================
# database-policy.yaml
# Database tier — accepts traffic from API tier ONLY
# NO egress allowed (database should never initiate connections)
# ============================================================
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: database-allow
namespace: apex-financial
labels:
app.kubernetes.io/part-of: zero-trust
security.apex.example.com/tier: database
spec:
podSelector:
matchLabels:
app: apex-database
tier: database
policyTypes:
- Ingress
- Egress
ingress:
# Accept traffic from API tier ONLY on PostgreSQL port
- from:
- podSelector:
matchLabels:
app: apex-api
tier: api
ports:
- protocol: TCP
port: 5432
egress:
# DNS only — no other egress allowed
- to:
- namespaceSelector:
matchLabels:
kubernetes.io/metadata.name: kube-system
ports:
- protocol: UDP
port: 53
Step 3: Apply and Validate Microsegmentation¶
# ============================================================
# Apply all network policies
# ============================================================
kubectl apply -f default-deny-all.yaml
kubectl apply -f allow-dns-egress.yaml
kubectl apply -f frontend-policy.yaml
kubectl apply -f api-tier-policy.yaml
kubectl apply -f database-policy.yaml
# Verify policies are applied
kubectl get networkpolicies -n apex-financial -o wide
# Expected output:
# NAME POD-SELECTOR AGE
# default-deny-all <none> (all pods) 5s
# allow-dns-egress <none> (all pods) 5s
# frontend-allow app=apex-frontend,tier=web 5s
# api-tier-allow app=apex-api,tier=api 5s
# database-allow app=apex-database,tier=db 5s
3.3 Exercise: Lateral Movement Testing¶
Step 4: Validate Segment Isolation¶
# ============================================================
# Test 1: Frontend → Database (should be BLOCKED)
# ============================================================
# Spawn a test pod in the frontend tier
kubectl run test-frontend \
--image=nicolaka/netshoot \
--labels="app=apex-frontend,tier=web" \
-n apex-financial \
--rm -it -- bash
# Inside the pod, try to reach the database directly
# This should FAIL — frontend cannot talk to database
ncat -zv apex-database.apex-financial.svc.cluster.local 5432 -w 3
# Expected: Connection timed out / refused
# Try to reach the API tier (should SUCCEED)
ncat -zv apex-api.apex-financial.svc.cluster.local 8443 -w 3
# Expected: Connection succeeded
# ============================================================
# Test 2: API → Database (should be ALLOWED)
# ============================================================
kubectl run test-api \
--image=nicolaka/netshoot \
--labels="app=apex-api,tier=api" \
-n apex-financial \
--rm -it -- bash
# Inside the pod, try to reach the database
ncat -zv apex-database.apex-financial.svc.cluster.local 5432 -w 3
# Expected: Connection succeeded
# Try to reach an external service (should be BLOCKED — no egress rule)
curl -s --connect-timeout 3 https://203.0.113.50/exfil
# Expected: Connection timed out
# ============================================================
# Test 3: Database → External (egress should be BLOCKED)
# ============================================================
kubectl run test-db \
--image=nicolaka/netshoot \
--labels="app=apex-database,tier=database" \
-n apex-financial \
--rm -it -- bash
# Try to reach external IP (data exfiltration simulation)
curl -s --connect-timeout 3 http://198.51.100.99/beacon
# Expected: Connection timed out — database has NO external egress
# Try reverse shell simulation (should be BLOCKED)
ncat -zv 203.0.113.100 4444 -w 3
# Expected: Connection timed out
# ============================================================
# Test 4: Cross-namespace communication (should be BLOCKED)
# ============================================================
# From apex-financial namespace, try to reach a pod in another namespace
kubectl run test-cross \
--image=nicolaka/netshoot \
--labels="app=apex-frontend,tier=web" \
-n apex-financial \
--rm -it -- bash
# Try to reach a service in the 'default' namespace
ncat -zv kubernetes.default.svc.cluster.local 443 -w 3
# Expected: Connection timed out (default deny blocks cross-namespace)
# Try to reach a pod in 'monitoring' namespace
ncat -zv prometheus-server.monitoring.svc.cluster.local 9090 -w 3
# Expected: Connection timed out
Step 5: Firewall-Based Microsegmentation (Non-Kubernetes)¶
# ============================================================
# Windows Host-Based Firewall Microsegmentation
# For non-containerized environments
# ============================================================
# Define application segments
$segments = @{
"WebTier" = @{
Servers = @("10.10.10.10", "10.10.10.11") # SYNTHETIC
AllowedInbound = @{
Source = @("198.51.100.0/24") # SYNTHETIC — Load balancer
Ports = @(443)
}
AllowedOutbound = @{
Destination = @("10.10.20.10", "10.10.20.11") # API tier
Ports = @(8443)
}
}
"ApiTier" = @{
Servers = @("10.10.20.10", "10.10.20.11") # SYNTHETIC
AllowedInbound = @{
Source = @("10.10.10.10", "10.10.10.11") # Web tier only
Ports = @(8443)
}
AllowedOutbound = @{
Destination = @("10.10.30.10") # Database tier
Ports = @(5432)
}
}
"DatabaseTier" = @{
Servers = @("10.10.30.10") # SYNTHETIC
AllowedInbound = @{
Source = @("10.10.20.10", "10.10.20.11") # API tier only
Ports = @(5432)
}
AllowedOutbound = @{
Destination = @() # NO outbound allowed
Ports = @()
}
}
}
# Create firewall rules for each segment
foreach ($segmentName in $segments.Keys) {
$segment = $segments[$segmentName]
# Block all inbound by default
New-NetFirewallRule -DisplayName "ZT-$segmentName-DenyAllInbound" `
-Direction Inbound -Action Block -Profile Any `
-Enabled True -Description "Zero Trust: Default deny inbound for $segmentName"
# Allow specific inbound
if ($segment.AllowedInbound.Ports.Count -gt 0) {
New-NetFirewallRule -DisplayName "ZT-$segmentName-AllowInbound" `
-Direction Inbound -Action Allow -Profile Any `
-RemoteAddress $segment.AllowedInbound.Source `
-LocalPort $segment.AllowedInbound.Ports `
-Protocol TCP `
-Enabled True `
-Description "Zero Trust: Allowed inbound for $segmentName"
}
# Block all outbound by default
New-NetFirewallRule -DisplayName "ZT-$segmentName-DenyAllOutbound" `
-Direction Outbound -Action Block -Profile Any `
-Enabled True -Description "Zero Trust: Default deny outbound for $segmentName"
# Allow specific outbound
if ($segment.AllowedOutbound.Ports.Count -gt 0) {
New-NetFirewallRule -DisplayName "ZT-$segmentName-AllowOutbound" `
-Direction Outbound -Action Allow -Profile Any `
-RemoteAddress $segment.AllowedOutbound.Destination `
-RemotePort $segment.AllowedOutbound.Ports `
-Protocol TCP `
-Enabled True `
-Description "Zero Trust: Allowed outbound for $segmentName"
}
Write-Host "Segment '$segmentName' configured: $($segment.AllowedInbound.Ports.Count) inbound rules, $($segment.AllowedOutbound.Ports.Count) outbound rules" -ForegroundColor Green
}
3.4 Detection: Microsegmentation Monitoring¶
KQL Detection Queries¶
// ============================================================
// KQL: East-West Traffic Violations — Blocked Lateral Movement
// ============================================================
AzureNetworkAnalytics_CL
| where TimeGenerated > ago(24h)
| where FlowStatus_s == "D" // Denied flows
| where SrcIP_s startswith "10.10."
and DestIP_s startswith "10.10."
| where SrcIP_s != DestIP_s
| extend SrcSegment = case(
SrcIP_s startswith "10.10.10.", "WebTier",
SrcIP_s startswith "10.10.20.", "ApiTier",
SrcIP_s startswith "10.10.30.", "DatabaseTier",
"Unknown"
)
| extend DestSegment = case(
DestIP_s startswith "10.10.10.", "WebTier",
DestIP_s startswith "10.10.20.", "ApiTier",
DestIP_s startswith "10.10.30.", "DatabaseTier",
"Unknown"
)
| summarize
BlockedFlows = count(),
UniqueSrcIPs = dcount(SrcIP_s),
UniqueDestPorts = make_set(DestPort_d)
by SrcSegment, DestSegment, bin(TimeGenerated, 1h)
| where BlockedFlows > 10 // Alert threshold
| order by BlockedFlows desc
// ============================================================
// KQL: Kubernetes NetworkPolicy Denied Connections
// ============================================================
AzureDiagnostics
| where Category == "kube-audit"
| where TimeGenerated > ago(24h)
| extend logEntry = parse_json(log_s)
| where logEntry.verb == "create"
and logEntry.objectRef.resource == "networkpolicies"
| project TimeGenerated,
User = tostring(logEntry.user.username),
PolicyName = tostring(logEntry.objectRef.name),
Namespace = tostring(logEntry.objectRef.namespace),
Action = tostring(logEntry.verb)
| order by TimeGenerated desc
// ============================================================
// KQL: Detect Unauthorized Cross-Segment Communication
// ============================================================
CommonSecurityLog
| where TimeGenerated > ago(24h)
| where DeviceAction == "Deny" or DeviceAction == "Drop"
| where SourceIP startswith "10.10."
and DestinationIP startswith "10.10."
| extend SourceSegment = case(
SourceIP startswith "10.10.10.", "WebTier",
SourceIP startswith "10.10.20.", "ApiTier",
SourceIP startswith "10.10.30.", "DatabaseTier",
SourceIP startswith "10.10.40.", "ManagementTier",
"Unknown"
)
| extend DestSegment = case(
DestinationIP startswith "10.10.10.", "WebTier",
DestinationIP startswith "10.10.20.", "ApiTier",
DestinationIP startswith "10.10.30.", "DatabaseTier",
DestinationIP startswith "10.10.40.", "ManagementTier",
"Unknown"
)
| where SourceSegment != DestSegment
| summarize
DeniedFlows = count(),
DestPorts = make_set(DestinationPort),
FirstSeen = min(TimeGenerated),
LastSeen = max(TimeGenerated)
by SourceIP, DestinationIP, SourceSegment, DestSegment
| where DeniedFlows > 5
| order by DeniedFlows desc
SPL Detection Queries¶
// ============================================================
// SPL: East-West Traffic Violations
// ============================================================
index=firewall sourcetype="pan:traffic" action="deny"
| where cidrmatch("10.10.0.0/16", src_ip)
AND cidrmatch("10.10.0.0/16", dest_ip)
| eval src_segment=case(
cidrmatch("10.10.10.0/24", src_ip), "WebTier",
cidrmatch("10.10.20.0/24", src_ip), "ApiTier",
cidrmatch("10.10.30.0/24", src_ip), "DatabaseTier",
cidrmatch("10.10.40.0/24", src_ip), "ManagementTier",
1=1, "Unknown"
)
| eval dest_segment=case(
cidrmatch("10.10.10.0/24", dest_ip), "WebTier",
cidrmatch("10.10.20.0/24", dest_ip), "ApiTier",
cidrmatch("10.10.30.0/24", dest_ip), "DatabaseTier",
cidrmatch("10.10.40.0/24", dest_ip), "ManagementTier",
1=1, "Unknown"
)
| where src_segment!=dest_segment
| stats count as BlockedFlows,
dc(src_ip) as UniqueSrcIPs,
values(dest_port) as DestPorts
by src_segment, dest_segment
| sort -BlockedFlows
// ============================================================
// SPL: Database Tier Outbound Connection Attempts (Exfiltration)
// ============================================================
index=firewall sourcetype="pan:traffic"
| where cidrmatch("10.10.30.0/24", src_ip)
| where NOT cidrmatch("10.10.0.0/16", dest_ip)
| stats count as OutboundAttempts,
values(dest_ip) as ExternalDests,
values(dest_port) as DestPorts,
values(app) as Applications
by src_ip
| where OutboundAttempts > 0
| sort -OutboundAttempts
Phase 4: Application-Level Zero Trust¶
4.1 Understanding Application-Layer Zero Trust¶
Network-level controls are necessary but not sufficient. Application-layer zero trust extends the trust boundary to the application itself — every request must carry verifiable identity, every service connection must be mutually authenticated, and every API call must be authorized based on fine-grained claims.
See also Chapter 52: API Security Framework and Lab 28: API Security Testing.
4.2 Exercise: Identity-Aware Proxy (IAP) Configuration¶
An identity-aware proxy sits in front of applications and enforces authentication and authorization before any request reaches the application backend. This eliminates the need for VPN access and ensures that the application never receives unauthenticated traffic.
Step 1: Configure Identity-Aware Proxy¶
# ============================================================
# iap-config.yaml
# Identity-Aware Proxy configuration for Apex Financial Portal
# ============================================================
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: apex-portal-iap
namespace: apex-financial
annotations:
# Enable IAP
kubernetes.io/ingress.class: "nginx"
nginx.ingress.kubernetes.io/auth-url: "https://oauth2-proxy.apex.example.com/oauth2/auth"
nginx.ingress.kubernetes.io/auth-signin: "https://oauth2-proxy.apex.example.com/oauth2/start?rd=$scheme://$host$request_uri"
nginx.ingress.kubernetes.io/auth-response-headers: "X-Auth-Request-User,X-Auth-Request-Email,X-Auth-Request-Groups,X-Auth-Request-Access-Token"
# Security headers
nginx.ingress.kubernetes.io/configuration-snippet: |
more_set_headers "X-Frame-Options: DENY";
more_set_headers "X-Content-Type-Options: nosniff";
more_set_headers "X-XSS-Protection: 1; mode=block";
more_set_headers "Referrer-Policy: strict-origin-when-cross-origin";
more_set_headers "Content-Security-Policy: default-src 'self'";
spec:
tls:
- hosts:
- portal.apex.example.com # SYNTHETIC
secretName: apex-portal-tls
rules:
- host: portal.apex.example.com # SYNTHETIC
http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: apex-portal
port:
number: 8443
# ============================================================
# oauth2-proxy-deployment.yaml
# OAuth2 Proxy — authenticates users before forwarding to backend
# ============================================================
apiVersion: apps/v1
kind: Deployment
metadata:
name: oauth2-proxy
namespace: apex-financial
spec:
replicas: 2
selector:
matchLabels:
app: oauth2-proxy
template:
metadata:
labels:
app: oauth2-proxy
spec:
containers:
- name: oauth2-proxy
image: quay.io/oauth2-proxy/oauth2-proxy:v7.6.0
args:
- --provider=oidc
- --oidc-issuer-url=https://login.apex.example.com/v2.0 # SYNTHETIC
- --client-id=00000000-0000-0000-0000-000000000002 # SYNTHETIC
- --client-secret=REDACTED
- --cookie-secret=REDACTED
- --cookie-secure=true
- --cookie-httponly=true
- --cookie-samesite=strict
- --email-domain=apex.example.com # SYNTHETIC
- --upstream=http://apex-portal.apex-financial.svc.cluster.local:8443
- --http-address=0.0.0.0:4180
- --reverse-proxy=true
- --set-xauthrequest=true
- --pass-access-token=true
- --skip-provider-button=true
- --session-store-type=redis
- --redis-connection-url=redis://apex-cache.apex-financial.svc.cluster.local:6379
# Zero trust: short session lifetime, force re-auth
- --cookie-expire=1h
- --cookie-refresh=15m
ports:
- containerPort: 4180
resources:
requests:
cpu: 100m
memory: 128Mi
limits:
cpu: 200m
memory: 256Mi
livenessProbe:
httpGet:
path: /ping
port: 4180
initialDelaySeconds: 5
periodSeconds: 10
4.3 Exercise: Service Mesh mTLS¶
Mutual TLS (mTLS) ensures that both the client and server in a service-to-service communication present valid certificates. This prevents impersonation, man-in-the-middle attacks, and unauthorized service access within the mesh.
Step 2: Configure Istio mTLS Strict Mode¶
# ============================================================
# peer-authentication-strict.yaml
# Enforce STRICT mTLS for all services in the namespace
# ============================================================
apiVersion: security.istio.io/v1beta1
kind: PeerAuthentication
metadata:
name: default-strict-mtls
namespace: apex-financial
spec:
mtls:
mode: STRICT # All traffic MUST be mTLS — no plaintext allowed
# ============================================================
# destination-rule-mtls.yaml
# Configure mTLS for outbound connections to all services
# ============================================================
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
name: apex-mtls-default
namespace: apex-financial
spec:
host: "*.apex-financial.svc.cluster.local"
trafficPolicy:
tls:
mode: ISTIO_MUTUAL # Use Istio-managed certificates
connectionPool:
tcp:
maxConnections: 100
connectTimeout: 5s
http:
h2UpgradePolicy: DEFAULT
maxRequestsPerConnection: 10
# ============================================================
# authorization-policy-api.yaml
# Fine-grained authorization: only frontend can call API service
# ============================================================
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
name: api-authz-policy
namespace: apex-financial
spec:
selector:
matchLabels:
app: apex-api
action: ALLOW
rules:
- from:
- source:
principals:
- "cluster.local/ns/apex-financial/sa/apex-frontend"
namespaces:
- "apex-financial"
to:
- operation:
methods: ["GET", "POST", "PUT"]
paths:
- "/api/v2/accounts/*"
- "/api/v2/transactions/*"
- "/api/v2/portfolio/*"
ports: ["8443"]
when:
- key: request.auth.claims[iss]
values:
- "https://login.apex.example.com/v2.0" # SYNTHETIC
Step 3: Validate mTLS Enforcement¶
# ============================================================
# Test 1: Verify mTLS is enforced between services
# ============================================================
# Check mTLS status for all services in the namespace
istioctl x describe pod $(kubectl get pod -n apex-financial \
-l app=apex-api -o jsonpath='{.items[0].metadata.name}') \
-n apex-financial
# Expected output should show:
# Pilot reports that pod enforces mTLS and target is strict
# Check peer authentication policies
istioctl authn tls-check $(kubectl get pod -n apex-financial \
-l app=apex-api -o jsonpath='{.items[0].metadata.name}') \
-n apex-financial
# ============================================================
# Test 2: Attempt plaintext connection (should FAIL)
# ============================================================
# Deploy a pod WITHOUT Istio sidecar
kubectl run no-sidecar \
--image=nicolaka/netshoot \
-n apex-financial \
--overrides='{"metadata":{"annotations":{"sidecar.istio.io/inject":"false"}}}' \
--rm -it -- bash
# Try to connect to the API service via plaintext HTTP
curl -v http://apex-api.apex-financial.svc.cluster.local:8443/api/v2/accounts
# Expected: Connection REFUSED — mTLS strict rejects plaintext
# Try to connect with a self-signed certificate
curl -v --cert /tmp/fake-cert.pem --key /tmp/fake-key.pem \
https://apex-api.apex-financial.svc.cluster.local:8443/api/v2/accounts
# Expected: TLS handshake FAILED — certificate not issued by Istio CA
4.4 Exercise: JWT Token Validation¶
Step 4: Implement JWT-Based Authorization¶
#!/usr/bin/env python3
"""
JWT validation middleware for zero trust application-layer security.
Validates token claims, audience, issuer, and custom authorization claims.
ALL VALUES ARE SYNTHETIC — for educational purposes only.
"""
import jwt
import json
import time
from functools import wraps
from flask import Flask, request, jsonify, g
app = Flask(__name__)
# ============================================================
# SYNTHETIC configuration — all values are fictional
# ============================================================
JWT_CONFIG = {
"issuer": "https://login.apex.example.com/v2.0", # SYNTHETIC
"audience": "api://apex-financial-portal", # SYNTHETIC
"jwks_uri": "https://login.apex.example.com/.well-known/jwks.json", # SYNTHETIC
"algorithms": ["RS256"], # ONLY allow RS256 — never HS256 or none
"max_token_age_seconds": 3600, # 1 hour max token lifetime
"required_claims": ["sub", "aud", "iss", "exp", "iat", "roles"],
"clock_skew_seconds": 30,
}
# SYNTHETIC JWKS public key for educational purposes
SYNTHETIC_PUBLIC_KEY = """-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA0SYNTHETIC_KEY_FOR_
EDUCATIONAL_PURPOSES_ONLY_NOT_A_REAL_KEY_DO_NOT_USE_IN_PRODUCTION
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
-----END PUBLIC KEY-----"""
def validate_jwt_token(token: str) -> dict:
"""
Validate JWT token with zero trust principles:
1. Algorithm restriction (RS256 only)
2. Issuer validation
3. Audience validation
4. Expiration check with clock skew
5. Required claims verification
6. Token age validation
"""
try:
# Decode and validate token
# CRITICAL: Always specify algorithms to prevent algorithm confusion
payload = jwt.decode(
token,
SYNTHETIC_PUBLIC_KEY,
algorithms=JWT_CONFIG["algorithms"], # RS256 only
audience=JWT_CONFIG["audience"],
issuer=JWT_CONFIG["issuer"],
leeway=JWT_CONFIG["clock_skew_seconds"],
options={
"require": JWT_CONFIG["required_claims"],
"verify_exp": True,
"verify_iat": True,
"verify_aud": True,
"verify_iss": True,
}
)
# Additional validation: token age
issued_at = payload.get("iat", 0)
token_age = time.time() - issued_at
if token_age > JWT_CONFIG["max_token_age_seconds"]:
raise jwt.InvalidTokenError(
f"Token too old: {token_age}s > {JWT_CONFIG['max_token_age_seconds']}s"
)
# Validate custom claims
if "roles" not in payload or not isinstance(payload["roles"], list):
raise jwt.InvalidTokenError("Missing or invalid 'roles' claim")
return payload
except jwt.ExpiredSignatureError:
raise ValueError("Token has expired — re-authentication required")
except jwt.InvalidAudienceError:
raise ValueError("Invalid audience — token not intended for this service")
except jwt.InvalidIssuerError:
raise ValueError("Invalid issuer — token not from trusted IdP")
except jwt.InvalidAlgorithmError:
raise ValueError("Invalid algorithm — only RS256 is accepted")
except jwt.DecodeError:
raise ValueError("Token decode failed — malformed or tampered token")
except jwt.InvalidTokenError as e:
raise ValueError(f"Token validation failed: {str(e)}")
def require_role(*required_roles):
"""Decorator to enforce role-based access control on endpoints."""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
# Extract token from Authorization header
auth_header = request.headers.get("Authorization", "")
if not auth_header.startswith("Bearer "):
return jsonify({
"error": "unauthorized",
"message": "Missing or invalid Authorization header"
}), 401
token = auth_header[7:] # Strip "Bearer "
try:
claims = validate_jwt_token(token)
except ValueError as e:
return jsonify({
"error": "unauthorized",
"message": str(e)
}), 401
# Check role authorization
user_roles = set(claims.get("roles", []))
if not user_roles.intersection(set(required_roles)):
return jsonify({
"error": "forbidden",
"message": f"Required roles: {required_roles}, user has: {list(user_roles)}"
}), 403
# Store claims in request context for downstream use
g.user_claims = claims
g.user_id = claims.get("sub")
g.user_roles = list(user_roles)
return f(*args, **kwargs)
return decorated_function
return decorator
# ============================================================
# Example protected endpoints
# ============================================================
@app.route("/api/v2/accounts/<account_id>", methods=["GET"])
@require_role("account.read", "admin")
def get_account(account_id):
"""Protected endpoint — requires account.read or admin role."""
# Additional object-level authorization
if "admin" not in g.user_roles:
# Non-admin users can only access their own account
if account_id != g.user_id:
return jsonify({
"error": "forbidden",
"message": "Cannot access another user's account"
}), 403
return jsonify({
"account_id": account_id, # SYNTHETIC
"name": "Test User", # SYNTHETIC
"balance": 50000.00, # SYNTHETIC
"currency": "USD",
"status": "active"
})
@app.route("/api/v2/transactions", methods=["POST"])
@require_role("transaction.write", "admin")
def create_transaction():
"""Protected endpoint — requires transaction.write or admin role."""
# Validate request body
data = request.get_json()
if not data:
return jsonify({"error": "bad_request", "message": "Missing request body"}), 400
# Log transaction for audit
print(f"[AUDIT] Transaction initiated by {g.user_id}: "
f"amount={data.get('amount')}, "
f"destination={data.get('destination_account')}")
return jsonify({
"transaction_id": "TXN-2026-SYNTHETIC-001", # SYNTHETIC
"status": "pending",
"initiated_by": g.user_id,
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
}), 201
@app.route("/api/v2/admin/users", methods=["GET"])
@require_role("admin")
def list_users():
"""Admin-only endpoint — requires admin role."""
return jsonify({
"users": [
{"id": "user-001", "email": "testuser@apex.example.com"}, # SYNTHETIC
{"id": "user-002", "email": "analyst@apex.example.com"}, # SYNTHETIC
],
"total": 2
})
if __name__ == "__main__":
app.run(host="0.0.0.0", port=8443, ssl_context="adhoc")
JWT Security Best Practices for Zero Trust
- Always restrict algorithms — specify
algorithms=["RS256"]to prevent algorithm confusion attacks (none, HS256 key confusion) - Validate all claims — issuer, audience, expiration, and custom claims
- Short token lifetimes — 1 hour maximum for access tokens, 15 minutes for sensitive operations
- Token binding — bind tokens to specific devices/sessions using
cnf(confirmation) claims - Rotation — rotate signing keys regularly and support key rollover via JWKS
4.5 Detection: Application-Layer Monitoring¶
KQL Detection Queries¶
// ============================================================
// KQL: JWT Token Validation Failures — Potential Attack
// ============================================================
AppServiceHTTPLogs
| where TimeGenerated > ago(24h)
| where ScStatus == 401 or ScStatus == 403
| extend RequestPath = tostring(split(CsUriStem, "?")[0])
| summarize
FailureCount = count(),
UniqueIPs = dcount(CIp),
Paths = make_set(RequestPath),
StatusCodes = make_set(ScStatus)
by CIp, bin(TimeGenerated, 15m)
| where FailureCount > 20
| order by FailureCount desc
// ============================================================
// KQL: mTLS Certificate Validation Failures
// ============================================================
AzureDiagnostics
| where Category == "IstioAccessLogs" or Category == "EnvoyAccessLogs"
| where TimeGenerated > ago(24h)
| where response_code_d == 503 or response_code_d == 403
| where response_flags_s contains "UC" // Upstream connection failure
or response_flags_s contains "UF"
| extend SourceApp = tostring(split(upstream_cluster_s, "|")[3])
| extend DestApp = tostring(split(upstream_cluster_s, "|")[4])
| summarize
TLSFailures = count(),
ResponseFlags = make_set(response_flags_s)
by SourceApp, DestApp, bin(TimeGenerated, 15m)
| where TLSFailures > 5
| order by TLSFailures desc
SPL Detection Queries¶
// ============================================================
// SPL: JWT Token Validation Failures
// ============================================================
index=web sourcetype="access_combined"
| where status=401 OR status=403
| rex field=uri_path "^(?<api_path>/api/v[0-9]+/[^/]+)"
| stats count as FailureCount,
dc(clientip) as UniqueIPs,
values(api_path) as TargetPaths,
values(status) as StatusCodes
by clientip
| where FailureCount > 20
| sort -FailureCount
// ============================================================
// SPL: Service Mesh mTLS Failures
// ============================================================
index=istio sourcetype="istio:accesslog"
| where response_code=503 OR response_code=403
| where response_flags="UC" OR response_flags="UF"
OR response_flags="URX"
| stats count as TLSFailures,
values(upstream_cluster) as UpstreamServices,
values(response_flags) as Flags
by source_workload, destination_workload
| where TLSFailures > 5
| sort -TLSFailures
Phase 5: Continuous Verification & Anomaly Detection¶
5.1 Understanding Continuous Verification¶
Zero trust is not a one-time gate check at authentication. It requires continuous evaluation of trust throughout the session. If a user's risk level changes, if their device becomes non-compliant, or if their behavior becomes anomalous, trust must be re-evaluated and access may be revoked or stepped up in real time.
┌──────────────────────────────────────────────────────────────────┐
│ CONTINUOUS VERIFICATION LIFECYCLE │
│ │
│ ┌──────┐ ┌──────────┐ ┌──────────┐ ┌──────────────┐ │
│ │ Auth │───▶│ Session │───▶│ Ongoing │───▶│ Re-evaluate │ │
│ │ Gate │ │ Granted │ │ Monitoring│ │ Trust Level │ │
│ └──────┘ └──────────┘ └────┬─────┘ └──────┬───────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────────────┐ ┌──────────────┐ │
│ │ Trust Signals │ │ Actions │ │
│ │ ─────────────── │ │ ────────── │ │
│ │ • IP changed │ │ • Continue │ │
│ │ • Device drift │ │ • Step-up │ │
│ │ • Impossible │ │ MFA │ │
│ │ travel │ │ • Restrict │ │
│ │ • Anomalous │ │ access │ │
│ │ data access │ │ • Terminate │ │
│ │ • Privilege │ │ session │ │
│ │ escalation │ │ • Alert SOC │ │
│ └──────────────────┘ └──────────────┘ │
└──────────────────────────────────────────────────────────────────┘
5.2 Exercise: Session Re-Evaluation Engine¶
Step 1: Build a Continuous Trust Scoring Engine¶
#!/usr/bin/env python3
"""
Continuous Trust Scoring Engine for Zero Trust Architecture.
Evaluates trust signals in real-time and triggers re-authentication
when trust degrades below threshold.
ALL VALUES ARE SYNTHETIC — for educational purposes only.
"""
import json
import time
import hashlib
from dataclasses import dataclass, field
from typing import Optional
from enum import Enum
class TrustAction(Enum):
CONTINUE = "continue" # Trust level acceptable
STEP_UP_MFA = "step_up_mfa" # Require additional authentication
RESTRICT = "restrict" # Limit access to read-only
TERMINATE = "terminate" # Kill the session immediately
ALERT_SOC = "alert_soc" # Notify SOC for investigation
@dataclass
class TrustSignal:
"""Individual trust signal contributing to overall trust score."""
name: str
score: float # 0.0 (no trust) to 1.0 (full trust)
weight: float # Relative importance
timestamp: float
details: str = ""
@dataclass
class SessionContext:
"""User session context for continuous evaluation."""
session_id: str
user_id: str # SYNTHETIC
user_email: str # SYNTHETIC
auth_time: float
ip_address: str # SYNTHETIC — RFC 5737/1918
device_id: str # SYNTHETIC
device_compliant: bool
user_agent: str
location: str
risk_level: str = "low"
trust_score: float = 100.0
signals: list = field(default_factory=list)
actions_taken: list = field(default_factory=list)
class ContinuousTrustEngine:
"""
Zero Trust Continuous Trust Evaluation Engine.
Evaluates trust signals and determines appropriate actions.
"""
# Trust thresholds (configurable per organization)
THRESHOLDS = {
"full_access": 80.0, # >= 80: full access, no friction
"step_up_mfa": 60.0, # 60-79: require step-up MFA
"restrict_access": 40.0, # 40-59: restrict to read-only
"terminate_session": 0.0, # < 40: terminate session immediately
}
# Signal weights
SIGNAL_WEIGHTS = {
"ip_consistency": 0.15,
"device_compliance": 0.20,
"location_consistency": 0.15,
"behavior_normal": 0.15,
"session_age": 0.10,
"privilege_usage": 0.10,
"data_access_pattern": 0.10,
"impossible_travel": 0.05,
}
def __init__(self):
self.evaluation_count = 0
def evaluate_session(self, session: SessionContext) -> tuple:
"""
Evaluate all trust signals for a session.
Returns (trust_score, action, signals).
"""
self.evaluation_count += 1
signals = []
# Signal 1: IP Address Consistency
ip_signal = self._evaluate_ip_consistency(session)
signals.append(ip_signal)
# Signal 2: Device Compliance
device_signal = self._evaluate_device_compliance(session)
signals.append(device_signal)
# Signal 3: Location Consistency
location_signal = self._evaluate_location(session)
signals.append(location_signal)
# Signal 4: Behavioral Analysis
behavior_signal = self._evaluate_behavior(session)
signals.append(behavior_signal)
# Signal 5: Session Age
age_signal = self._evaluate_session_age(session)
signals.append(age_signal)
# Signal 6: Privilege Usage Pattern
privilege_signal = self._evaluate_privilege_usage(session)
signals.append(privilege_signal)
# Signal 7: Data Access Pattern
data_signal = self._evaluate_data_access(session)
signals.append(data_signal)
# Signal 8: Impossible Travel
travel_signal = self._evaluate_impossible_travel(session)
signals.append(travel_signal)
# Calculate weighted trust score
total_score = sum(s.score * s.weight for s in signals)
total_weight = sum(s.weight for s in signals)
trust_score = (total_score / total_weight) * 100 if total_weight > 0 else 0
# Determine action based on trust score
action = self._determine_action(trust_score)
session.trust_score = trust_score
session.signals = signals
return trust_score, action, signals
def _evaluate_ip_consistency(self, session: SessionContext) -> TrustSignal:
"""Check if IP address has changed during session."""
# SYNTHETIC: Simulate IP check
original_ip = "198.51.100.42" # SYNTHETIC — original auth IP
current_ip = session.ip_address
if current_ip == original_ip:
return TrustSignal(
name="ip_consistency", score=1.0,
weight=self.SIGNAL_WEIGHTS["ip_consistency"],
timestamp=time.time(),
details=f"IP consistent: {current_ip}"
)
else:
return TrustSignal(
name="ip_consistency", score=0.2,
weight=self.SIGNAL_WEIGHTS["ip_consistency"],
timestamp=time.time(),
details=f"IP CHANGED: {original_ip} → {current_ip}"
)
def _evaluate_device_compliance(self, session: SessionContext) -> TrustSignal:
"""Check device compliance state."""
if session.device_compliant:
return TrustSignal(
name="device_compliance", score=1.0,
weight=self.SIGNAL_WEIGHTS["device_compliance"],
timestamp=time.time(),
details="Device compliant"
)
else:
return TrustSignal(
name="device_compliance", score=0.0,
weight=self.SIGNAL_WEIGHTS["device_compliance"],
timestamp=time.time(),
details="DEVICE NON-COMPLIANT — trust degraded"
)
def _evaluate_location(self, session: SessionContext) -> TrustSignal:
"""Check location consistency."""
trusted_locations = ["US-NY-NewYork", "US-NJ-Newark"] # SYNTHETIC
if session.location in trusted_locations:
return TrustSignal(
name="location_consistency", score=1.0,
weight=self.SIGNAL_WEIGHTS["location_consistency"],
timestamp=time.time(),
details=f"Trusted location: {session.location}"
)
else:
return TrustSignal(
name="location_consistency", score=0.3,
weight=self.SIGNAL_WEIGHTS["location_consistency"],
timestamp=time.time(),
details=f"UNTRUSTED location: {session.location}"
)
def _evaluate_behavior(self, session: SessionContext) -> TrustSignal:
"""Evaluate behavioral analytics."""
# SYNTHETIC behavioral analysis
return TrustSignal(
name="behavior_normal", score=0.85,
weight=self.SIGNAL_WEIGHTS["behavior_normal"],
timestamp=time.time(),
details="Behavior within normal parameters"
)
def _evaluate_session_age(self, session: SessionContext) -> TrustSignal:
"""Evaluate session age — older sessions get lower trust."""
session_age_hours = (time.time() - session.auth_time) / 3600
if session_age_hours < 1:
score = 1.0
elif session_age_hours < 4:
score = 0.8
elif session_age_hours < 8:
score = 0.5
else:
score = 0.1
return TrustSignal(
name="session_age", score=score,
weight=self.SIGNAL_WEIGHTS["session_age"],
timestamp=time.time(),
details=f"Session age: {session_age_hours:.1f} hours"
)
def _evaluate_privilege_usage(self, session: SessionContext) -> TrustSignal:
"""Check for unusual privilege escalation patterns."""
return TrustSignal(
name="privilege_usage", score=0.9,
weight=self.SIGNAL_WEIGHTS["privilege_usage"],
timestamp=time.time(),
details="Privilege usage within normal bounds"
)
def _evaluate_data_access(self, session: SessionContext) -> TrustSignal:
"""Check for anomalous data access patterns."""
return TrustSignal(
name="data_access_pattern", score=0.95,
weight=self.SIGNAL_WEIGHTS["data_access_pattern"],
timestamp=time.time(),
details="Data access pattern normal"
)
def _evaluate_impossible_travel(self, session: SessionContext) -> TrustSignal:
"""Detect impossible travel scenarios."""
# SYNTHETIC: No impossible travel detected
return TrustSignal(
name="impossible_travel", score=1.0,
weight=self.SIGNAL_WEIGHTS["impossible_travel"],
timestamp=time.time(),
details="No impossible travel detected"
)
def _determine_action(self, trust_score: float) -> TrustAction:
"""Determine the appropriate action based on trust score."""
if trust_score >= self.THRESHOLDS["full_access"]:
return TrustAction.CONTINUE
elif trust_score >= self.THRESHOLDS["step_up_mfa"]:
return TrustAction.STEP_UP_MFA
elif trust_score >= self.THRESHOLDS["restrict_access"]:
return TrustAction.RESTRICT
else:
return TrustAction.TERMINATE
# ============================================================
# Example: Continuous trust evaluation scenarios
# ============================================================
if __name__ == "__main__":
engine = ContinuousTrustEngine()
# Scenario 1: Normal session — all signals healthy
print("=" * 60)
print("SCENARIO 1: Normal Session")
print("=" * 60)
normal_session = SessionContext(
session_id="sess-001-synthetic",
user_id="user-001", # SYNTHETIC
user_email="testuser@apex.example.com", # SYNTHETIC
auth_time=time.time() - 1800, # 30 min ago
ip_address="198.51.100.42", # SYNTHETIC
device_id="device-apex-ws-1042", # SYNTHETIC
device_compliant=True,
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
location="US-NY-NewYork" # SYNTHETIC
)
score, action, signals = engine.evaluate_session(normal_session)
print(f"Trust Score: {score:.1f}/100")
print(f"Action: {action.value}")
for s in signals:
status = "OK" if s.score >= 0.7 else "WARN" if s.score >= 0.4 else "FAIL"
print(f" [{status}] {s.name}: {s.score:.2f} — {s.details}")
# Scenario 2: Compromised session — IP changed, device non-compliant
print(f"\n{'=' * 60}")
print("SCENARIO 2: Compromised Session")
print("=" * 60)
compromised_session = SessionContext(
session_id="sess-002-synthetic",
user_id="user-002", # SYNTHETIC
user_email="analyst@apex.example.com", # SYNTHETIC
auth_time=time.time() - 36000, # 10 hours ago
ip_address="203.0.113.99", # SYNTHETIC — different IP
device_id="device-unknown-9999", # SYNTHETIC
device_compliant=False,
user_agent="python-requests/2.31.0", # Suspicious UA
location="RU-MOW-Moscow" # SYNTHETIC — untrusted
)
score, action, signals = engine.evaluate_session(compromised_session)
print(f"Trust Score: {score:.1f}/100")
print(f"Action: {action.value}")
for s in signals:
status = "OK" if s.score >= 0.7 else "WARN" if s.score >= 0.4 else "FAIL"
print(f" [{status}] {s.name}: {s.score:.2f} — {s.details}")
5.3 Exercise: Step-Up Authentication Triggers¶
Step 2: Configure Step-Up Authentication Rules¶
{
"step_up_auth_rules": [
{
"rule_id": "STEP-001",
"name": "High-Value Transaction Step-Up",
"description": "Require re-authentication for transactions exceeding $10,000",
"trigger": {
"type": "transaction_amount",
"operator": "greater_than",
"value": 10000,
"currency": "USD"
},
"action": {
"type": "step_up_mfa",
"method": "push_notification",
"timeout_seconds": 120,
"fallback_method": "phone_call"
},
"scope": {
"applications": ["apex-financial-portal"],
"endpoints": ["/api/v2/transactions"],
"methods": ["POST", "PUT"]
}
},
{
"rule_id": "STEP-002",
"name": "Sensitive Data Export Step-Up",
"description": "Require re-authentication for bulk data export operations",
"trigger": {
"type": "data_export",
"operator": "record_count_exceeds",
"value": 100
},
"action": {
"type": "step_up_mfa",
"method": "authenticator_totp",
"timeout_seconds": 60,
"max_attempts": 3
},
"scope": {
"applications": ["apex-financial-portal"],
"endpoints": ["/api/v2/reports/export", "/api/v2/accounts/export"],
"methods": ["GET", "POST"]
}
},
{
"rule_id": "STEP-003",
"name": "Admin Action Step-Up",
"description": "Require re-authentication for all administrative actions",
"trigger": {
"type": "role_action",
"roles": ["admin", "security_admin"],
"action_categories": ["user_management", "policy_change", "audit_log_access"]
},
"action": {
"type": "step_up_mfa",
"method": "fido2_webauthn",
"timeout_seconds": 30,
"require_user_presence": true
},
"scope": {
"applications": ["apex-admin-portal"],
"endpoints": ["/api/v2/admin/*"],
"methods": ["POST", "PUT", "DELETE", "PATCH"]
}
},
{
"rule_id": "STEP-004",
"name": "Anomalous Behavior Step-Up",
"description": "Require re-authentication when behavioral analytics detect anomalies",
"trigger": {
"type": "trust_score_degradation",
"operator": "drops_below",
"value": 70,
"evaluation_window_minutes": 15
},
"action": {
"type": "step_up_mfa",
"method": "push_notification",
"timeout_seconds": 120,
"on_failure": "terminate_session"
},
"scope": {
"applications": ["*"],
"endpoints": ["*"],
"methods": ["*"]
}
},
{
"rule_id": "STEP-005",
"name": "IP Change Step-Up",
"description": "Require re-authentication when source IP changes mid-session",
"trigger": {
"type": "ip_change",
"exclude_known_vpn_ranges": true,
"known_vpn_ranges": [
"198.51.100.0/24",
"203.0.113.0/24"
]
},
"action": {
"type": "step_up_mfa",
"method": "authenticator_totp",
"timeout_seconds": 60,
"on_failure": "terminate_session"
},
"scope": {
"applications": ["*"],
"endpoints": ["*"],
"methods": ["*"]
}
}
],
"global_settings": {
"max_step_up_per_session": 5,
"cooldown_after_step_up_seconds": 300,
"log_all_step_up_events": true,
"siem_integration": {
"endpoint": "https://siem.apex.example.com/api/events",
"format": "CEF",
"severity_mapping": {
"step_up_success": "low",
"step_up_failure": "high",
"session_terminated": "critical"
}
}
}
}
5.4 Detection: Continuous Verification Monitoring¶
KQL Detection Queries¶
// ============================================================
// KQL: Impossible Travel Detection
// ============================================================
let MaxTravelSpeedKmh = 900; // Max plausible speed (flight)
SigninLogs
| where TimeGenerated > ago(24h)
| where ResultType == 0 // Successful sign-ins
| extend Latitude = toreal(LocationDetails.geoCoordinates.latitude)
| extend Longitude = toreal(LocationDetails.geoCoordinates.longitude)
| extend City = tostring(LocationDetails.city)
| extend Country = tostring(LocationDetails.countryOrRegion)
| order by UserPrincipalName, TimeGenerated asc
| serialize
| extend PrevTime = prev(TimeGenerated, 1)
| extend PrevLat = prev(Latitude, 1)
| extend PrevLon = prev(Longitude, 1)
| extend PrevCity = prev(City, 1)
| extend PrevUser = prev(UserPrincipalName, 1)
| where UserPrincipalName == PrevUser
| extend TimeDiffHours = datetime_diff('second', TimeGenerated, PrevTime) / 3600.0
| where TimeDiffHours > 0
| extend DistanceKm = geo_distance_2points(Longitude, Latitude, PrevLon, PrevLat) / 1000
| extend SpeedKmh = DistanceKm / TimeDiffHours
| where SpeedKmh > MaxTravelSpeedKmh
and DistanceKm > 500 // Minimum 500km to reduce false positives
| project TimeGenerated, UserPrincipalName,
FromCity = PrevCity, ToCity = City,
DistanceKm = round(DistanceKm, 0),
TimeDiffHours = round(TimeDiffHours, 2),
SpeedKmh = round(SpeedKmh, 0),
IPAddress
| order by SpeedKmh desc
// ============================================================
// KQL: Session Anomaly — Multiple IPs in Short Timeframe
// ============================================================
SigninLogs
| where TimeGenerated > ago(1h)
| where ResultType == 0
| summarize
UniqueIPs = dcount(IPAddress),
IPs = make_set(IPAddress),
UniqueLocations = dcount(tostring(LocationDetails.city)),
Locations = make_set(tostring(LocationDetails.city)),
SignInCount = count()
by UserPrincipalName, bin(TimeGenerated, 15m)
| where UniqueIPs > 3
| order by UniqueIPs desc
// ============================================================
// KQL: Trust Score Degradation Events
// ============================================================
let TrustScoreLogs = datatable(
TimeGenerated: datetime,
SessionId: string,
UserPrincipalName: string,
TrustScore: real,
Action: string,
DegradedSignals: string
) [
datetime("2026-06-03T14:30:00Z"), "sess-001", "testuser@apex.example.com", 45.2, "step_up_mfa", "ip_changed,device_noncompliant",
datetime("2026-06-03T14:35:00Z"), "sess-001", "testuser@apex.example.com", 32.1, "terminate", "ip_changed,device_noncompliant,impossible_travel",
datetime("2026-06-03T15:10:00Z"), "sess-002", "analyst@apex.example.com", 68.5, "step_up_mfa", "session_age_exceeded",
datetime("2026-06-03T16:00:00Z"), "sess-003", "admin@apex.example.com", 28.0, "terminate", "ip_changed,untrusted_location,anomalous_behavior"
];
TrustScoreLogs
| where TrustScore < 50
| extend SignalArray = split(DegradedSignals, ",")
| mv-expand Signal = SignalArray
| summarize
AvgTrustScore = round(avg(TrustScore), 1),
MinTrustScore = round(min(TrustScore), 1),
TerminationCount = countif(Action == "terminate"),
StepUpCount = countif(Action == "step_up_mfa")
by UserPrincipalName, tostring(Signal)
| order by MinTrustScore asc
// ============================================================
// KQL: Step-Up MFA Failures (Potential Account Compromise)
// ============================================================
SigninLogs
| where TimeGenerated > ago(24h)
| where AuthenticationRequirement == "multiFactorAuthentication"
| where ResultType != 0 // Failed MFA
| extend MFAMethod = tostring(MfaDetail.authMethod)
| extend MFAResult = tostring(MfaDetail.authDetail)
| summarize
MFAFailures = count(),
Methods = make_set(MFAMethod),
Results = make_set(MFAResult),
IPs = make_set(IPAddress),
FirstAttempt = min(TimeGenerated),
LastAttempt = max(TimeGenerated)
by UserPrincipalName
| where MFAFailures > 3
| extend TimespanMinutes = datetime_diff('minute', LastAttempt, FirstAttempt)
| where TimespanMinutes < 30 // Multiple failures in short window
| order by MFAFailures desc
SPL Detection Queries¶
// ============================================================
// SPL: Impossible Travel Detection
// ============================================================
index=azure sourcetype="azure:signinlogs"
| spath "locationDetails.geoCoordinates.latitude" output=lat
| spath "locationDetails.geoCoordinates.longitude" output=lon
| spath "locationDetails.city" output=city
| search resultType=0
| sort 0 userPrincipalName, _time
| streamstats current=f last(lat) as prev_lat,
last(lon) as prev_lon,
last(city) as prev_city,
last(_time) as prev_time
by userPrincipalName
| eval time_diff_hours=(_time - prev_time) / 3600
| where time_diff_hours > 0
| eval distance_km=round(
6371 * acos(
cos(lat * 3.14159/180) * cos(prev_lat * 3.14159/180) *
cos((prev_lon - lon) * 3.14159/180) +
sin(lat * 3.14159/180) * sin(prev_lat * 3.14159/180)
), 0)
| eval speed_kmh=round(distance_km / time_diff_hours, 0)
| where speed_kmh > 900 AND distance_km > 500
| table _time, userPrincipalName, prev_city, city,
distance_km, time_diff_hours, speed_kmh, ipAddress
| sort -speed_kmh
// ============================================================
// SPL: Step-Up MFA Failures
// ============================================================
index=azure sourcetype="azure:signinlogs"
| spath "authenticationRequirement"
| spath "mfaDetail.authMethod"
| spath "mfaDetail.authDetail"
| search authenticationRequirement="multiFactorAuthentication"
resultType!=0
| stats count as MFAFailures,
values(mfaDetail.authMethod) as Methods,
values(mfaDetail.authDetail) as Results,
values(ipAddress) as IPs,
min(_time) as FirstAttempt,
max(_time) as LastAttempt
by userPrincipalName
| where MFAFailures > 3
| eval timespan_min=round((LastAttempt - FirstAttempt) / 60, 1)
| where timespan_min < 30
| sort -MFAFailures
Phase 6: Zero Trust Bypass Detection¶
6.1 Understanding Zero Trust Bypass Techniques¶
Attackers specifically target zero trust control points to bypass access restrictions. Common bypass techniques include:
| Bypass Technique | Target Control | MITRE ATT&CK |
|---|---|---|
| Token replay / session hijacking | Identity layer | T1550.001 — Application Access Token |
| Conditional Access policy evasion | Policy engine | T1556 — Modify Authentication Process |
| Device compliance spoofing | Device trust | T1036 — Masquerading |
| MFA fatigue / push bombing | Authentication | T1621 — Multi-Factor Authentication Request Generation |
| OAuth consent phishing | Application authorization | T1528 — Steal Application Access Token |
| Service principal abuse | Application identity | T1098.001 — Additional Cloud Credentials |
| VPN/proxy location spoofing | Location controls | T1090 — Proxy |
| Token claims manipulation | Application-layer auth | T1134 — Access Token Manipulation |
6.2 Exercise: Detect Conditional Access Bypass¶
Step 1: Policy Evasion Detection¶
# ============================================================
# Detect users who successfully authenticated but had NO
# Conditional Access policies evaluated (potential bypass)
# ============================================================
$query = @"
SigninLogs
| where TimeGenerated > ago(24h)
| where ResultType == 0
| where ConditionalAccessStatus == "notApplied"
| extend PolicyCount = array_length(parse_json(ConditionalAccessPolicies))
| where PolicyCount == 0 or isnull(PolicyCount)
| project TimeGenerated, UserPrincipalName, AppDisplayName,
IPAddress, ClientAppUsed, DeviceDetail,
UserAgent, ConditionalAccessStatus
| summarize Count=count() by UserPrincipalName, AppDisplayName,
ClientAppUsed
| order by Count desc
"@
# Execute via Sentinel API (SYNTHETIC endpoint)
$headers = @{
"Authorization" = "Bearer REDACTED"
"Content-Type" = "application/json"
}
$body = @{
query = $query
timespan = "P1D"
} | ConvertTo-Json
$response = Invoke-RestMethod -Method POST `
-Uri "https://api.loganalytics.io/v1/workspaces/SYNTHETIC-WORKSPACE-ID/query" `
-Headers $headers -Body $body
Write-Host "`n=== CONDITIONAL ACCESS BYPASS CANDIDATES ===" -ForegroundColor Red
$response.tables[0].rows | ForEach-Object {
Write-Host " User: $($_[0]) App: $($_[1]) Client: $($_[2]) Count: $($_[3])" -ForegroundColor Yellow
}
Step 2: Token Replay Detection¶
#!/usr/bin/env python3
"""
Token Replay Detection Engine.
Identifies potential token replay attacks by detecting:
1. Same token used from multiple IPs
2. Token used after user session termination
3. Token used with mismatched device fingerprint
ALL VALUES ARE SYNTHETIC — for educational purposes only.
"""
import json
import hashlib
import time
from collections import defaultdict
class TokenReplayDetector:
"""Detects potential token replay attacks."""
def __init__(self):
# Track token usage: token_hash -> [(timestamp, ip, device, user_agent)]
self.token_usage = defaultdict(list)
# Track revoked tokens
self.revoked_tokens = set()
# Alerts
self.alerts = []
def record_token_usage(self, token_hash: str, ip: str,
device_id: str, user_agent: str,
user_id: str) -> list:
"""
Record a token usage event and check for replay indicators.
Returns list of alerts (empty if no issues).
"""
timestamp = time.time()
new_alerts = []
# Check 1: Is token revoked?
if token_hash in self.revoked_tokens:
alert = {
"type": "TOKEN_REPLAY_REVOKED",
"severity": "CRITICAL",
"description": f"Revoked token used by {user_id} from {ip}",
"timestamp": timestamp,
"indicators": {
"token_hash": token_hash[:16] + "...",
"ip_address": ip,
"device_id": device_id,
"user_id": user_id,
}
}
new_alerts.append(alert)
# Record usage
self.token_usage[token_hash].append({
"timestamp": timestamp,
"ip": ip,
"device_id": device_id,
"user_agent": user_agent,
"user_id": user_id,
})
usages = self.token_usage[token_hash]
# Check 2: Same token from multiple IPs
unique_ips = set(u["ip"] for u in usages)
if len(unique_ips) > 1:
alert = {
"type": "TOKEN_MULTI_IP",
"severity": "HIGH",
"description": f"Token for {user_id} used from {len(unique_ips)} IPs",
"timestamp": timestamp,
"indicators": {
"token_hash": token_hash[:16] + "...",
"ip_addresses": list(unique_ips),
"user_id": user_id,
"usage_count": len(usages),
}
}
new_alerts.append(alert)
# Check 3: Same token from multiple devices
unique_devices = set(u["device_id"] for u in usages)
if len(unique_devices) > 1:
alert = {
"type": "TOKEN_DEVICE_MISMATCH",
"severity": "HIGH",
"description": f"Token for {user_id} used from {len(unique_devices)} devices",
"timestamp": timestamp,
"indicators": {
"token_hash": token_hash[:16] + "...",
"devices": list(unique_devices),
"user_id": user_id,
}
}
new_alerts.append(alert)
# Check 4: Abnormally high usage rate
recent_usages = [u for u in usages
if timestamp - u["timestamp"] < 60] # Last 60 seconds
if len(recent_usages) > 10:
alert = {
"type": "TOKEN_HIGH_FREQUENCY",
"severity": "MEDIUM",
"description": f"Token for {user_id} used {len(recent_usages)} times in 60s",
"timestamp": timestamp,
"indicators": {
"token_hash": token_hash[:16] + "...",
"usage_rate": len(recent_usages),
"window_seconds": 60,
"user_id": user_id,
}
}
new_alerts.append(alert)
self.alerts.extend(new_alerts)
return new_alerts
def revoke_token(self, token_hash: str, reason: str):
"""Revoke a token and track the revocation."""
self.revoked_tokens.add(token_hash)
print(f"[TOKEN REVOKED] {token_hash[:16]}... Reason: {reason}")
# ============================================================
# Simulation: Token replay attack detection
# ============================================================
if __name__ == "__main__":
detector = TokenReplayDetector()
# Simulate legitimate token usage
token_hash = hashlib.sha256(b"SYNTHETIC-JWT-TOKEN-001").hexdigest()
print("=" * 60)
print("TOKEN REPLAY DETECTION SIMULATION")
print("=" * 60)
# Normal usage from legitimate user
print("\n[1] Normal usage — legitimate user")
alerts = detector.record_token_usage(
token_hash=token_hash,
ip="198.51.100.42", # SYNTHETIC
device_id="device-apex-ws-1042", # SYNTHETIC
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
user_id="testuser@apex.example.com" # SYNTHETIC
)
print(f" Alerts: {len(alerts)}")
# Attacker replays stolen token from different IP
print("\n[2] Token replay — attacker uses stolen token")
alerts = detector.record_token_usage(
token_hash=token_hash,
ip="203.0.113.99", # SYNTHETIC — attacker IP
device_id="device-attacker-x1", # SYNTHETIC
user_agent="python-requests/2.31.0",
user_id="testuser@apex.example.com" # SYNTHETIC
)
print(f" Alerts: {len(alerts)}")
for alert in alerts:
print(f" [{alert['severity']}] {alert['type']}: {alert['description']}")
# Token gets revoked, attacker tries again
print("\n[3] Post-revocation — attacker retries")
detector.revoke_token(token_hash, "Suspicious multi-IP usage detected")
alerts = detector.record_token_usage(
token_hash=token_hash,
ip="203.0.113.99", # SYNTHETIC
device_id="device-attacker-x1", # SYNTHETIC
user_agent="python-requests/2.31.0",
user_id="testuser@apex.example.com" # SYNTHETIC
)
print(f" Alerts: {len(alerts)}")
for alert in alerts:
print(f" [{alert['severity']}] {alert['type']}: {alert['description']}")
6.3 Exercise: MFA Fatigue Attack Detection¶
MFA fatigue (also known as MFA push bombing or prompt bombing) occurs when an attacker repeatedly triggers MFA push notifications to the legitimate user, hoping they will eventually approve one out of frustration or confusion.
Step 3: Detect MFA Fatigue Attacks¶
// ============================================================
// KQL: MFA Fatigue / Push Bombing Detection
// ============================================================
SigninLogs
| where TimeGenerated > ago(1h)
| where ResultType != 0 // Failed sign-in
| where MfaDetail.authMethod == "Push notification"
or MfaDetail.authMethod == "Microsoft Authenticator"
| summarize
PushAttempts = count(),
UniqueIPs = dcount(IPAddress),
IPs = make_set(IPAddress),
FirstAttempt = min(TimeGenerated),
LastAttempt = max(TimeGenerated),
// Check if any attempt eventually succeeded
SuccessFollowed = countif(ResultType == 0)
by UserPrincipalName
| extend DurationMinutes = datetime_diff('minute', LastAttempt, FirstAttempt)
| where PushAttempts >= 5
and DurationMinutes <= 30 // 5+ attempts in 30 minutes
| extend RiskLevel = case(
PushAttempts >= 20, "CRITICAL",
PushAttempts >= 10, "HIGH",
PushAttempts >= 5, "MEDIUM",
"LOW"
)
| project UserPrincipalName, PushAttempts, DurationMinutes,
UniqueIPs, IPs, RiskLevel, SuccessFollowed,
FirstAttempt, LastAttempt
| order by PushAttempts desc
// ============================================================
// SPL: MFA Fatigue / Push Bombing Detection
// ============================================================
index=azure sourcetype="azure:signinlogs"
| spath "mfaDetail.authMethod"
| search resultType!=0
("mfaDetail.authMethod"="Push notification"
OR "mfaDetail.authMethod"="Microsoft Authenticator")
| stats count as PushAttempts,
dc(ipAddress) as UniqueIPs,
values(ipAddress) as IPs,
min(_time) as FirstAttempt,
max(_time) as LastAttempt,
count(eval(resultType=0)) as SuccessFollowed
by userPrincipalName
| eval duration_min=round((LastAttempt - FirstAttempt) / 60, 1)
| where PushAttempts >= 5 AND duration_min <= 30
| eval risk=case(
PushAttempts >= 20, "CRITICAL",
PushAttempts >= 10, "HIGH",
PushAttempts >= 5, "MEDIUM",
1=1, "LOW"
)
| sort -PushAttempts
6.4 Exercise: Device Compliance Spoofing Detection¶
Step 4: Detect Device Spoofing Attempts¶
// ============================================================
// KQL: Device ID Spoofing — Same Device ID from Multiple Users
// ============================================================
SigninLogs
| where TimeGenerated > ago(7d)
| where ResultType == 0
| where isnotempty(DeviceDetail.deviceId)
| extend DeviceId = tostring(DeviceDetail.deviceId)
| extend DeviceOS = tostring(DeviceDetail.operatingSystem)
| summarize
UniqueUsers = dcount(UserPrincipalName),
Users = make_set(UserPrincipalName),
UniqueIPs = dcount(IPAddress),
IPs = make_set(IPAddress),
AccessCount = count()
by DeviceId, DeviceOS
| where UniqueUsers > 1 // Same device ID used by multiple users
| order by UniqueUsers desc
// ============================================================
// KQL: Device Compliance State Flip-Flopping
// Detects devices that rapidly toggle between compliant/non-compliant
// May indicate compliance spoofing or MDM manipulation
// ============================================================
IntuneDeviceComplianceOrg
| where TimeGenerated > ago(7d)
| summarize
ComplianceChanges = dcount(ComplianceState),
States = make_set(ComplianceState),
StateCount = count()
by DeviceName, bin(TimeGenerated, 1d)
| where ComplianceChanges > 2 // Multiple state changes per day
| order by StateCount desc
// ============================================================
// KQL: Suspicious Device Registration — Bulk Device Join
// ============================================================
AuditLogs
| where TimeGenerated > ago(24h)
| where OperationName in (
"Add device",
"Register device",
"Add registered owner to device"
)
| extend InitiatedBy = tostring(InitiatedBy.user.userPrincipalName)
| extend DeviceName = tostring(TargetResources[0].displayName)
| summarize
DevicesRegistered = dcount(DeviceName),
Devices = make_set(DeviceName)
by InitiatedBy, bin(TimeGenerated, 1h)
| where DevicesRegistered > 3 // More than 3 devices in 1 hour
| order by DevicesRegistered desc
6.5 Exercise: Service Principal and OAuth Bypass Detection¶
// ============================================================
// KQL: Service Principal Credential Added (Persistence)
// ============================================================
AuditLogs
| where TimeGenerated > ago(24h)
| where OperationName in (
"Add service principal credentials",
"Update application – Certificates and secrets management"
)
| extend InitiatedBy = tostring(InitiatedBy.user.userPrincipalName)
| extend AppName = tostring(TargetResources[0].displayName)
| extend AppId = tostring(TargetResources[0].id)
| extend ModifiedProperty = TargetResources[0].modifiedProperties
| project TimeGenerated, InitiatedBy, AppName, AppId,
OperationName, ModifiedProperty, CorrelationId
| order by TimeGenerated desc
// ============================================================
// KQL: OAuth Consent Grant — Illicit App Authorization
// ============================================================
AuditLogs
| where TimeGenerated > ago(7d)
| where OperationName == "Consent to application"
| extend ConsentedBy = tostring(InitiatedBy.user.userPrincipalName)
| extend AppDisplayName = tostring(TargetResources[0].displayName)
| extend Permissions = tostring(TargetResources[0].modifiedProperties)
| where Permissions contains "ReadWrite" or
Permissions contains "Mail.Read" or
Permissions contains "Files.ReadWrite" or
Permissions contains "Directory.Read"
| project TimeGenerated, ConsentedBy, AppDisplayName,
Permissions, CorrelationId
| order by TimeGenerated desc
// ============================================================
// SPL: Service Principal Credential Addition
// ============================================================
index=azure sourcetype="azure:auditlogs"
| spath "operationName"
| search operationName="Add service principal credentials"
OR operationName="Update application*Certificates*"
| spath output=initiatedBy "initiatedBy.user.userPrincipalName"
| spath output=appName "targetResources{}.displayName"
| stats count as CredentialAdditions,
values(appName) as Applications,
values(operationName) as Operations
by initiatedBy
| sort -CredentialAdditions
// ============================================================
// SPL: OAuth Consent Grant Detection
// ============================================================
index=azure sourcetype="azure:auditlogs"
| spath "operationName"
| search operationName="Consent to application"
| spath output=consentedBy "initiatedBy.user.userPrincipalName"
| spath output=appName "targetResources{}.displayName"
| spath output=permissions "targetResources{}.modifiedProperties{}.newValue"
| search permissions="*ReadWrite*" OR permissions="*Mail.Read*"
OR permissions="*Files.ReadWrite*" OR permissions="*Directory.Read*"
| table _time, consentedBy, appName, permissions
| sort -_time
6.6 Comprehensive Zero Trust Monitoring Dashboard¶
// ============================================================
// KQL: Zero Trust Health Dashboard — Unified Metrics
// ============================================================
// Metric 1: Overall Conditional Access Compliance Rate
let CACompliance = SigninLogs
| where TimeGenerated > ago(24h)
| where ResultType == 0
| summarize
TotalSignins = count(),
CAApplied = countif(ConditionalAccessStatus == "success"),
CANotApplied = countif(ConditionalAccessStatus == "notApplied"),
CAFailed = countif(ConditionalAccessStatus == "failure")
| extend ComplianceRate = round(todecimal(CAApplied) / todecimal(TotalSignins) * 100, 1);
// Metric 2: Device Compliance Rate
let DeviceCompliance = SigninLogs
| where TimeGenerated > ago(24h)
| where ResultType == 0
| where isnotempty(DeviceDetail.deviceId)
| summarize
TotalDevices = dcount(tostring(DeviceDetail.deviceId)),
CompliantDevices = dcountif(tostring(DeviceDetail.deviceId),
DeviceDetail.isCompliant == true),
NonCompliantDevices = dcountif(tostring(DeviceDetail.deviceId),
DeviceDetail.isCompliant == false)
| extend DeviceComplianceRate = round(
todecimal(CompliantDevices) / todecimal(TotalDevices) * 100, 1);
// Metric 3: Legacy Auth Block Rate
let LegacyAuthMetric = SigninLogs
| where TimeGenerated > ago(24h)
| where ClientAppUsed in ("Exchange ActiveSync", "IMAP4", "POP3", "SMTP")
| summarize
LegacyAttempts = count(),
LegacyBlocked = countif(ConditionalAccessStatus == "failure"
or ResultType != 0),
LegacySucceeded = countif(ResultType == 0)
| extend LegacyBlockRate = iff(LegacyAttempts > 0,
round(todecimal(LegacyBlocked) / todecimal(LegacyAttempts) * 100, 1),
100.0);
// Combine metrics
CACompliance
| join kind=fullouter DeviceCompliance on $left.TotalSignins == $right.TotalDevices
| join kind=fullouter LegacyAuthMetric on $left.TotalSignins == $right.LegacyAttempts
| project
ZT_Metric = "Zero Trust Health",
SignIn_CA_ComplianceRate = ComplianceRate,
Device_ComplianceRate = DeviceComplianceRate,
Legacy_Auth_BlockRate = LegacyBlockRate,
Total_SignIns = TotalSignins,
CA_Bypassed = CANotApplied,
NonCompliant_Devices = NonCompliantDevices,
Legacy_Auth_Succeeded = LegacySucceeded
Challenge Questions¶
Scenario-Based Challenge Questions
Test your understanding of zero trust architecture with these scenario-based questions. Each question requires analysis of the concepts covered in this lab.
Challenge 1: Conditional Access Gap Analysis¶
Scenario: You discover that a user (analyst@apex.example.com) successfully signed into the Apex Financial Portal from IP 203.0.113.250 (an IP address not in any named location) using a Chrome browser on an unmanaged Windows device. The sign-in logs show ConditionalAccessStatus = "notApplied". The user has the "Finance-Department" group membership.
Questions:
- Which of the five Conditional Access policies defined in Phase 1 should have applied to this sign-in? Why did they not apply?
- What specific policy gap allowed this access?
- Write a KQL query to find all similar bypasses in the last 30 days.
- Propose a remediation — a new Conditional Access policy that would prevent this scenario.
Challenge 2: Microsegmentation Bypass¶
Scenario: During a purple team exercise, the red team compromised a pod labeled app=apex-frontend,tier=web in the apex-financial namespace. From this pod, they were unable to reach the database directly (blocked by NetworkPolicy). However, they discovered that the cache tier (Redis at 10.10.20.50:6379) was accessible from the frontend tier, and Redis was configured without authentication. They used the Redis SLAVEOF command to replicate data from the database tier's Redis instance.
Questions:
- Why was the frontend pod able to reach the cache tier despite the microsegmentation policies?
- What NetworkPolicy change would prevent this lateral movement path?
- What application-layer control should have been in place on Redis?
- Write a detection query (KQL or SPL) to identify Redis
SLAVEOFcommands in network traffic.
Challenge 3: Token Replay After Session Termination¶
Scenario: A user's session was terminated by the Continuous Trust Engine due to impossible travel detection (trust score dropped to 28). However, 15 minutes later, the SOC notices that API requests are still being processed with the user's access token from the suspicious IP (203.0.113.99).
Questions:
- Why did session termination not prevent continued API access?
- What is the difference between session revocation and token revocation?
- What architectural change would ensure immediate token invalidation?
- Design a token binding mechanism that would prevent this specific replay scenario.
Challenge 4: MFA Fatigue Success¶
Scenario: The SOC detects 47 MFA push notification attempts for admin@apex.example.com over a 20-minute window from IP 192.0.2.200. On attempt #48, the sign-in succeeded — the administrator approved the push notification. Post-incident analysis reveals the attacker had obtained the admin's password from a credential dump.
Questions:
- What defensive controls could have prevented the MFA fatigue attack from succeeding?
- Should the Conditional Access policy have blocked the sign-in attempts after a threshold?
- What additional context should the MFA push notification have displayed?
- Write a Sentinel automation rule that automatically disables the user account after 10 failed MFA pushes in 15 minutes.
Challenge 5: Zero Trust Architecture Maturity¶
Scenario: Apex Financial Services wants to move from their current zero trust implementation (Phases 1-3) to a fully mature zero trust architecture. The CISO has asked you to produce a maturity assessment and roadmap.
Questions:
- Map the six phases of this lab to the CISA Zero Trust Maturity Model pillars (Identity, Devices, Networks, Applications & Workloads, Data). Which pillars have the strongest coverage? Which have gaps?
- What is the most critical gap in the current implementation?
- Propose a 90-day roadmap with specific milestones for advancing from "Advanced" to "Optimal" maturity.
- How would you measure the effectiveness of zero trust implementation beyond compliance checkboxes?
Scoring Rubric¶
| Phase | Exercise | Points | Criteria |
|---|---|---|---|
| Phase 1 | Conditional Access Policy Design | 15 | All 5 policies correctly configured with proper conditions, grant controls, and exclusions |
| Phase 1 | Policy Evaluation Validation | 10 | What-If analysis correctly identifies policy conflicts and evaluation order |
| Phase 1 | MFA Enforcement Testing | 10 | Successfully validates MFA bypass scenarios (legacy auth block, ROPC block) |
| Phase 1 | Detection Queries (KQL + SPL) | 10 | Working queries for CA failures, legacy auth, MFA registration anomalies |
| Phase 2 | Device Compliance Policy | 10 | Compliance policy covers all required health checks (TPM, BitLocker, Defender, OS version) |
| Phase 2 | Certificate-Based Auth | 10 | CA configuration and user mapping correctly implemented |
| Phase 2 | Health Attestation Validation | 5 | Device health check script correctly evaluates all attestation signals |
| Phase 2 | Detection Queries (KQL + SPL) | 5 | Working queries for non-compliant device access and compliance drift |
| Phase 3 | Default Deny + DNS Policy | 10 | Correct default-deny-all with DNS allowance (pods can still resolve names) |
| Phase 3 | Tiered NetworkPolicies | 10 | Frontend → API → DB chain correctly configured with proper selectors |
| Phase 3 | Lateral Movement Testing | 10 | All four test scenarios executed with correct expected results documented |
| Phase 3 | Detection Queries (KQL + SPL) | 5 | Working queries for east-west violations and cross-segment communication |
| Phase 4 | Identity-Aware Proxy | 10 | IAP correctly configured with OAuth2 proxy, session management, and security headers |
| Phase 4 | Service Mesh mTLS | 10 | Strict mTLS enforced, authorization policies correctly scope service-to-service access |
| Phase 4 | JWT Validation Middleware | 10 | Algorithm restriction, claim validation, RBAC decorator, and object-level authorization |
| Phase 4 | Detection Queries (KQL + SPL) | 5 | Working queries for JWT failures and mTLS violations |
| Phase 5 | Continuous Trust Engine | 15 | All 8 trust signals evaluated, weighted scoring, correct action thresholds |
| Phase 5 | Step-Up Auth Rules | 5 | At least 4 step-up rules with appropriate triggers and escalation actions |
| Phase 5 | Detection Queries (KQL + SPL) | 10 | Working queries for impossible travel, session anomalies, and step-up failures |
| Phase 6 | CA Bypass Detection | 10 | Queries identify policy evasion and gap analysis |
| Phase 6 | Token Replay Detection | 10 | Detection engine identifies multi-IP, multi-device, and post-revocation replay |
| Phase 6 | MFA Fatigue Detection | 5 | Query correctly identifies push bombing patterns with risk scoring |
| Phase 6 | Device Spoofing Detection | 5 | Queries identify device ID spoofing and compliance state manipulation |
| Phase 6 | OAuth/SP Abuse Detection | 5 | Queries detect service principal credential additions and illicit consent grants |
| Challenge | Challenge Questions (5) | 25 | 5 points each — thorough analysis, specific technical answers, actionable remediation |
| Total | 250 |
Grading Scale
- 225-250 (90-100%): Expert — ready for zero trust architecture lead roles
- 200-224 (80-89%): Advanced — strong practitioner, minor gaps
- 175-199 (70-79%): Intermediate — solid foundation, needs practice with advanced scenarios
- 150-174 (60-69%): Developing — review core concepts and re-attempt bypass exercises
- Below 150 (<60%): Review Chapter 39 and prerequisite chapters before reattempting
MITRE ATT&CK Mapping¶
| Technique ID | Technique Name | Lab Phase | Exercise |
|---|---|---|---|
| T1078 | Valid Accounts | Phase 1, 6 | Conditional Access bypass, credential-based access |
| T1078.004 | Valid Accounts: Cloud Accounts | Phase 1 | Azure AD/Entra sign-in testing |
| T1110 | Brute Force | Phase 1, 5 | MFA enforcement, step-up auth |
| T1556 | Modify Authentication Process | Phase 1, 6 | Conditional Access evasion |
| T1556.006 | Multi-Factor Authentication | Phase 6 | MFA fatigue / push bombing |
| T1621 | Multi-Factor Authentication Request Generation | Phase 6 | MFA fatigue detection |
| T1550 | Use Alternate Authentication Material | Phase 4, 6 | JWT token replay, session hijacking |
| T1550.001 | Application Access Token | Phase 4, 6 | Token replay detection |
| T1528 | Steal Application Access Token | Phase 6 | OAuth consent phishing detection |
| T1098 | Account Manipulation | Phase 6 | Service principal credential addition |
| T1098.001 | Additional Cloud Credentials | Phase 6 | Service principal abuse |
| T1134 | Access Token Manipulation | Phase 4 | JWT claims manipulation |
| T1036 | Masquerading | Phase 2, 6 | Device compliance spoofing |
| T1090 | Proxy | Phase 1, 6 | VPN/proxy location spoofing |
| T1570 | Lateral Tool Transfer | Phase 3 | Microsegmentation bypass |
| T1046 | Network Service Discovery | Phase 3 | Segment scanning detection |
| T1021 | Remote Services | Phase 3 | Cross-segment access attempts |
| T1071 | Application Layer Protocol | Phase 4 | Service mesh traffic manipulation |
| T1557 | Adversary-in-the-Middle | Phase 4 | mTLS bypass attempts |
| T1530 | Data from Cloud Storage | Phase 5 | Anomalous data access detection |
Additional Resources¶
- Chapter 39: Zero Trust Implementation — comprehensive zero trust framework and implementation guidance
- Chapter 33: Identity & Access Security — identity management, authentication, and authorization
- Chapter 31: Network Security Architecture — network segmentation, firewalls, and SDN
- Chapter 5: Detection Engineering at Scale — building detection queries and analytics rules
- Chapter 51: Kubernetes Security — container orchestration security and network policies
- Chapter 52: API Security Framework — API authentication, authorization, and rate limiting
- Lab 27: Kubernetes Attack & Defense — hands-on K8s security testing
- Lab 28: API Security Testing — OWASP API Top 10 attack and defense exercises
- CISA Zero Trust Maturity Model — federal zero trust guidance
- NIST SP 800-207: Zero Trust Architecture — foundational ZTA reference
- Microsoft Zero Trust Deployment Guide — practical Azure AD/Entra implementation