Chapter 57: Cloud Forensics & Investigation

Overview

Cloud forensics represents one of the most significant evolutions in digital investigation methodology since the transition from physical disk imaging to live memory analysis. Traditional forensic principles -- preservation, acquisition, analysis, and reporting -- remain foundational, but their application in cloud environments requires fundamental rethinking. Evidence that once resided on physical hard drives you could seize and image now lives in ephemeral containers that exist for seconds, serverless functions that leave no persistent disk artifacts, and multi-tenant storage systems spanning jurisdictions across the globe. The forensic examiner who walks into a cloud investigation with only traditional tools and mindset will fail -- not because the evidence does not exist, but because it exists in forms and locations that traditional methodology never anticipated.

The shared responsibility model that governs cloud security equally governs cloud forensics, but with a critical asymmetry: while the cloud provider manages the infrastructure, the customer is responsible for collecting and preserving evidence from that infrastructure before it disappears. An EC2 instance terminated by an attacker is gone. The EBS volumes attached to it may be gone as well if they were configured to delete on termination. The memory contents are gone. CloudTrail logs persist, but only if you configured them correctly before the incident. VPC Flow Logs capture network metadata, but only if you enabled them. The fundamental challenge of cloud forensics is not the absence of evidence -- cloud environments actually generate far more telemetry than traditional on-premises systems -- but the ephemerality of that evidence and the pre-configuration required to ensure its availability when an incident occurs.

This chapter provides a comprehensive, provider-specific guide to cloud forensic investigation across AWS, Azure, and GCP. We cover evidence identification, collection, preservation, and analysis for each major cloud service. We address the unique challenges of container and Kubernetes forensics, serverless investigation, and multi-cloud evidence correlation. We provide detection queries in KQL and SPL for identifying malicious activity in cloud environments. We build forensic automation pipelines that reduce evidence collection time from hours to minutes. And we ground everything in three detailed case studies that demonstrate cloud forensic methodology applied to realistic breach scenarios -- all using synthetic data. Every technique connects back to the forensic fundamentals covered in Chapter 27: Digital Forensics and the incident response lifecycle from Chapter 9: Incident Response Lifecycle.

The organizations that invest in cloud forensic readiness before an incident occurs -- enabling logging, configuring retention, testing collection procedures, training investigators -- will resolve cloud security incidents in days. Those that do not will spend weeks reconstructing partial evidence from fragmented sources, often discovering that the most critical evidence was never collected at all.

Educational Content Only

All techniques, architecture diagrams, IP addresses, domain names, and scenarios in this chapter are 100% synthetic and created for educational purposes only. IP addresses use RFC 5737 (192.0.2.0/24, 198.51.100.0/24, 203.0.113.0/24) and RFC 1918 ranges (10.x, 172.16.x, 192.168.x). Domains use *.example.com and *.example. All credentials shown are placeholders (testuser/REDACTED). Organization names such as "CloudNova Corp" or "Stratos Health" are entirely fictional. Never execute offensive techniques without explicit written authorization against systems you own or have written permission to test.

Learning Objectives

By the end of this chapter, students SHALL be able to:

  1. Explain the shared responsibility model for forensics across AWS, Azure, and GCP, identifying which evidence sources the customer controls versus the provider (Comprehension)
  2. Collect forensic evidence from AWS environments using CloudTrail, VPC Flow Logs, EBS snapshots, and memory acquisition tools while maintaining chain of custody (Application)
  3. Collect forensic evidence from Azure environments using Activity Logs, NSG Flow Logs, VM disk snapshots, and Azure AD audit logs (Application)
  4. Collect forensic evidence from GCP environments using Cloud Audit Logs, VPC Flow Logs, Compute Engine snapshots, and IAM policy analysis (Application)
  5. Investigate container and Kubernetes security incidents by analyzing container images, pod logs, etcd state, and service mesh traces (Analysis)
  6. Preserve cloud evidence with cryptographic integrity verification, proper chain of custody documentation, and legally defensible acquisition procedures (Application)
  7. Write detection queries in KQL (Azure) and SPL (Splunk, for AWS CloudTrail data) for cloud-specific attack patterns including credential abuse, privilege escalation, and data exfiltration (Synthesis)
  8. Investigate serverless and PaaS security incidents using execution logs, API Gateway traces, and managed service audit logs (Analysis)
  9. Correlate evidence across multiple cloud providers to construct unified investigation timelines (Analysis)
  10. Design automated forensic evidence collection pipelines that integrate with IR playbooks for rapid cloud incident response (Synthesis)

Prerequisites


MITRE ATT&CK Cloud Technique Mapping

| Technique ID | Technique Name | Cloud Forensic Context | Tactic |
| --- | --- | --- | --- |
| T1530 | Data from Cloud Storage Object | Unauthorized access to S3/Blob/GCS buckets; forensic analysis of access logs | Collection |
| T1537 | Transfer Data to Cloud Account | Exfiltration to attacker-controlled cloud accounts; cross-account log analysis | Exfiltration |
| T1078.004 | Valid Accounts: Cloud Accounts | Compromised cloud credentials; IAM audit log investigation | Defense Evasion, Initial Access |
| T1562.008 | Impair Defenses: Disable Cloud Logs | Attacker disabling CloudTrail/audit logs; detecting logging gaps | Defense Evasion |
| T1190 | Exploit Public-Facing Application | Cloud-hosted application exploitation; WAF and load balancer log analysis | Initial Access |
| T1098 | Account Manipulation | IAM policy changes, role assumption abuse; tracking privilege modifications | Persistence |
| T1078.001 | Valid Accounts: Default Accounts | Default service account abuse in cloud environments | Initial Access |
| T1552.005 | Unsecured Credentials: Cloud Instance Metadata | IMDS exploitation; metadata service access log analysis | Credential Access |
| T1580 | Cloud Infrastructure Discovery | Reconnaissance of cloud resources; API call pattern analysis | Discovery |
| T1578 | Modify Cloud Compute Infrastructure | Creating/modifying instances for persistence; compute audit trails | Defense Evasion |

57.1 Cloud Forensics Fundamentals

57.1.1 The Shared Responsibility Model for Forensics

The shared responsibility model defines who owns what in cloud security -- and by extension, who owns what in cloud forensics. Understanding this division is the first step in any cloud investigation, because it determines which evidence you can collect yourself and which requires provider cooperation (or is simply unavailable).

| Layer | IaaS (EC2/VM/GCE) | PaaS (RDS/App Service/Cloud SQL) | SaaS (M365/Workspace) |
| --- | --- | --- | --- |
| Physical Infrastructure | Provider | Provider | Provider |
| Hypervisor/Host OS | Provider | Provider | Provider |
| Guest OS/Runtime | Customer | Provider | Provider |
| Application | Customer | Shared | Provider |
| Data | Customer | Customer | Customer |
| Identity/Access | Customer | Customer | Shared |
| Logging Configuration | Customer | Customer | Provider (limited) |
| Log Retention | Customer | Customer | Provider (limited) |
| Evidence Collection | Customer | Customer (limited) | Provider (via API) |

Critical Insight

The forensic implications of this model are stark: you cannot forensically image a cloud provider's hypervisor. You cannot seize their physical servers. You cannot subpoena their RAM. Your forensic universe is bounded by the APIs and services the provider exposes -- and by the logging you configured before the incident occurred. If CloudTrail was not enabled in a region, that region's API activity is forensically invisible. If VPC Flow Logs were not configured, network metadata is gone. Cloud forensic readiness is not optional -- it is a prerequisite for cloud forensic capability.

57.1.2 Volatile vs. Persistent Evidence in Cloud

Cloud environments fundamentally alter the volatility hierarchy that traditional forensics relies upon:

| Evidence Type | Traditional Volatility | Cloud Volatility | Notes |
| --- | --- | --- | --- |
| CPU Registers/Cache | Seconds | Seconds (inaccessible) | Never available to customer in cloud |
| Memory (RAM) | Minutes | Minutes -- hours | Available only while instance runs; requires proactive acquisition |
| Process State | Minutes | Minutes -- hours | Must be captured before termination |
| Network Connections | Minutes | Minutes (metadata persists if flow logs enabled) | Flow logs provide partial substitute |
| Disk Contents | Days -- months | Minutes -- permanent | EBS/managed disks persist; instance store volumes are ephemeral |
| API/Audit Logs | N/A (traditional) | Days -- years (if configured) | CloudTrail, Activity Log, Audit Log -- the primary evidence source |
| Configuration State | Days | Minutes -- permanent | Infrastructure-as-code may preserve historical state |
| Network Logs | Days | 1--90 days (retention-dependent) | VPC Flow Logs, NSG Flow Logs -- must be pre-configured |

The API Log Inversion

In traditional forensics, disk evidence is king. In cloud forensics, API audit logs are king. Every action in a cloud environment -- creating instances, modifying IAM policies, accessing storage, changing security groups -- is an API call that can be logged. These logs are the single most important evidence source in cloud investigations. They are also the first thing sophisticated attackers try to disable (T1562.008).
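To make the inversion concrete, the check for logging tampering can be expressed in a few lines. The sketch below works on synthetic, simplified CloudTrail-style records (real exports wrap records in a `Records` array with many more fields); the `find_tampering` helper is illustrative, not a standard tool:

```python
# API calls associated with T1562.008 (Impair Defenses: Disable Cloud Logs)
TAMPER_EVENTS = {"StopLogging", "DeleteTrail", "UpdateTrail", "PutEventSelectors"}

def find_tampering(records):
    """Return events whose eventName indicates CloudTrail tampering."""
    return [r for r in records if r.get("eventName") in TAMPER_EVENTS]

# Synthetic records (simplified CloudTrail structure, RFC 5737 addresses)
sample = [
    {"eventTime": "2026-03-15T14:25:47Z", "eventName": "CreateUser",
     "sourceIPAddress": "203.0.113.42"},
    {"eventTime": "2026-03-15T14:31:02Z", "eventName": "StopLogging",
     "sourceIPAddress": "203.0.113.42"},
]

hits = find_tampering(sample)
for h in hits:
    print(f"{h['eventTime']} {h['eventName']} from {h['sourceIPAddress']}")
```

A `StopLogging` call followed shortly by high-impact API activity from the same source IP is itself a strong timeline anchor in an investigation.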

57.1.3 Ephemeral Infrastructure Challenges

Modern cloud architectures create forensic challenges that have no traditional equivalent:

Auto-Scaling Groups: Instances are created and destroyed automatically based on load. An instance involved in an incident may be terminated and replaced before investigators are notified. Mitigation: configure auto-scaling lifecycle hooks to delay termination; tag forensic-hold instances.

Spot/Preemptible Instances: These instances can be terminated by the provider with minimal notice (2 minutes for AWS Spot). Any evidence on instance-store volumes is lost. Mitigation: use EBS-backed instances for workloads that may require forensic investigation; enable termination protection on critical instances.

Containers: Container filesystems are ephemeral by default. A container restart destroys the filesystem overlay. Container images are immutable, but runtime modifications (malware written to the filesystem, configuration changes) exist only in the overlay. Mitigation: use persistent volume claims for critical data; enable container runtime logging; configure image scanning.

Serverless Functions: Lambda/Cloud Functions/Azure Functions have no persistent disk, no accessible memory, and no traditional OS. Forensic evidence is limited to execution logs, input/output payloads (if logged), and API call records. Mitigation: enable detailed execution logging; configure dead-letter queues; instrument functions with tracing.

Infrastructure as Code: Terraform/CloudFormation/ARM templates define infrastructure declaratively. Changes to live infrastructure may not be reflected in IaC state, creating "drift" that is forensically significant. Mitigation: enable drift detection; audit IaC state files alongside live configuration.
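The core of drift detection is a comparison between declared and live attributes. This is a minimal, illustrative sketch (real tooling such as `terraform plan` or AWS Config does this against actual provider APIs; the dictionaries here are synthetic):

```python
def detect_drift(declared, live):
    """Compare declared IaC attributes to live configuration.

    Returns {attribute: (declared_value, live_value)} for every mismatch,
    including attributes present on only one side.
    """
    drift = {}
    for key in set(declared) | set(live):
        if declared.get(key) != live.get(key):
            drift[key] = (declared.get(key), live.get(key))
    return drift

# Synthetic example: a security group modified outside of IaC --
# someone opened SSH (port 22) directly in the console
declared_sg = {"ingress_ports": [443], "description": "web tier"}
live_sg = {"ingress_ports": [443, 22], "description": "web tier"}

drift = detect_drift(declared_sg, live_sg)
print(drift)
```

Forensically, the attribute that drifted (here, an extra open port) points directly at the API call to look for in the audit log.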

57.1.4 Jurisdiction and Data Sovereignty

Cloud forensics introduces jurisdictional complexity that traditional forensics rarely encounters. A single cloud deployment may span multiple regions, each governed by different laws regarding data access, privacy, and evidence admissibility. Key considerations:

  • Data residency: Evidence stored in EU regions may be subject to GDPR restrictions on cross-border transfer, even for forensic purposes
  • Provider cooperation: Cloud providers respond to legal process (subpoenas, court orders) but timelines vary; AWS and Azure have law enforcement request guides
  • Multi-tenancy: Forensic actions must not access or impact other tenants' data -- this limits traditional forensic techniques like full disk imaging at the hypervisor level
  • Terms of service: Some forensic techniques (active scanning, penetration testing) may violate cloud provider ToS without prior notification
  • Chain of custody: Digital evidence from cloud APIs must be documented with the same rigor as physical evidence -- hash verification, timestamped collection logs, examiner identification
  • Mutual Legal Assistance Treaties (MLATs): Cross-border evidence requests between countries may require MLATs, adding weeks or months to investigation timelines
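The chain-of-custody bullet above can be implemented directly: hash the evidence at collection time, record who collected it and when, and re-hash on every transfer. A minimal sketch (the `custody_record`/`verify` helpers are illustrative, not a standard library):

```python
import hashlib
import json
from datetime import datetime, timezone

def custody_record(evidence: bytes, case_id: str, examiner: str, source: str):
    """Create a chain-of-custody entry: SHA-256 digest plus collection metadata."""
    return {
        "case_id": case_id,
        "examiner": examiner,
        "source": source,
        "collected_at": datetime.now(timezone.utc).isoformat(),
        "sha256": hashlib.sha256(evidence).hexdigest(),
        "size_bytes": len(evidence),
    }

def verify(evidence: bytes, record: dict) -> bool:
    """Re-hash the evidence and compare against the recorded digest."""
    return hashlib.sha256(evidence).hexdigest() == record["sha256"]

blob = b'{"eventName": "ConsoleLogin", "sourceIPAddress": "203.0.113.42"}'
rec = custody_record(blob, "IR-2026-0042", "testuser", "cloudtrail-export.json")
print(json.dumps(rec, indent=2))
```

Any subsequent modification to the evidence, even a single byte, causes `verify` to return `False`, which is exactly the property a court will ask you to demonstrate.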

57.1.5 Cloud Forensic Process Model

The traditional forensic process (Identify -- Preserve -- Collect -- Analyze -- Report) adapts for cloud environments:

flowchart TD
    A[Incident Detected] --> B[Scope Assessment]
    B --> C{Cloud Provider<br/>Identification}
    C -->|AWS| D1[AWS Evidence Sources]
    C -->|Azure| D2[Azure Evidence Sources]
    C -->|GCP| D3[GCP Evidence Sources]
    C -->|Multi-Cloud| D4[All Providers]
    D1 --> E[Evidence Preservation]
    D2 --> E
    D3 --> E
    D4 --> E
    E --> F[Volatile Evidence<br/>Memory, Process State]
    E --> G[API/Audit Log<br/>Collection]
    E --> H[Disk/Storage<br/>Snapshot]
    E --> I[Network Log<br/>Collection]
    F --> J[Forensic Analysis]
    G --> J
    H --> J
    I --> J
    J --> K[Timeline<br/>Reconstruction]
    K --> L[Impact<br/>Assessment]
    L --> M[Forensic Report]
    M --> N[Lessons Learned /<br/>Readiness Update]

57.2 AWS Forensics

57.2.1 AWS Evidence Sources Overview

AWS provides extensive logging and monitoring capabilities, but most require explicit enablement:

| Evidence Source | Default State | Retention | Forensic Value |
| --- | --- | --- | --- |
| CloudTrail (Management Events) | Enabled (90-day lookup) | 90 days (free) or S3 (custom) | Critical -- all API calls |
| CloudTrail (Data Events) | Disabled | Custom (S3 trail) | High -- S3 object access, Lambda invocations |
| VPC Flow Logs | Disabled | CloudWatch or S3 | High -- network metadata |
| GuardDuty | Disabled | 90 days | High -- threat detection findings |
| S3 Access Logging | Disabled | Target bucket | Medium -- bucket-level access |
| ELB Access Logs | Disabled | S3 bucket | Medium -- HTTP request logs |
| CloudWatch Logs | Application-dependent | Custom | Variable -- application logs |
| AWS Config | Disabled | S3 bucket | High -- configuration history |
| Route 53 Query Logs | Disabled | CloudWatch Logs | Medium -- DNS queries |
| Security Hub | Disabled | 90 days | Medium -- aggregated findings |

Forensic Readiness Baseline

At minimum, every AWS account should have:

  1. CloudTrail enabled in all regions with an organization trail writing to a dedicated S3 bucket
  2. CloudTrail data events enabled for S3 and Lambda
  3. VPC Flow Logs enabled on all VPCs (at minimum, REJECT records)
  4. GuardDuty enabled in all regions
  5. AWS Config enabled with a comprehensive rule set
  6. CloudWatch Logs agent on all EC2 instances
  7. S3 bucket versioning and access logging on all critical buckets
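A readiness assessment over this checklist is straightforward to automate. The sketch below evaluates a hand-built account-state dictionary; in practice each flag would be populated by boto3 calls (`describe_trails`, `describe_flow_logs`, and so on). The keys and helper are illustrative assumptions, not an AWS API:

```python
# Baseline items from the checklist above, keyed by a short flag name
BASELINE = {
    "cloudtrail_all_regions": "CloudTrail organization trail in all regions",
    "cloudtrail_data_events": "CloudTrail data events for S3 and Lambda",
    "vpc_flow_logs": "VPC Flow Logs on all VPCs",
    "guardduty": "GuardDuty in all regions",
    "aws_config": "AWS Config with a comprehensive rule set",
    "cw_agent": "CloudWatch Logs agent on all EC2 instances",
    "s3_versioning_logging": "S3 versioning and access logging on critical buckets",
}

def readiness_gaps(account_state):
    """Return the baseline items this account is missing."""
    return [desc for key, desc in BASELINE.items() if not account_state.get(key)]

# Synthetic account: flow logs and AWS Config were never enabled
state = {"cloudtrail_all_regions": True, "cloudtrail_data_events": True,
         "guardduty": True, "cw_agent": True, "s3_versioning_logging": True}
gaps = readiness_gaps(state)
for gap in gaps:
    print("MISSING:", gap)
```

Running this periodically (and on every new account) turns forensic readiness from a one-time project into a continuously verified control.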

57.2.2 CloudTrail Log Analysis

CloudTrail is the single most important evidence source in AWS forensics. Every API call to AWS services generates a CloudTrail event record.

CloudTrail Event Structure (Key Fields):

{
    "eventVersion": "1.08",
    "userIdentity": {
        "type": "IAMUser",
        "principalId": "AIDAEXAMPLEID12345678",
        "arn": "arn:aws:iam::123456789012:user/testuser",
        "accountId": "123456789012",
        "accessKeyId": "AKIAIOSFODNN7EXAMPLE",
        "userName": "testuser",
        "sessionContext": {
            "attributes": {
                "mfaAuthenticated": "false",
                "creationDate": "2026-03-15T14:22:31Z"
            }
        }
    },
    "eventTime": "2026-03-15T14:25:47Z",
    "eventSource": "iam.amazonaws.com",
    "eventName": "CreateUser",
    "awsRegion": "us-east-1",
    "sourceIPAddress": "203.0.113.42",
    "userAgent": "aws-cli/2.15.0 Python/3.11.6",
    "requestParameters": {
        "userName": "backdoor-admin"
    },
    "responseElements": {
        "user": {
            "userName": "backdoor-admin",
            "userId": "AIDAEXAMPLEBACKDOOR01",
            "arn": "arn:aws:iam::123456789012:user/backdoor-admin",
            "createDate": "2026-03-15T14:25:47Z"
        }
    },
    "requestID": "a]b2c3d4-e5f6-7890-abcd-ef1234567890",
    "eventID": "12345678-abcd-ef01-2345-6789abcdef01",
    "eventType": "AwsApiCall",
    "managementEvent": true,
    "recipientAccountId": "123456789012"
}

Key Investigation Queries Using AWS CLI:

# Search for IAM user creation events in the last 7 days
aws cloudtrail lookup-events \
    --lookup-attributes AttributeKey=EventName,AttributeValue=CreateUser \
    --start-time "2026-03-08T00:00:00Z" \
    --end-time "2026-03-15T23:59:59Z" \
    --region us-east-1

# Search for console logins from a specific IP
aws cloudtrail lookup-events \
    --lookup-attributes AttributeKey=EventName,AttributeValue=ConsoleLogin \
    --start-time "2026-03-01T00:00:00Z" \
    --query 'Events[?contains(CloudTrailEvent, `203.0.113.42`)]'

# Search for events by a specific user
aws cloudtrail lookup-events \
    --lookup-attributes AttributeKey=Username,AttributeValue=testuser \
    --start-time "2026-03-10T00:00:00Z" \
    --max-results 50

# Search for S3 data exfiltration indicators (GetObject from unusual IPs)
aws cloudtrail lookup-events \
    --lookup-attributes AttributeKey=EventName,AttributeValue=GetObject \
    --start-time "2026-03-14T00:00:00Z" \
    --query 'Events[?contains(CloudTrailEvent, `203.0.113`)]'

Athena Queries for CloudTrail (S3 Trail):

-- Find all API calls from a suspicious IP address
SELECT eventTime, eventName, eventSource, userIdentity.arn,
       sourceIPAddress, userAgent, errorCode, errorMessage
FROM cloudtrail_logs
WHERE sourceIPAddress = '203.0.113.42'
  AND eventTime BETWEEN '2026-03-14' AND '2026-03-16'
ORDER BY eventTime ASC;

-- Detect IAM credential compromise -- access key used from multiple IPs
SELECT userIdentity.accessKeyId, 
       COUNT(DISTINCT sourceIPAddress) as unique_ips,
       ARRAY_AGG(DISTINCT sourceIPAddress) as ip_list
FROM cloudtrail_logs
WHERE eventTime BETWEEN '2026-03-01' AND '2026-03-15'
  AND userIdentity.accessKeyId IS NOT NULL
GROUP BY userIdentity.accessKeyId
HAVING COUNT(DISTINCT sourceIPAddress) > 5
ORDER BY unique_ips DESC;

-- Detect CloudTrail tampering (T1562.008)
SELECT eventTime, eventName, userIdentity.arn, sourceIPAddress,
       requestParameters
FROM cloudtrail_logs
WHERE eventName IN ('StopLogging', 'DeleteTrail', 'UpdateTrail',
                     'PutEventSelectors')
  AND eventTime BETWEEN '2026-03-01' AND '2026-03-15'
ORDER BY eventTime ASC;

-- Detect privilege escalation via IAM policy attachment
SELECT eventTime, eventName, userIdentity.arn, sourceIPAddress,
       requestParameters
FROM cloudtrail_logs
WHERE eventName IN ('AttachUserPolicy', 'AttachRolePolicy',
                     'PutUserPolicy', 'PutRolePolicy',
                     'CreatePolicyVersion', 'SetDefaultPolicyVersion')
  AND eventTime BETWEEN '2026-03-01' AND '2026-03-15'
ORDER BY eventTime ASC;

57.2.3 VPC Flow Logs Analysis

VPC Flow Logs capture IP traffic metadata for network interfaces in a VPC:

# VPC Flow Log format (version 2 default)
# version account-id interface-id srcaddr dstaddr srcport dstport protocol packets bytes start end action log-status
2 123456789012 eni-0a1b2c3d4e5f67890 203.0.113.42 10.0.1.50 54321 22 6 25 3000 1710500000 1710500060 ACCEPT OK
2 123456789012 eni-0a1b2c3d4e5f67890 10.0.1.50 203.0.113.42 22 54321 6 30 150000 1710500000 1710500060 ACCEPT OK
2 123456789012 eni-0a1b2c3d4e5f67890 198.51.100.77 10.0.1.50 45678 3389 6 5 300 1710500060 1710500120 REJECT OK
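When Athena is not available, these records are simple enough to parse directly. A minimal Python sketch that parses the version-2 format shown above and flags accepted inbound SSH from non-RFC-1918 sources (the field list matches the header comment; the helper itself is illustrative):

```python
# Fields of a version-2 VPC Flow Log record, in order
FIELDS = ["version", "account_id", "interface_id", "srcaddr", "dstaddr",
          "srcport", "dstport", "protocol", "packets", "bytes",
          "start", "end", "action", "log_status"]

def parse_flow(line):
    """Parse one space-delimited flow log record into a dict."""
    rec = dict(zip(FIELDS, line.split()))
    for k in ("srcport", "dstport", "protocol", "packets", "bytes", "start", "end"):
        rec[k] = int(rec[k])
    return rec

lines = [
    "2 123456789012 eni-0a1b2c3d4e5f67890 203.0.113.42 10.0.1.50 54321 22 6 25 3000 1710500000 1710500060 ACCEPT OK",
    "2 123456789012 eni-0a1b2c3d4e5f67890 198.51.100.77 10.0.1.50 45678 3389 6 5 300 1710500060 1710500120 REJECT OK",
]
records = [parse_flow(line) for line in lines]

# Flag accepted inbound SSH from outside common RFC 1918 space
suspicious = [r for r in records
              if r["dstport"] == 22 and r["action"] == "ACCEPT"
              and not r["srcaddr"].startswith(("10.", "172.16.", "192.168."))]
for r in suspicious:
    print(f"Inbound SSH from {r['srcaddr']} to {r['dstaddr']} ({r['bytes']} bytes)")
```

Note that the prefix check above is a simplification (172.16.0.0/12 spans 172.16 through 172.31); a production script should use the `ipaddress` module's `is_private` instead.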

Athena Queries for VPC Flow Logs:

-- Find SSH connections from external IPs
SELECT srcaddr, dstaddr, srcport, dstport, 
       SUM(packets) as total_packets, SUM(bytes) as total_bytes,
       MIN(start) as first_seen, MAX("end") as last_seen
FROM vpc_flow_logs
WHERE dstport = 22 
  AND action = 'ACCEPT'
  AND srcaddr NOT LIKE '10.%'
  AND srcaddr NOT LIKE '172.16.%'
  AND srcaddr NOT LIKE '192.168.%'
  AND start BETWEEN 1710400000 AND 1710600000
GROUP BY srcaddr, dstaddr, srcport, dstport
ORDER BY total_bytes DESC;

-- Detect large data transfers (potential exfiltration)
SELECT srcaddr, dstaddr, dstport,
       SUM(bytes) as total_bytes,
       SUM(packets) as total_packets
FROM vpc_flow_logs
WHERE action = 'ACCEPT'
  AND srcaddr LIKE '10.0.%'
  AND dstaddr NOT LIKE '10.%'
  AND dstaddr NOT LIKE '172.16.%'
  AND start BETWEEN 1710400000 AND 1710600000
GROUP BY srcaddr, dstaddr, dstport
HAVING SUM(bytes) > 1073741824  -- Greater than 1 GB
ORDER BY total_bytes DESC;

-- Detect port scanning (many rejected connections from single source)
SELECT srcaddr, COUNT(DISTINCT dstport) as ports_scanned,
       COUNT(*) as total_attempts
FROM vpc_flow_logs
WHERE action = 'REJECT'
  AND start BETWEEN 1710400000 AND 1710600000
GROUP BY srcaddr
HAVING COUNT(DISTINCT dstport) > 100
ORDER BY ports_scanned DESC;

57.2.4 GuardDuty Findings Investigation

GuardDuty provides intelligent threat detection. When investigating findings:

# List active GuardDuty findings sorted by severity
aws guardduty list-findings \
    --detector-id abcdef1234567890abcdef1234567890 \
    --finding-criteria '{"Criterion":{"severity":{"Gte":7}}}' \
    --sort-criteria '{"AttributeName":"severity","OrderBy":"DESC"}'

# Get detailed finding information
aws guardduty get-findings \
    --detector-id abcdef1234567890abcdef1234567890 \
    --finding-ids "12345678901234567890123456789012"

# Export all findings for forensic timeline
aws guardduty get-findings \
    --detector-id abcdef1234567890abcdef1234567890 \
    --finding-ids $(aws guardduty list-findings \
        --detector-id abcdef1234567890abcdef1234567890 \
        --query 'FindingIds' --output text) \
    --output json > guardduty_findings_export.json

Key GuardDuty Finding Types for Investigation:

| Finding Type | Forensic Significance |
| --- | --- |
| UnauthorizedAccess:IAMUser/InstanceCredentialExfiltration | IMDS credential theft -- correlate with instance metadata access logs |
| Recon:EC2/PortProbeUnprotectedPort | External reconnaissance -- correlate with VPC Flow Logs |
| CryptoCurrency:EC2/BitcoinTool.B!DNS | Cryptomining compromise -- check for unauthorized processes |
| Trojan:EC2/BlackholeTraffic | C2 communication -- capture network traffic immediately |
| Exfiltration:S3/MaliciousIPCaller | Data theft from S3 -- correlate with S3 access logs and CloudTrail data events |
| Impact:EC2/WinRMBruteForce | Brute force attack -- check for successful authentication |
| CredentialAccess:IAMUser/AnomalousBehavior | Unusual API patterns -- review full CloudTrail activity for the principal |
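Triage order matters when many findings arrive at once. GuardDuty documents numeric severity bands (roughly: Low 1.0-3.9, Medium 4.0-6.9, High 7.0-8.9, with Critical 9.0-10.0 in newer versions); a small helper makes the exported JSON sortable for timeline work. The findings list below is synthetic:

```python
def severity_label(score: float) -> str:
    """Map a numeric GuardDuty severity to its documented band."""
    if score >= 9.0:
        return "CRITICAL"
    if score >= 7.0:
        return "HIGH"
    if score >= 4.0:
        return "MEDIUM"
    return "LOW"

# Synthetic findings export (finding type and numeric severity only)
findings = [
    {"type": "Recon:EC2/PortProbeUnprotectedPort", "severity": 2.0},
    {"type": "UnauthorizedAccess:IAMUser/InstanceCredentialExfiltration",
     "severity": 8.0},
    {"type": "Exfiltration:S3/MaliciousIPCaller", "severity": 8.5},
]

# Triage queue: highest severity first
queue = sorted(findings, key=lambda f: -f["severity"])
for f in queue:
    print(severity_label(f["severity"]), f["type"])
```

Sorting by severity alone is a starting point; an exfiltration finding at the same severity as a recon finding should usually still be worked first.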

57.2.5 EC2 Memory and Disk Acquisition

EBS Snapshot Acquisition (Disk Forensics):

# Step 1: Identify the instance and attached volumes
aws ec2 describe-instances \
    --instance-ids i-0a1b2c3d4e5f67890 \
    --query 'Reservations[].Instances[].{ID:InstanceId,State:State.Name,Volumes:BlockDeviceMappings[].Ebs.VolumeId}' \
    --output table

# Step 2: Create forensic snapshots of all attached volumes
# Tag with investigation metadata for chain of custody
aws ec2 create-snapshot \
    --volume-id vol-0a1b2c3d4e5f67890 \
    --description "Forensic snapshot - Case IR-2026-0042 - Examiner: testuser" \
    --tag-specifications 'ResourceType=snapshot,Tags=[{Key=ForensicCase,Value=IR-2026-0042},{Key=Examiner,Value=testuser},{Key=AcquisitionTime,Value=2026-03-15T14:30:00Z},{Key=ChainOfCustody,Value=initial-acquisition}]'

# Step 3: Wait for snapshot completion
aws ec2 describe-snapshots \
    --snapshot-ids snap-0a1b2c3d4e5f67890 \
    --query 'Snapshots[].{ID:SnapshotId,State:State,Progress:Progress}'

# Step 4: Create a forensic analysis volume from snapshot
# Use a dedicated forensic analysis VPC/account
aws ec2 create-volume \
    --snapshot-id snap-0a1b2c3d4e5f67890 \
    --availability-zone us-east-1a \
    --volume-type gp3 \
    --tag-specifications 'ResourceType=volume,Tags=[{Key=ForensicCase,Value=IR-2026-0042},{Key=Purpose,Value=forensic-analysis},{Key=ReadOnly,Value=true}]'

# Step 5: Attach to forensic workstation (read-only mount!)
aws ec2 attach-volume \
    --volume-id vol-0forensicvolume1234 \
    --instance-id i-0forensicworkstation \
    --device /dev/sdf

# Step 6: On the forensic workstation -- mount read-only and hash
# sudo mount -o ro,noexec,nosuid /dev/xvdf1 /mnt/forensic
# sha256sum /dev/xvdf > /evidence/IR-2026-0042/disk_hash.sha256

Memory Acquisition with AVML (Acquire Volatile Memory for Linux):

# AVML (Acquire Volatile Memory for Linux) -- works on AWS EC2 Linux instances
# Download AVML to the target instance via SSM
aws ssm send-command \
    --instance-ids i-0a1b2c3d4e5f67890 \
    --document-name "AWS-RunShellScript" \
    --parameters 'commands=[
        "curl -L https://github.com/microsoft/avml/releases/latest/download/avml -o /tmp/avml",
        "chmod +x /tmp/avml",
        "/tmp/avml /tmp/memory_dump.lime",
        "sha256sum /tmp/memory_dump.lime > /tmp/memory_dump.sha256",
        "aws s3 cp /tmp/memory_dump.lime s3://forensic-evidence-bucket/IR-2026-0042/memory/",
        "aws s3 cp /tmp/memory_dump.sha256 s3://forensic-evidence-bucket/IR-2026-0042/memory/"
    ]'

# Alternative: LiME (Linux Memory Extractor) for kernel module-based acquisition
# Requires kernel headers matching the target instance
aws ssm send-command \
    --instance-ids i-0a1b2c3d4e5f67890 \
    --document-name "AWS-RunShellScript" \
    --parameters 'commands=[
        "sudo insmod /tmp/lime-$(uname -r).ko path=/tmp/memory.lime format=lime",
        "sha256sum /tmp/memory.lime > /tmp/memory.sha256",
        "aws s3 cp /tmp/memory.lime s3://forensic-evidence-bucket/IR-2026-0042/memory/",
        "aws s3 cp /tmp/memory.sha256 s3://forensic-evidence-bucket/IR-2026-0042/memory/"
    ]'

Memory Acquisition Timing

Memory acquisition must happen before instance isolation or termination. Once an instance is stopped, memory is lost. The acquisition sequence should be: (1) acquire memory, (2) snapshot disks, (3) isolate network (modify security group to deny all), (4) preserve instance state. Never reverse this order.

57.2.6 S3 Access Logging Investigation

# Enable S3 server access logging (if not already enabled)
aws s3api put-bucket-logging \
    --bucket target-data-bucket.example.com \
    --bucket-logging-status '{
        "LoggingEnabled": {
            "TargetBucket": "s3-access-logs.example.com",
            "TargetPrefix": "target-data-bucket/"
        }
    }'

# S3 access log format analysis
# bucket_owner bucket time remote_ip requester request_id operation key
# request_uri http_status error_code bytes_sent object_size total_time
# turnaround_time referrer user_agent version_id host_id
# signature_version cipher_suite authentication_type host_header tls_version

# Query S3 access logs with Athena for unusual access patterns
# SELECT * FROM s3_access_logs
# WHERE bucket = 'sensitive-data.example.com'
#   AND remoteip LIKE '203.0.113.%'
#   AND operation = 'REST.GET.OBJECT'
#   AND parse_datetime(requestdatetime, 'dd/MMM/yyyy:HH:mm:ss Z')
#       BETWEEN timestamp '2026-03-14' AND timestamp '2026-03-16'
# ORDER BY requestdatetime;
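Because the S3 access log timestamp is bracketed and contains a space, a naive `split()` mis-parses the record; a small regex handles the leading fields. This sketch extracts only the first eight fields and flags object reads from the suspect range (the log line is synthetic and truncated; `parse_s3_access` is an illustrative helper, not an AWS tool):

```python
import re

# First eight space-delimited fields of an S3 server access log record;
# the bracketed timestamp means a plain split() is not sufficient
LOG_RE = re.compile(r'^(\S+) (\S+) \[([^\]]+)\] (\S+) (\S+) (\S+) (\S+) (\S+)')

def parse_s3_access(line):
    """Parse the leading fields of one S3 server access log record."""
    m = LOG_RE.match(line)
    if not m:
        return None
    keys = ["bucket_owner", "bucket", "time", "remote_ip",
            "requester", "request_id", "operation", "key"]
    return dict(zip(keys, m.groups()))

# Synthetic log line (remaining fields omitted for brevity)
line = ('79a59df900b949e5 sensitive-data.example.com '
        '[15/Mar/2026:14:25:47 +0000] 203.0.113.42 '
        'arn:aws:iam::123456789012:user/testuser '
        '3E57427F3EXAMPLE REST.GET.OBJECT customer-db-export.csv')
rec = parse_s3_access(line)
if rec["operation"] == "REST.GET.OBJECT" and rec["remote_ip"].startswith("203.0.113."):
    print("Suspicious object read:", rec["key"], "from", rec["remote_ip"])
```

Matching `key` values against CloudTrail `GetObject` data events (when enabled) gives two independent records of the same access, which strengthens the evidentiary value of both.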

57.2.7 IAM Credential Compromise Investigation

When IAM credentials are suspected compromised, follow this investigation workflow:

#!/usr/bin/env python3
"""AWS IAM Credential Compromise Investigation Script.

Investigates potentially compromised IAM credentials by analyzing
CloudTrail events, access patterns, and IAM configuration changes.
All data is collected for forensic analysis.

Case: IR-2026-0042
Examiner: testuser
"""

import boto3
import json
import hashlib
from datetime import datetime, timedelta

# Initialize clients
cloudtrail = boto3.client('cloudtrail', region_name='us-east-1')
iam = boto3.client('iam')

CASE_ID = "IR-2026-0042"
SUSPECT_USER = "testuser"
SUSPECT_KEY = "AKIAIOSFODNN7EXAMPLE"
INVESTIGATION_START = datetime(2026, 3, 1)
INVESTIGATION_END = datetime(2026, 3, 15)

def investigate_credential_usage():
    """Collect all API calls made with the suspect access key."""
    events = []
    paginator = cloudtrail.get_paginator('lookup_events')

    for page in paginator.paginate(
        LookupAttributes=[{
            'AttributeKey': 'AccessKeyId',
            'AttributeValue': SUSPECT_KEY
        }],
        StartTime=INVESTIGATION_START,
        EndTime=INVESTIGATION_END
    ):
        for event in page['Events']:
            event_data = json.loads(event['CloudTrailEvent'])
            events.append({
                'timestamp': event['EventTime'].isoformat(),
                'event_name': event['EventName'],
                'source_ip': event_data.get('sourceIPAddress', 'N/A'),
                'user_agent': event_data.get('userAgent', 'N/A'),
                'region': event_data.get('awsRegion', 'N/A'),
                'error_code': event_data.get('errorCode', 'None'),
                'resources': event.get('Resources', [])
            })

    return events

def analyze_source_ips(events):
    """Identify unique source IPs and flag anomalies."""
    ip_map = {}
    for event in events:
        ip = event['source_ip']
        if ip not in ip_map:
            ip_map[ip] = {'count': 0, 'first_seen': event['timestamp'],
                          'last_seen': event['timestamp'], 'events': []}
        ip_map[ip]['count'] += 1
        ip_map[ip]['last_seen'] = event['timestamp']
        ip_map[ip]['events'].append(event['event_name'])

    return ip_map

def check_iam_changes():
    """Check for IAM configuration changes by the suspect user."""
    iam_events = []
    iam_event_names = [
        'CreateUser', 'CreateAccessKey', 'AttachUserPolicy',
        'AttachRolePolicy', 'PutUserPolicy', 'PutRolePolicy',
        'CreateRole', 'UpdateAssumeRolePolicy', 'CreateLoginProfile',
        'UpdateLoginProfile', 'AddUserToGroup'
    ]

    paginator = cloudtrail.get_paginator('lookup_events')
    for event_name in iam_event_names:
        for page in paginator.paginate(
            LookupAttributes=[{
                'AttributeKey': 'EventName',
                'AttributeValue': event_name
            }],
            StartTime=INVESTIGATION_START,
            EndTime=INVESTIGATION_END
        ):
            for event in page['Events']:
                event_data = json.loads(event['CloudTrailEvent'])
                identity = event_data.get('userIdentity', {})
                if identity.get('accessKeyId') == SUSPECT_KEY or \
                   identity.get('userName') == SUSPECT_USER:
                    iam_events.append({
                        'timestamp': event['EventTime'].isoformat(),
                        'event_name': event['EventName'],
                        'details': event_data.get('requestParameters', {}),
                        'source_ip': event_data.get('sourceIPAddress', 'N/A')
                    })

    return iam_events

def generate_report(events, ip_analysis, iam_changes):
    """Generate forensic investigation report."""
    report = {
        'case_id': CASE_ID,
        'examiner': 'testuser',
        'generated_at': datetime.utcnow().isoformat(),
        'investigation_period': {
            'start': INVESTIGATION_START.isoformat(),
            'end': INVESTIGATION_END.isoformat()
        },
        'suspect_credential': {
            'user': SUSPECT_USER,
            'access_key': SUSPECT_KEY
        },
        'total_events': len(events),
        'unique_source_ips': len(ip_analysis),
        'source_ip_analysis': ip_analysis,
        'iam_changes_detected': len(iam_changes),
        'iam_changes': iam_changes,
        'findings': []
    }

    # Flag suspicious patterns
    if len(ip_analysis) > 3:
        report['findings'].append({
            'severity': 'HIGH',
            'finding': f'Access key used from {len(ip_analysis)} unique IPs -- '
                       f'indicates potential credential compromise'
        })

    if iam_changes:
        report['findings'].append({
            'severity': 'CRITICAL',
            'finding': f'{len(iam_changes)} IAM changes detected -- '
                       f'attacker may have established persistence'
        })

    # Write report with integrity hash
    report_json = json.dumps(report, indent=2, default=str)
    report_hash = hashlib.sha256(report_json.encode()).hexdigest()

    with open(f'{CASE_ID}_credential_investigation.json', 'w') as f:
        f.write(report_json)

    print(f"Report generated: {CASE_ID}_credential_investigation.json")
    print(f"SHA-256: {report_hash}")
    print(f"Total events analyzed: {len(events)}")
    print(f"Unique source IPs: {len(ip_analysis)}")
    print(f"IAM changes detected: {len(iam_changes)}")
    print(f"Findings: {len(report['findings'])}")

if __name__ == '__main__':
    print(f"[*] Starting credential investigation for case {CASE_ID}")
    events = investigate_credential_usage()
    ip_analysis = analyze_source_ips(events)
    iam_changes = check_iam_changes()
    generate_report(events, ip_analysis, iam_changes)

57.2.8 Lambda Forensics

Serverless function forensics requires different approaches since there is no persistent infrastructure:

# Retrieve Lambda function configuration (check for modifications)
aws lambda get-function \
    --function-name compromised-function-example \
    --query '{FunctionName:Configuration.FunctionName,Runtime:Configuration.Runtime,Handler:Configuration.Handler,LastModified:Configuration.LastModified,Environment:Configuration.Environment,Layers:Configuration.Layers}'

# Get function code for analysis
aws lambda get-function \
    --function-name compromised-function-example \
    --query 'Code.Location' --output text | xargs curl -o function_code.zip

# Retrieve execution logs from CloudWatch
aws logs filter-log-events \
    --log-group-name "/aws/lambda/compromised-function-example" \
    --start-time 1710400000000 \
    --end-time 1710600000000 \
    --filter-pattern "ERROR" \
    --output json > lambda_error_logs.json

# Check function version history (detect code injection)
aws lambda list-versions-by-function \
    --function-name compromised-function-example \
    --query 'Versions[].{Version:Version,LastModified:LastModified,CodeSha256:CodeSha256}'

# Check for event source mapping changes (input manipulation)
aws lambda list-event-source-mappings \
    --function-name compromised-function-example

# Review IAM role attached to Lambda for over-permission
aws iam get-role-policy \
    --role-name lambda-execution-role-example \
    --policy-name inline-policy-example
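The version history retrieved above can be checked programmatically for code tampering: any change in CodeSha256 between published versions marks a deployment event worth correlating against CloudTrail. A minimal sketch in pure Python over the JSON returned by list-versions-by-function (field names follow the Lambda API response shape):

```python
def find_code_changes(versions):
    """Given Lambda version records (Version, LastModified,
    CodeSha256), flag every point where the deployed code hash
    changed between consecutive versions."""
    changes = []
    ordered = sorted(versions, key=lambda v: v['LastModified'])
    for prev, curr in zip(ordered, ordered[1:]):
        if prev['CodeSha256'] != curr['CodeSha256']:
            changes.append({
                'from_version': prev['Version'],
                'to_version': curr['Version'],
                'modified_at': curr['LastModified'],
                'new_code_sha256': curr['CodeSha256'],
            })
    return changes
```

Feed it the saved `list-versions-by-function` output (via `json.load`) and compare each flagged timestamp against authorized deployment records.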

57.3 Azure Forensics

57.3.1 Azure Evidence Sources Overview

Evidence Source | Default State | Retention | Forensic Value
Azure Activity Log | Enabled | 90 days (free) | Critical -- subscription-level operations
Azure AD Sign-in Logs | Enabled (P1/P2) | 7 days (free tier), 30 days (P1/P2) | Critical -- authentication events
Azure AD Audit Logs | Enabled (P1/P2) | 7 days (free tier), 30 days (P1/P2) | Critical -- directory changes
NSG Flow Logs | Disabled | Storage account | High -- network metadata
Microsoft Defender for Cloud | Free tier limited | 30 days | High -- security alerts
Azure Monitor / Log Analytics | Requires workspace | Custom (30-730 days) | High -- centralized logging
Diagnostic Logs | Disabled (per resource) | Storage / Log Analytics | Variable -- resource-specific
Key Vault Audit Logs | Disabled | Storage / Log Analytics | High -- secret access tracking
Storage Analytics | Disabled | Storage account | Medium -- blob/queue/table access
Azure Firewall Logs | Requires diagnostic setting | Log Analytics | High -- firewall decisions

57.3.2 Azure Activity Log Analysis

# Query Azure Activity Log for specific operations
az monitor activity-log list \
    --start-time "2026-03-14T00:00:00Z" \
    --end-time "2026-03-16T00:00:00Z" \
    --caller "testuser@example.com" \
    --output table

# Filter for write/delete operations only (most forensically relevant)
az monitor activity-log list \
    --start-time "2026-03-14T00:00:00Z" \
    --end-time "2026-03-16T00:00:00Z" \
    --query "[?authorization.action && (contains(authorization.action, 'write') || contains(authorization.action, 'delete'))]" \
    --output json

# Export Activity Log for forensic preservation
az monitor activity-log list \
    --start-time "2026-03-01T00:00:00Z" \
    --end-time "2026-03-15T23:59:59Z" \
    --output json > activity_log_export_IR-2026-0042.json

# Compute hash for chain of custody
sha256sum activity_log_export_IR-2026-0042.json
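The hash-and-record step applies to every exported artifact, so it is worth wrapping once. A small helper along the same lines -- a sketch that hashes in chunks so large log exports are never read into memory at once, and returns a custody-ready record:

```python
import hashlib
from datetime import datetime, timezone

def hash_evidence_file(path, algorithm='sha256', chunk_size=1 << 20):
    """Compute a digest of an exported evidence file in 1 MiB
    chunks and return a record suitable for the chain-of-custody
    log."""
    h = hashlib.new(algorithm)
    with open(path, 'rb') as f:
        for chunk in iter(lambda: f.read(chunk_size), b''):
            h.update(chunk)
    return {
        'file': path,
        'algorithm': algorithm,
        'digest': h.hexdigest(),
        'hashed_at': datetime.now(timezone.utc).isoformat(),
    }
```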

57.3.3 Azure AD Sign-in and Audit Log Investigation

KQL Queries for Azure AD Investigation (Log Analytics / Sentinel):

// Detect impossible travel -- same user authenticating from distant locations
SigninLogs
| where TimeGenerated between (datetime(2026-03-14) .. datetime(2026-03-16))
| where ResultType == 0  // Successful sign-ins only
| project TimeGenerated, UserPrincipalName, IPAddress, Location,
          AppDisplayName, ClientAppUsed, DeviceDetail
| order by UserPrincipalName, TimeGenerated asc
| serialize
| extend PrevLocation = prev(Location), PrevTime = prev(TimeGenerated),
         PrevUser = prev(UserPrincipalName)
| where UserPrincipalName == PrevUser
| extend TimeDiffMinutes = datetime_diff('minute', TimeGenerated, PrevTime)
| where Location != PrevLocation and TimeDiffMinutes < 60
| project TimeGenerated, UserPrincipalName, IPAddress, Location,
          PrevLocation, TimeDiffMinutes, AppDisplayName

// Detect brute force attacks against Azure AD
SigninLogs
| where TimeGenerated > ago(24h)
| where ResultType != 0  // Failed sign-ins
| summarize FailedAttempts = count(),
            DistinctUsers = dcount(UserPrincipalName),
            TargetUsers = make_set(UserPrincipalName)
    by IPAddress, bin(TimeGenerated, 1h)
| where FailedAttempts > 20
| order by FailedAttempts desc

// Detect MFA bypass or MFA fatigue attacks
SigninLogs
| where TimeGenerated > ago(7d)
| where ResultType == 0
| where AuthenticationRequirement == "multiFactorAuthentication"
| summarize MFAPrompts = count(), 
            SuccessfulMFA = countif(Status.additionalDetails == "MFA completed in Azure AD"),
            DeniedMFA = countif(Status.additionalDetails has "denied")
    by UserPrincipalName, IPAddress
| where MFAPrompts > 10

// Detect Azure AD privilege escalation -- role assignment changes
AuditLogs
| where TimeGenerated between (datetime(2026-03-14) .. datetime(2026-03-16))
| where OperationName has_any ("Add member to role", "Add eligible member to role",
                                "Add scoped member to role")
| project TimeGenerated, OperationName, 
          InitiatedBy = tostring(InitiatedBy.user.userPrincipalName),
          TargetUser = tostring(TargetResources[0].userPrincipalName),
          RoleName = tostring(TargetResources[0].modifiedProperties[1].newValue),
          Result
| order by TimeGenerated desc

// Detect application consent grants (T1098 -- persistence via OAuth app)
AuditLogs
| where TimeGenerated > ago(30d)
| where OperationName == "Consent to application"
| project TimeGenerated,
          ConsentedBy = tostring(InitiatedBy.user.userPrincipalName),
          AppName = tostring(TargetResources[0].displayName),
          Permissions = tostring(TargetResources[0].modifiedProperties),
          Result
| order by TimeGenerated desc

// Investigate specific compromised user -- full activity timeline
let suspect_user = "testuser@example.com";
union SigninLogs, AuditLogs
| where TimeGenerated between (datetime(2026-03-14) .. datetime(2026-03-16))
| where (UserPrincipalName == suspect_user) or 
        (tostring(InitiatedBy.user.userPrincipalName) == suspect_user)
| project TimeGenerated, Type, OperationName, 
          IPAddress = coalesce(IPAddress, "N/A"),
          Result = coalesce(tostring(ResultType), tostring(Result)),
          Details = coalesce(tostring(ResultDescription), tostring(TargetResources))
| order by TimeGenerated asc
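The impossible-travel query above uses a coarse time-window heuristic; when sign-in records carry coordinates, a stricter offline check computes the minimum travel speed between consecutive sign-ins. A sketch using the haversine formula (the 900 km/h threshold approximates commercial flight speed and is an assumption to tune):

```python
from math import radians, sin, cos, asin, sqrt

def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance between two points in kilometres."""
    lat1, lon1, lat2, lon2 = map(radians, (lat1, lon1, lat2, lon2))
    a = sin((lat2 - lat1) / 2) ** 2 + \
        cos(lat1) * cos(lat2) * sin((lon2 - lon1) / 2) ** 2
    return 2 * 6371 * asin(sqrt(a))

def impossible_travel(signins, max_speed_kmh=900):
    """signins: (epoch_seconds, lat, lon) tuples for one user,
    oldest first. Returns consecutive pairs whose implied travel
    speed exceeds the threshold."""
    flagged = []
    for (t1, la1, lo1), (t2, la2, lo2) in zip(signins, signins[1:]):
        hours = max((t2 - t1) / 3600, 1e-6)  # avoid divide-by-zero
        speed = haversine_km(la1, lo1, la2, lo2) / hours
        if speed > max_speed_kmh:
            flagged.append({'from': (la1, lo1), 'to': (la2, lo2),
                            'speed_kmh': round(speed)})
    return flagged
```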

57.3.4 NSG Flow Logs Analysis

// Detect large data transfers from Azure VMs to external IPs
AzureNetworkAnalytics_CL
| where TimeGenerated > ago(24h)
| where FlowDirection_s == "O"  // Outbound
| where ipv4_is_private(SrcIP_s)         // RFC 1918 source (10/8, 172.16/12, 192.168/16)
| where not(ipv4_is_private(DestIP_s))   // destination outside private ranges
| summarize TotalBytes = sum(BytesSentToDestination_d),
            TotalPackets = sum(PacketsSentToDestination_d),
            FlowCount = count()
    by SrcIP_s, DestIP_s, DestPort_d, bin(TimeGenerated, 1h)
| where TotalBytes > 1073741824  // 1 GB threshold
| order by TotalBytes desc

// Detect port scanning from external sources
AzureNetworkAnalytics_CL
| where TimeGenerated > ago(24h)
| where FlowDirection_s == "I"  // Inbound
| where FlowStatus_s == "D"  // Denied
| summarize PortsScanned = dcount(DestPort_d),
            AttemptsCount = count()
    by SrcIP_s
| where PortsScanned > 50
| order by PortsScanned desc
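The same port-scan detection works offline against exported flow records. A sketch over generic flow tuples (the dict keys here are illustrative, not the AzureNetworkAnalytics_CL column names):

```python
from collections import defaultdict

def detect_port_scans(flows, min_ports=50):
    """flows: iterable of dicts with 'src_ip', 'dest_port', and
    'status' ('D' = denied). Returns sources whose denied inbound
    attempts span an unusually large number of distinct ports."""
    ports_by_src = defaultdict(set)
    attempts_by_src = defaultdict(int)
    for flow in flows:
        if flow.get('status') != 'D':
            continue
        ports_by_src[flow['src_ip']].add(flow['dest_port'])
        attempts_by_src[flow['src_ip']] += 1
    return sorted(
        ({'src_ip': src, 'ports_scanned': len(ports),
          'attempts': attempts_by_src[src]}
         for src, ports in ports_by_src.items()
         if len(ports) >= min_ports),
        key=lambda r: r['ports_scanned'], reverse=True)
```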

57.3.5 Azure VM Disk Snapshot Acquisition

# Step 1: Identify the VM and its disks
az vm show \
    --resource-group forensic-rg \
    --name suspect-vm-example \
    --query '{Name:name, OSDisk:storageProfile.osDisk.name, DataDisks:storageProfile.dataDisks[].name}' \
    --output json

# Step 2: Create snapshot of OS disk
az snapshot create \
    --resource-group forensic-evidence-rg \
    --name "IR-2026-0042-osdisk-snapshot" \
    --source "/subscriptions/00000000-0000-0000-0000-000000000000/resourceGroups/forensic-rg/providers/Microsoft.Compute/disks/suspect-vm-osdisk" \
    --tags CaseID=IR-2026-0042 Examiner=testuser AcquisitionTime=2026-03-15T14:30:00Z ChainOfCustody=initial

# Step 3: Create a managed disk from snapshot for analysis
az disk create \
    --resource-group forensic-evidence-rg \
    --name "IR-2026-0042-analysis-disk" \
    --source "IR-2026-0042-osdisk-snapshot" \
    --tags CaseID=IR-2026-0042 Purpose=forensic-analysis ReadOnly=true

# Step 4: Grant SAS URI for download (offline analysis)
az snapshot grant-access \
    --resource-group forensic-evidence-rg \
    --name "IR-2026-0042-osdisk-snapshot" \
    --duration-in-seconds 86400 \
    --access-level Read \
    --query accessSas --output tsv

57.3.6 Key Vault Access Investigation

// Detect unauthorized Key Vault access attempts
AzureDiagnostics
| where ResourceProvider == "MICROSOFT.KEYVAULT"
| where TimeGenerated between (datetime(2026-03-14) .. datetime(2026-03-16))
| where OperationName has_any ("SecretGet", "SecretList", "KeyGet", 
                                "CertificateGet", "VaultGet")
| project TimeGenerated, OperationName, 
          CallerIPAddress, 
          Identity = identity_claim_upn_s,
          ResultType,
          Resource
| order by TimeGenerated desc

// Detect Key Vault access from unusual IPs
AzureDiagnostics
| where ResourceProvider == "MICROSOFT.KEYVAULT"
| where TimeGenerated > ago(30d)
| where ResultType == "Success"
| summarize AccessCount = count(),
            Operations = make_set(OperationName)
    by CallerIPAddress, identity_claim_upn_s
| join kind=leftanti (
    AzureDiagnostics
    | where ResourceProvider == "MICROSOFT.KEYVAULT"
    | where TimeGenerated between (ago(90d) .. ago(30d))
    | where ResultType == "Success"
    | distinct CallerIPAddress
) on CallerIPAddress
| order by AccessCount desc

// Detect bulk secret enumeration
AzureDiagnostics
| where ResourceProvider == "MICROSOFT.KEYVAULT"
| where TimeGenerated > ago(24h)
| where OperationName == "SecretList" or OperationName == "SecretGet"
| summarize SecretAccesses = count(),
            UniqueSecrets = dcount(id_s)
    by CallerIPAddress, identity_claim_upn_s, bin(TimeGenerated, 5m)
| where SecretAccesses > 20
| order by SecretAccesses desc
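The five-minute windowing in the bulk-enumeration query can be reproduced against exported logs when no Log Analytics workspace is available. A sketch over simplified (epoch_seconds, caller, secret_id) tuples -- the thresholds mirror the KQL above and should be tuned per environment:

```python
from collections import defaultdict

def detect_bulk_secret_access(events, window_seconds=300, threshold=20):
    """events: (epoch_seconds, caller, secret_id) tuples. Buckets
    accesses per caller into fixed time windows and flags windows
    whose access count exceeds the threshold."""
    buckets = defaultdict(list)
    for ts, caller, secret_id in events:
        window = int(ts // window_seconds) * window_seconds
        buckets[(caller, window)].append(secret_id)
    return [
        {'caller': caller, 'window_start': window,
         'accesses': len(ids), 'unique_secrets': len(set(ids))}
        for (caller, window), ids in sorted(buckets.items())
        if len(ids) > threshold
    ]
```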

57.3.7 Azure Storage Analytics Investigation

# Enable storage analytics logging
az storage logging update \
    --account-name storageexample \
    --services b \
    --log rwd \
    --retention 90

# Query storage analytics logs for suspicious access
az storage blob list \
    --account-name storageexample \
    --container-name '$logs' \
    --prefix "blob/2026/03/15" \
    --output table

57.4 GCP Forensics

57.4.1 GCP Evidence Sources Overview

Evidence Source | Default State | Retention | Forensic Value
Cloud Audit Logs (Admin Activity) | Enabled (always) | 400 days | Critical -- admin operations
Cloud Audit Logs (Data Access) | Disabled (most services) | 30 days | High -- data access tracking
Cloud Audit Logs (System Event) | Enabled (always) | 400 days | Medium -- system events
VPC Flow Logs | Disabled | 30 days (Cloud Logging) | High -- network metadata
Security Command Center | Free tier limited | Varies | High -- security findings
Cloud Load Balancing Logs | Enabled | 30 days | Medium -- HTTP requests
DNS Logging | Disabled | 30 days | Medium -- DNS queries
Firewall Rules Logging | Disabled | 30 days | High -- firewall decisions
Access Transparency Logs | Enterprise only | 400 days | Medium -- Google staff access

57.4.2 Cloud Audit Log Analysis

# Query admin activity logs for a specific time range
gcloud logging read '
    logName="projects/example-project-123/logs/cloudaudit.googleapis.com%2Factivity"
    AND timestamp >= "2026-03-14T00:00:00Z"
    AND timestamp <= "2026-03-16T00:00:00Z"
' --project example-project-123 \
  --format json \
  --limit 500 > audit_activity_logs.json

# Search for IAM policy changes
gcloud logging read '
    logName="projects/example-project-123/logs/cloudaudit.googleapis.com%2Factivity"
    AND protoPayload.methodName=("SetIamPolicy" OR "google.iam.admin.v1.CreateServiceAccountKey")
    AND timestamp >= "2026-03-14T00:00:00Z"
' --project example-project-123 \
  --format json

# Search for compute instance modifications
gcloud logging read '
    logName="projects/example-project-123/logs/cloudaudit.googleapis.com%2Factivity"
    AND protoPayload.methodName:("compute.instances" OR "compute.firewalls")
    AND timestamp >= "2026-03-14T00:00:00Z"
' --project example-project-123 \
  --format json

# Search for data access events (must be enabled)
gcloud logging read '
    logName="projects/example-project-123/logs/cloudaudit.googleapis.com%2Fdata_access"
    AND protoPayload.methodName="storage.objects.get"
    AND protoPayload.authenticationInfo.principalEmail="testuser@example.com"
    AND timestamp >= "2026-03-14T00:00:00Z"
' --project example-project-123 \
  --format json

# Search for suspicious callerIp patterns
gcloud logging read '
    logName="projects/example-project-123/logs/cloudaudit.googleapis.com%2Factivity"
    AND protoPayload.requestMetadata.callerIp="203.0.113.42"
    AND timestamp >= "2026-03-14T00:00:00Z"
' --project example-project-123 \
  --format json
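Once exported, the JSON entries can be folded into a per-principal timeline for the report. A sketch over the Cloud Audit Log entry shape (timestamp, protoPayload.methodName, protoPayload.authenticationInfo.principalEmail); load the exported file with `json.load` and pass the resulting list:

```python
from collections import defaultdict

def build_timeline(entries):
    """entries: parsed Cloud Audit Log records (the list produced
    by `gcloud logging read --format json`). Groups actions by
    principal, oldest first."""
    timeline = defaultdict(list)
    for entry in entries:
        payload = entry.get('protoPayload', {})
        principal = payload.get('authenticationInfo', {}) \
                           .get('principalEmail', 'unknown')
        timeline[principal].append({
            'timestamp': entry.get('timestamp'),
            'method': payload.get('methodName'),
            'caller_ip': payload.get('requestMetadata', {})
                                .get('callerIp'),
        })
    for actions in timeline.values():
        # RFC 3339 timestamps sort correctly as strings
        actions.sort(key=lambda a: a['timestamp'] or '')
    return dict(timeline)
```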

57.4.3 GCP VPC Flow Logs

# Enable VPC Flow Logs on a subnet
gcloud compute networks subnets update example-subnet \
    --region us-central1 \
    --enable-flow-logs \
    --logging-flow-sampling 1.0 \
    --logging-aggregation-interval interval-5-sec \
    --logging-metadata include-all

# Query VPC Flow Logs for external connections
gcloud logging read '
    resource.type="gce_subnetwork"
    AND logName="projects/example-project-123/logs/compute.googleapis.com%2Fvpc_flows"
    AND jsonPayload.connection.dest_ip="203.0.113.42"
    AND timestamp >= "2026-03-14T00:00:00Z"
' --project example-project-123 \
  --format json \
  --limit 100

57.4.4 Compute Engine Snapshot Acquisition

# Step 1: List disks attached to the suspect instance
gcloud compute instances describe suspect-instance-example \
    --zone us-central1-a \
    --project example-project-123 \
    --format 'json(disks[].source)'

# Step 2: Create forensic snapshot
gcloud compute disks snapshot suspect-instance-disk \
    --zone us-central1-a \
    --project example-project-123 \
    --snapshot-names "ir-2026-0042-disk-snapshot" \
    --description "Forensic snapshot - Case IR-2026-0042 - Examiner: testuser - Time: 2026-03-15T14:30:00Z" \
    --labels case=ir-2026-0042,examiner=testuser,purpose=forensic

# Step 3: Create analysis disk from snapshot (in forensic project)
gcloud compute disks create ir-2026-0042-analysis-disk \
    --source-snapshot ir-2026-0042-disk-snapshot \
    --zone us-central1-a \
    --project forensic-analysis-project \
    --labels case=ir-2026-0042,purpose=analysis

# Step 4: Attach to forensic workstation
gcloud compute instances attach-disk forensic-workstation \
    --disk ir-2026-0042-analysis-disk \
    --mode ro \
    --zone us-central1-a \
    --project forensic-analysis-project

57.4.5 IAM Policy Analysis

# Get current IAM policy for the project
gcloud projects get-iam-policy example-project-123 \
    --format json > project_iam_policy.json

# List service account keys (check for unauthorized keys)
gcloud iam service-accounts keys list \
    --iam-account testuser@example-project-123.iam.gserviceaccount.com \
    --format 'table(name, validAfterTime, validBeforeTime, keyType)'

# List service accounts (the ServiceAccount resource does not expose
# a creation time -- correlate with CreateServiceAccount audit events
# to identify recently created accounts)
gcloud iam service-accounts list \
    --project example-project-123 \
    --format 'table(email, displayName, disabled)'

# Analyze IAM policy changes in audit logs
gcloud logging read '
    protoPayload.methodName="SetIamPolicy"
    AND timestamp >= "2026-03-01T00:00:00Z"
    AND timestamp <= "2026-03-15T23:59:59Z"
' --project example-project-123 \
  --format json > iam_policy_changes.json
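Comparing the current policy export against a known-good baseline shows exactly which members gained which roles during the incident window. A sketch over the `get-iam-policy` JSON shape (bindings: list of {role, members}):

```python
def diff_iam_bindings(baseline, current):
    """Both arguments are IAM policy dicts as exported by
    `gcloud projects get-iam-policy --format json`. Returns
    members present in `current` but absent from `baseline`,
    keyed by role."""
    def members_by_role(policy):
        out = {}
        for binding in policy.get('bindings', []):
            out.setdefault(binding['role'], set()) \
               .update(binding.get('members', []))
        return out

    base = members_by_role(baseline)
    curr = members_by_role(current)
    added = {}
    for role, members in curr.items():
        new = members - base.get(role, set())
        if new:
            added[role] = sorted(new)
    return added
```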

57.4.6 Cloud Functions Investigation

# List Cloud Functions and check for modifications
gcloud functions list --project example-project-123 --format json

# Get function configuration for analysis
gcloud functions describe suspicious-function-example \
    --project example-project-123 \
    --region us-central1 \
    --format json

# Download the function source archive for offline analysis.
# The describe output includes the source location (for gen2
# functions, a Cloud Storage object referenced under
# buildConfig.source); the bucket path below is illustrative
gsutil cp "gs://gcf-v2-sources-example/suspicious-function-example/function-source.zip" \
    /evidence/IR-2026-0042/function_source.zip

# Query execution logs
gcloud logging read '
    resource.type="cloud_function"
    AND resource.labels.function_name="suspicious-function-example"
    AND severity >= WARNING
    AND timestamp >= "2026-03-14T00:00:00Z"
' --project example-project-123 \
  --format json \
  --limit 200

57.4.7 Security Command Center Findings

# List active security findings
gcloud scc findings list \
    --organization 123456789 \
    --source "organizations/123456789/sources/-" \
    --filter 'state="ACTIVE" AND category="PERSISTENCE"' \
    --format json

# Get finding details
gcloud scc findings describe finding-id-example \
    --organization 123456789 \
    --source "organizations/123456789/sources/source-id"

57.5 Container & Kubernetes Forensics

57.5.1 Container Forensics Fundamentals

Container forensics is uniquely challenging because of the layered filesystem architecture and ephemeral nature of containers. Understanding the relationship between images, layers, and the writable overlay is essential.

flowchart TB
    subgraph Container_Runtime["Container at Runtime"]
        W[Writable Layer -- overlay<br/>Malware, config changes, artifacts]
        L3[Image Layer 3 -- app code]
        L2[Image Layer 2 -- dependencies]
        L1[Image Layer 1 -- base OS]
    end

    subgraph Evidence["Forensic Evidence Sources"]
        E1[Container Logs -- stdout/stderr]
        E2[Writable Layer Diff -- filesystem changes]
        E3[Image Layers -- known-good baseline]
        E4[Volume Mounts -- persistent data]
        E5[Network Captures -- container traffic]
        E6[Runtime Events -- syscalls, process exec]
    end

    W --> E2
    Container_Runtime --> E1
    Container_Runtime --> E5
    Container_Runtime --> E6
    L3 --> E3

Container Evidence Volatility

When a container is stopped, the writable layer persists on the host. When a container is removed (docker rm), the writable layer is deleted. When Kubernetes restarts a pod's container, the exited container and its writable layer survive only until the kubelet garbage-collects them. Always capture container state before any restart or removal.

57.5.2 Container Image Analysis

# Export a running container's filesystem (including writable layer)
docker export suspicious-container-example > \
    /evidence/IR-2026-0042/container_filesystem.tar
sha256sum /evidence/IR-2026-0042/container_filesystem.tar > \
    /evidence/IR-2026-0042/container_filesystem.sha256

# Compare running container against its base image
# This shows files added, modified, or deleted at runtime
docker diff suspicious-container-example
# Output:
# C /etc
# A /etc/cron.d/backdoor
# C /tmp
# A /tmp/reverse_shell.py
# A /tmp/.hidden_config
# C /var/log
# D /var/log/auth.log

# Save the container image for offline analysis
docker save suspicious-image:latest | gzip > \
    /evidence/IR-2026-0042/container_image.tar.gz

# Inspect image layers and configuration
docker inspect suspicious-container-example --format '{{json .}}' | \
    python3 -m json.tool > container_inspect.json

# Analyze image history (detect injected layers)
docker history suspicious-image:latest --no-trunc --format \
    'table {{.CreatedAt}}\t{{.CreatedBy}}\t{{.Size}}'

# Extract specific layer for analysis
mkdir -p /evidence/IR-2026-0042/layers
tar xzf /evidence/IR-2026-0042/container_image.tar.gz \
    -C /evidence/IR-2026-0042/layers/
# manifest.json maps each layer tarball to its directory

Container Image Forensic Analysis Script:

#!/usr/bin/env python3
"""Container image forensic analysis.

Analyzes container image layers to identify suspicious modifications,
embedded credentials, and malicious additions.

Case: IR-2026-0042
"""

import json
import tarfile
import hashlib
import os
from pathlib import Path

EVIDENCE_DIR = "/evidence/IR-2026-0042"
# Substring indicators matched against changed file paths
SUSPICIOUS_PATTERNS = [
    '/etc/cron', '/etc/shadow', '/etc/passwd',
    '.ssh/authorized_keys', 'reverse_shell', 'backdoor',
    'ncat', 'socat', '/dev/shm'
]

SUSPICIOUS_EXTENSIONS = ['.sh', '.py', '.pl', '.rb', '.elf', '.so']

def analyze_container_diff(container_name):
    """Analyze docker diff output for suspicious changes."""
    import subprocess
    result = subprocess.run(
        ['docker', 'diff', container_name],
        capture_output=True, text=True
    )

    findings = []
    for line in result.stdout.strip().split('\n'):
        if not line:
            continue
        change_type = line[0]  # A=Added, C=Changed, D=Deleted
        filepath = line[2:]

        # Flag suspicious paths
        for pattern in SUSPICIOUS_PATTERNS:
            if pattern in filepath.lower():
                findings.append({
                    'type': 'SUSPICIOUS_PATH',
                    'change': change_type,
                    'path': filepath,
                    'pattern_matched': pattern,
                    'severity': 'HIGH'
                })

        # Flag suspicious file types added
        if change_type == 'A':
            ext = Path(filepath).suffix.lower()
            if ext in SUSPICIOUS_EXTENSIONS:
                findings.append({
                    'type': 'SUSPICIOUS_FILE_ADDED',
                    'change': change_type,
                    'path': filepath,
                    'extension': ext,
                    'severity': 'MEDIUM'
                })

        # Flag deleted log files (anti-forensics)
        if change_type == 'D' and ('log' in filepath.lower() or 
                                    'history' in filepath.lower()):
            findings.append({
                'type': 'LOG_DELETION',
                'change': change_type,
                'path': filepath,
                'severity': 'HIGH'
            })

    return findings

def analyze_image_layers(image_tar_path):
    """Analyze image layers for embedded secrets or suspicious content."""
    findings = []

    with tarfile.open(image_tar_path, 'r:gz') as tar:
        manifest = json.loads(
            tar.extractfile('manifest.json').read()
        )

        for layer_info in manifest:
            config_file = layer_info.get('Config', '')
            if config_file:
                config = json.loads(tar.extractfile(config_file).read())

                # Check environment variables for embedded credentials
                env_vars = config.get('config', {}).get('Env', [])
                for env in env_vars:
                    key = env.split('=')[0] if '=' in env else env
                    sensitive_keys = ['PASSWORD', 'SECRET', 'TOKEN', 
                                     'API_KEY', 'PRIVATE_KEY', 'AWS_']
                    if any(s in key.upper() for s in sensitive_keys):
                        findings.append({
                            'type': 'EMBEDDED_CREDENTIAL',
                            'env_var': key,
                            'severity': 'CRITICAL',
                            'note': 'Value redacted for report'
                        })

    return findings

if __name__ == '__main__':
    print(f"[*] Container forensic analysis for case IR-2026-0042")

    diff_findings = analyze_container_diff('suspicious-container-example')
    print(f"[*] Container diff findings: {len(diff_findings)}")
    for f in diff_findings:
        print(f"  [{f['severity']}] {f['type']}: {f['path']}")

    image_tar = f"{EVIDENCE_DIR}/container_image.tar.gz"
    if os.path.exists(image_tar):
        layer_findings = analyze_image_layers(image_tar)
        print(f"[*] Image layer findings: {len(layer_findings)}")
        for f in layer_findings:
            print(f"  [{f['severity']}] {f['type']}: {f.get('env_var', 'N/A')}")

57.5.3 Kubernetes Forensics

Kubernetes adds orchestration complexity on top of container forensics. Key evidence sources:

# Collect pod logs (including previous container if restarted)
kubectl logs suspicious-pod-example -n production --all-containers \
    > /evidence/IR-2026-0042/pod_current_logs.txt
kubectl logs suspicious-pod-example -n production --all-containers --previous \
    > /evidence/IR-2026-0042/pod_previous_logs.txt 2>/dev/null

# Get pod details (including events, resource specs, security context)
kubectl describe pod suspicious-pod-example -n production \
    > /evidence/IR-2026-0042/pod_describe.txt

# Get pod YAML (full specification)
kubectl get pod suspicious-pod-example -n production -o yaml \
    > /evidence/IR-2026-0042/pod_spec.yaml

# List all events in the namespace (sorted by time)
kubectl get events -n production --sort-by='.lastTimestamp' \
    > /evidence/IR-2026-0042/namespace_events.txt

# Check for suspicious RBAC bindings
kubectl get clusterrolebindings -o json | \
    python3 -c "
import json, sys
data = json.load(sys.stdin)
for item in data['items']:
    for subject in item.get('subjects', []):
        role = item['roleRef']['name']
        if role in ['cluster-admin', 'admin']:
            print(f'ALERT: {subject.get(\"name\",\"unknown\")} '
                  f'({subject.get(\"kind\",\"unknown\")}) bound to {role}')
"

# Check for privileged containers in the cluster
kubectl get pods --all-namespaces -o json | \
    python3 -c "
import json, sys
data = json.load(sys.stdin)
for pod in data['items']:
    ns = pod['metadata']['namespace']
    name = pod['metadata']['name']
    for container in pod['spec'].get('containers', []):
        sc = container.get('securityContext', {})
        if sc.get('privileged', False):
            print(f'PRIVILEGED: {ns}/{name} container={container[\"name\"]}')
        if sc.get('runAsUser') == 0:
            print(f'ROOT: {ns}/{name} container={container[\"name\"]}')
"

# Capture network policies
kubectl get networkpolicies -n production -o yaml \
    > /evidence/IR-2026-0042/network_policies.yaml

# Check for suspicious CronJobs (persistence mechanism)
kubectl get cronjobs --all-namespaces -o json | \
    python3 -c "
import json, sys
data = json.load(sys.stdin)
for cj in data['items']:
    ns = cj['metadata']['namespace']
    name = cj['metadata']['name']
    schedule = cj['spec']['schedule']
    image = cj['spec']['jobTemplate']['spec']['template']['spec']['containers'][0]['image']
    created = cj['metadata']['creationTimestamp']
    print(f'{ns}/{name} schedule={schedule} image={image} created={created}')
"

57.5.4 etcd Forensics

etcd stores all Kubernetes cluster state. Forensic access to etcd can reveal deleted resources, modified configurations, and historical state:

# List all keys in etcd (requires etcd client access)
ETCDCTL_API=3 etcdctl get "" --prefix --keys-only \
    --endpoints=https://10.0.1.10:2379 \
    --cacert=/etc/kubernetes/pki/etcd/ca.crt \
    --cert=/etc/kubernetes/pki/etcd/server.crt \
    --key=/etc/kubernetes/pki/etcd/server.key

# Get specific secret (may contain compromised credentials)
ETCDCTL_API=3 etcdctl get /registry/secrets/production/suspicious-secret \
    --endpoints=https://10.0.1.10:2379 \
    --cacert=/etc/kubernetes/pki/etcd/ca.crt \
    --cert=/etc/kubernetes/pki/etcd/server.crt \
    --key=/etc/kubernetes/pki/etcd/server.key

# Snapshot etcd for forensic preservation
ETCDCTL_API=3 etcdctl snapshot save /evidence/IR-2026-0042/etcd_snapshot.db \
    --endpoints=https://10.0.1.10:2379 \
    --cacert=/etc/kubernetes/pki/etcd/ca.crt \
    --cert=/etc/kubernetes/pki/etcd/server.crt \
    --key=/etc/kubernetes/pki/etcd/server.key

sha256sum /evidence/IR-2026-0042/etcd_snapshot.db > \
    /evidence/IR-2026-0042/etcd_snapshot.sha256

# Verify snapshot integrity
ETCDCTL_API=3 etcdctl snapshot status /evidence/IR-2026-0042/etcd_snapshot.db \
    --write-out=table

etcd Security Warning

etcd contains all cluster secrets in base64 encoding (not encryption, unless encryption-at-rest is configured). Direct etcd access should be strictly controlled and audited. Forensic etcd snapshots must be treated as highly sensitive evidence containing credentials for every service in the cluster.

57.5.5 Service Mesh Forensics

If a service mesh (Istio, Linkerd) is deployed, it provides additional forensic evidence:

# Istio -- query Envoy access logs for suspicious traffic
kubectl logs -l app=suspicious-service -n production \
    -c istio-proxy --tail=1000 \
    > /evidence/IR-2026-0042/envoy_access_logs.txt

# Check for modified Istio policies (potential lateral movement enablement)
kubectl get authorizationpolicies --all-namespaces -o yaml \
    > /evidence/IR-2026-0042/istio_auth_policies.yaml

kubectl get peerauthentications --all-namespaces -o yaml \
    > /evidence/IR-2026-0042/istio_peer_auth.yaml

# Check for destination rules that may bypass mTLS
kubectl get destinationrules --all-namespaces -o yaml \
    > /evidence/IR-2026-0042/istio_destination_rules.yaml

57.6 Evidence Collection & Preservation

57.6.1 Chain of Custody in Cloud Environments

Traditional chain of custody tracks physical evidence transfer between custodians. In cloud environments, the concept adapts to track digital evidence collection, storage, and access:

flowchart LR
    A[Evidence<br/>Identified] --> B[Collection<br/>Initiated]
    B --> C{Evidence Type}
    C -->|API Logs| D[API Export with<br/>Timestamp + Hash]
    C -->|Disk| E[Snapshot with<br/>Tags + Hash]
    C -->|Memory| F[Memory Dump with<br/>Hash + Metadata]
    C -->|Network| G[Flow Log Export<br/>with Hash]
    D --> H[Forensic Evidence<br/>Storage Bucket]
    E --> H
    F --> H
    G --> H
    H --> I[Integrity<br/>Verification]
    I --> J[Access Log<br/>Auditing]
    J --> K[Analysis<br/>Workstation]
    K --> L[Report<br/>Generation]

Chain of Custody Record Template:

{
    "case_id": "IR-2026-0042",
    "evidence_id": "EVD-001",
    "evidence_type": "EBS Snapshot",
    "source": {
        "aws_account": "123456789012",
        "region": "us-east-1",
        "resource_id": "vol-0a1b2c3d4e5f67890",
        "instance_id": "i-0a1b2c3d4e5f67890"
    },
    "acquisition": {
        "method": "aws ec2 create-snapshot",
        "snapshot_id": "snap-0a1b2c3d4e5f67890",
        "timestamp": "2026-03-15T14:30:00Z",
        "examiner": "testuser",
        "examiner_id": "arn:aws:iam::123456789012:user/testuser"
    },
    "integrity": {
        "hash_algorithm": "SHA-256",
        "hash_value": "a1b2c3d4e5f6789012345678901234567890abcdef1234567890abcdef12345678",
        "verification_timestamp": "2026-03-15T14:35:00Z"
    },
    "storage": {
        "location": "s3://forensic-evidence-bucket/IR-2026-0042/EVD-001/",
        "encryption": "AES-256 (SSE-S3)",
        "access_logging": true,
        "versioning": true,
        "object_lock": "COMPLIANCE mode, 365 days"
    },
    "custody_log": [
        {
            "timestamp": "2026-03-15T14:30:00Z",
            "action": "ACQUIRED",
            "custodian": "testuser",
            "notes": "Initial acquisition from production instance"
        },
        {
            "timestamp": "2026-03-15T15:00:00Z",
            "action": "TRANSFERRED",
            "from_custodian": "testuser",
            "to_custodian": "forensic-analyst-1",
            "notes": "Transferred to forensic analysis team"
        }
    ]
}
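
Custody events can be appended to this record programmatically so the log stays consistent and machine-verifiable. A minimal sketch, assuming the record above is stored as a JSON file (the path and custodian names are illustrative):

```python
import json
from datetime import datetime, timezone

def record_transfer(record_path, from_custodian, to_custodian, notes=""):
    """Append a TRANSFERRED entry to a chain of custody record file."""
    with open(record_path) as f:
        record = json.load(f)

    record.setdefault("custody_log", []).append({
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "action": "TRANSFERRED",
        "from_custodian": from_custodian,
        "to_custodian": to_custodian,
        "notes": notes,
    })

    # Rewrite the record in place; prior entries are preserved
    with open(record_path, "w") as f:
        json.dump(record, f, indent=2)
    return record
```

Each append preserves prior entries, so custody_log remains an ordered history; combined with immutable storage, it becomes tamper-evident.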

57.6.2 Forensic Evidence Storage Architecture

Evidence Storage Best Practices

  • Use a dedicated AWS account/Azure subscription/GCP project for forensic evidence -- separate from production
  • Enable S3 Object Lock (Compliance mode) or Azure Immutable Blob Storage to prevent evidence tampering
  • Enable versioning on all evidence storage
  • Enable access logging to track who accessed evidence and when
  • Use server-side encryption (SSE-S3, SSE-KMS, or Azure Storage Service Encryption)
  • Implement least-privilege IAM -- only forensic analysts can access the evidence bucket
  • Configure lifecycle policies to transition evidence to cold storage after case closure
# AWS: Create forensic evidence bucket with Object Lock
aws s3api create-bucket \
    --bucket forensic-evidence-ir-2026-0042 \
    --region us-east-1 \
    --object-lock-enabled-for-bucket

# Enable versioning (required for Object Lock)
aws s3api put-bucket-versioning \
    --bucket forensic-evidence-ir-2026-0042 \
    --versioning-configuration Status=Enabled

# Set default Object Lock retention
aws s3api put-object-lock-configuration \
    --bucket forensic-evidence-ir-2026-0042 \
    --object-lock-configuration '{
        "ObjectLockEnabled": "Enabled",
        "Rule": {
            "DefaultRetention": {
                "Mode": "COMPLIANCE",
                "Days": 365
            }
        }
    }'

# Enable server-side encryption
aws s3api put-bucket-encryption \
    --bucket forensic-evidence-ir-2026-0042 \
    --server-side-encryption-configuration '{
        "Rules": [{
            "ApplyServerSideEncryptionByDefault": {
                "SSEAlgorithm": "aws:kms",
                "KMSMasterKeyID": "arn:aws:kms:us-east-1:123456789012:key/forensic-key-id"
            }
        }]
    }'

# Enable access logging
aws s3api put-bucket-logging \
    --bucket forensic-evidence-ir-2026-0042 \
    --bucket-logging-status '{
        "LoggingEnabled": {
            "TargetBucket": "forensic-access-logs",
            "TargetPrefix": "evidence-bucket/"
        }
    }'

# Block public access
aws s3api put-public-access-block \
    --bucket forensic-evidence-ir-2026-0042 \
    --public-access-block-configuration \
        BlockPublicAcls=true,IgnorePublicAcls=true,BlockPublicPolicy=true,RestrictPublicBuckets=true

57.6.3 Memory Acquisition Tools

| Tool | Platform | Method | Output Format | Notes |
|------|----------|--------|---------------|-------|
| AVML | Linux | /proc/kcore | LiME, raw | Microsoft open-source; no kernel module required |
| LiME | Linux | Kernel module | LiME, raw, padded | Requires matching kernel headers |
| Volatility 3 | Analysis | N/A | Analysis framework | Analyzes memory dumps from AVML/LiME |
| WinPMEM | Windows | Driver | AFF4, raw | Windows memory acquisition |
| Magnet RAM Capture | Windows | User-mode | Raw | Free; minimal footprint |
| AWS SSM + AVML | AWS Linux | Remote command | LiME | No SSH required; uses SSM agent |
| Azure Run Command | Azure Linux/Win | Remote command | Various | Uses VM agent |
| GCP startup-script | GCP Linux | Metadata script | Various | Can run at instance boot |
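
The "AWS SSM + AVML" approach can be scripted with boto3. A hedged sketch, assuming the SSM agent is running on the target, AVML is staged at /usr/local/bin/avml, and the instance role can write to the (illustrative) evidence bucket:

```python
def build_acquisition_commands(case_id, instance_id,
                               bucket="forensic-evidence-automated"):
    """Shell commands for AVML acquisition: dump, hash, upload to S3."""
    prefix = f"s3://{bucket}/{case_id}/{instance_id}/"
    return [
        "/usr/local/bin/avml /tmp/memory.lime",                  # acquire memory
        "sha256sum /tmp/memory.lime > /tmp/memory.lime.sha256",  # hash at source
        f"aws s3 cp /tmp/memory.lime {prefix}",
        f"aws s3 cp /tmp/memory.lime.sha256 {prefix}",
    ]

def acquire_memory_via_ssm(instance_id, case_id):
    """Dispatch the acquisition to the target over SSM Run Command."""
    import boto3  # imported lazily so the command builder is testable offline
    ssm = boto3.client("ssm")
    response = ssm.send_command(
        InstanceIds=[instance_id],
        DocumentName="AWS-RunShellScript",  # built-in SSM document
        Comment=f"Forensic memory acquisition - {case_id}",
        Parameters={"commands": build_acquisition_commands(case_id, instance_id)},
    )
    return response["Command"]["CommandId"]
```

send_command returns immediately; poll ssm.get_command_invocation with the returned command ID before verifying the uploaded dump's hash.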

57.6.4 Write Blockers in Cloud Context

Traditional write blockers prevent modification of physical media during acquisition. In cloud environments, the equivalent controls are:

  1. Read-only disk attachment: Attach forensic disk copies in read-only mode (--mode ro in GCP, mount with -o ro in Linux)
  2. Snapshot immutability: Snapshots are inherently read-only; create volumes from snapshots for analysis
  3. Object Lock: S3 Object Lock in COMPLIANCE mode prevents anyone (including root) from deleting or modifying evidence
  4. IAM restrictions: Grant only s3:GetObject (not PutObject or DeleteObject) to analysts
  5. Azure Immutable Blob: Legal hold or time-based retention prevents modification
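
Items 1-3 combine in practice: create an analysis volume from the snapshot, attach it, then mount it read-only on the workstation. A minimal sketch of the two pieces (helper names and the mount point are illustrative; the volume itself would be created with ec2.create_volume(**analysis_volume_params(...)) and attached before mounting):

```python
def analysis_volume_params(snapshot_id, case_id, az="us-east-1a"):
    """Parameters for creating an analysis copy from an immutable snapshot."""
    return {
        "SnapshotId": snapshot_id,
        "AvailabilityZone": az,
        "TagSpecifications": [{
            "ResourceType": "volume",
            "Tags": [
                {"Key": "ForensicCase", "Value": case_id},
                {"Key": "Purpose", "Value": "analysis-read-only"},
            ],
        }],
    }

def readonly_mount_command(device, mountpoint="/mnt/evidence"):
    """Linux mount options acting as a software write blocker:
    ro = read-only, noload = skip ext3/ext4 journal replay (avoids
    recovery writes), noexec = never execute binaries from evidence."""
    return f"mount -o ro,noload,noexec {device} {mountpoint}"
```

The noload option matters: without it, mounting an ext4 volume with a dirty journal replays the journal and modifies the filesystem even under ro.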

57.6.5 Evidence Integrity Verification

#!/usr/bin/env python3
"""Evidence integrity verification script.

Generates and verifies SHA-256 hashes for all evidence files,
maintaining a chain of custody hash manifest.

Case: IR-2026-0042
"""

import hashlib
import json
import os
from datetime import datetime
from pathlib import Path

EVIDENCE_DIR = "/evidence/IR-2026-0042"
MANIFEST_FILE = f"{EVIDENCE_DIR}/hash_manifest.json"

def compute_hash(filepath, algorithm='sha256'):
    """Compute cryptographic hash of a file."""
    h = hashlib.new(algorithm)
    with open(filepath, 'rb') as f:
        while True:
            chunk = f.read(8192)
            if not chunk:
                break
            h.update(chunk)
    return h.hexdigest()

def generate_manifest(evidence_dir):
    """Generate hash manifest for all evidence files."""
    manifest = {
        'case_id': 'IR-2026-0042',
        'generated_at': datetime.utcnow().isoformat(),
        'generated_by': 'testuser',
        'hash_algorithm': 'SHA-256',
        'files': []
    }

    for root, dirs, files in os.walk(evidence_dir):
        for filename in files:
            if filename == 'hash_manifest.json':
                continue
            filepath = os.path.join(root, filename)
            relative_path = os.path.relpath(filepath, evidence_dir)
            file_hash = compute_hash(filepath)
            file_size = os.path.getsize(filepath)

            manifest['files'].append({
                'path': relative_path,
                'size_bytes': file_size,
                'sha256': file_hash,
                'collected_at': datetime.fromtimestamp(
                    os.path.getmtime(filepath)
                ).isoformat()
            })
            print(f"  {file_hash}  {relative_path} ({file_size} bytes)")

    manifest['total_files'] = len(manifest['files'])
    manifest['manifest_hash'] = hashlib.sha256(
        json.dumps(manifest['files'], sort_keys=True).encode()
    ).hexdigest()

    with open(MANIFEST_FILE, 'w') as f:
        json.dump(manifest, f, indent=2)

    print(f"\nManifest written: {MANIFEST_FILE}")
    print(f"Total files: {manifest['total_files']}")
    print(f"Manifest integrity hash: {manifest['manifest_hash']}")

def verify_manifest(manifest_file):
    """Verify evidence integrity against stored manifest."""
    with open(manifest_file, 'r') as f:
        manifest = json.load(f)

    evidence_dir = os.path.dirname(manifest_file)
    passed = 0
    failed = 0

    for entry in manifest['files']:
        filepath = os.path.join(evidence_dir, entry['path'])
        if not os.path.exists(filepath):
            print(f"  MISSING: {entry['path']}")
            failed += 1
            continue

        current_hash = compute_hash(filepath)
        if current_hash == entry['sha256']:
            print(f"  PASS: {entry['path']}")
            passed += 1
        else:
            print(f"  FAIL: {entry['path']} "
                  f"(expected {entry['sha256'][:16]}... "
                  f"got {current_hash[:16]}...)")
            failed += 1

    print(f"\nVerification complete: {passed} passed, {failed} failed")
    return failed == 0

if __name__ == '__main__':
    import sys
    if len(sys.argv) > 1 and sys.argv[1] == '--verify':
        verify_manifest(MANIFEST_FILE)
    else:
        generate_manifest(EVIDENCE_DIR)

57.7 Cloud-Native Detection Queries

57.7.1 KQL Queries for Azure Cloud Attacks

// T1078.004 -- Detect compromised cloud credentials (impossible travel)
SigninLogs
| where TimeGenerated > ago(7d)
| where ResultType == 0
| project TimeGenerated, UserPrincipalName, IPAddress, 
          Latitude = toreal(LocationDetails.geoCoordinates.latitude),
          Longitude = toreal(LocationDetails.geoCoordinates.longitude),
          City = tostring(LocationDetails.city),
          Country = tostring(LocationDetails.countryOrRegion)
| order by UserPrincipalName, TimeGenerated asc
| serialize
| extend PrevLat = prev(Latitude), PrevLon = prev(Longitude),
         PrevTime = prev(TimeGenerated), PrevUser = prev(UserPrincipalName),
         PrevCity = prev(City)
| where UserPrincipalName == PrevUser
| extend TimeDiffMinutes = datetime_diff('minute', TimeGenerated, PrevTime)
| extend DistanceKm = geo_distance_2points(Longitude, Latitude, PrevLon, PrevLat) / 1000
| extend SpeedKmH = iff(TimeDiffMinutes > 0, DistanceKm / (TimeDiffMinutes / 60.0), 0.0)
| where SpeedKmH > 900  // Faster than commercial aircraft
| project TimeGenerated, UserPrincipalName, IPAddress, City, Country,
          PrevCity, TimeDiffMinutes, DistanceKm, SpeedKmH

// T1562.008 -- Detect cloud logging disabled
AzureActivity
| where TimeGenerated > ago(24h)
| where OperationNameValue has_any (
    "MICROSOFT.INSIGHTS/DIAGNOSTICSETTINGS/DELETE",
    "MICROSOFT.INSIGHTS/LOGPROFILES/DELETE",
    "MICROSOFT.SECURITY/AUTOPROVISIONINGSETTINGS/WRITE"
)
| project TimeGenerated, Caller, OperationNameValue, 
          ResourceGroup, _ResourceId, ActivityStatusValue

// T1530 -- Detect mass download from Azure Blob Storage
StorageBlobLogs
| where TimeGenerated > ago(24h)
| where OperationName == "GetBlob"
| where StatusCode == 200
| summarize DownloadCount = count(),
            TotalBytesRead = sum(ResponseBodySize),
            UniqueBlobs = dcount(ObjectKey)
    by CallerIpAddress, AccountName, bin(TimeGenerated, 1h)
| where DownloadCount > 100 or TotalBytesRead > 1073741824
| order by TotalBytesRead desc

// T1098 -- Detect Azure AD application permission grants (persistence)
AuditLogs
| where TimeGenerated > ago(7d)
| where OperationName has_any ("Add app role assignment to service principal",
                                "Add delegated permission grant",
                                "Add application")
| extend InitiatedByUser = tostring(InitiatedBy.user.userPrincipalName)
| extend TargetApp = tostring(TargetResources[0].displayName)
| project TimeGenerated, InitiatedByUser, OperationName, TargetApp,
          Result, CorrelationId

// T1537 -- Detect data transfer to external storage accounts
AzureActivity
| where TimeGenerated > ago(24h)
| where OperationNameValue has "MICROSOFT.STORAGE"
| where OperationNameValue has_any ("WRITE", "COPY")
| extend TargetResource = tostring(parse_json(Properties).resource)
| where TargetResource !startswith "/subscriptions/"  // External target
| project TimeGenerated, Caller, OperationNameValue, TargetResource

// Detect Azure VM created in unusual region (lateral movement / staging)
AzureActivity
| where TimeGenerated > ago(7d)
| where OperationNameValue =~ "MICROSOFT.COMPUTE/VIRTUALMACHINES/WRITE"
| where ActivityStatusValue == "Success"
| summarize VMsCreated = count() by Caller, ResourceGroup, 
            bin(TimeGenerated, 1d)
| where VMsCreated > 3  // Unusual burst of VM creation

57.7.2 SPL Queries for AWS (via CloudTrail)

// T1078.004 -- Detect AWS console login from unusual location
index=cloudtrail eventName=ConsoleLogin
| iplocation sourceIPAddress
| stats dc(Country) as country_count, values(Country) as countries,
        dc(sourceIPAddress) as ip_count, values(sourceIPAddress) as ips
    by userIdentity.arn
| where country_count > 1
| sort -country_count

// T1562.008 -- Detect CloudTrail logging disabled
index=cloudtrail eventName IN ("StopLogging", "DeleteTrail", "UpdateTrail",
                                 "PutEventSelectors")
| table _time, eventName, userIdentity.arn, sourceIPAddress, awsRegion,
        requestParameters.name, errorCode

// T1530 -- Detect mass S3 object access
index=cloudtrail eventName=GetObject
| stats count as access_count, dc(requestParameters.key) as unique_objects,
        sum(additionalEventData.bytesTransferredOut) as bytes_out
    by sourceIPAddress, requestParameters.bucketName, 
       userIdentity.arn
| where access_count > 100 OR bytes_out > 1073741824
| sort -bytes_out

// T1098 -- Detect IAM privilege escalation
index=cloudtrail eventName IN ("AttachUserPolicy", "AttachRolePolicy",
                                 "PutUserPolicy", "PutRolePolicy",
                                 "CreatePolicyVersion", "AddUserToGroup",
                                 "AttachGroupPolicy")
| eval is_admin_policy=if(match('requestParameters.policyArn',
    "AdministratorAccess|PowerUserAccess|IAMFullAccess"), 1, 0)
| where is_admin_policy=1
| table _time, eventName, userIdentity.arn, sourceIPAddress,
        requestParameters.policyArn, requestParameters.userName

// T1537 -- Detect cross-account S3 copy (exfiltration)
index=cloudtrail eventName IN ("PutObject", "CopyObject", "UploadPart")
| where recipientAccountId != 'userIdentity.accountId'
| table _time, eventName, userIdentity.arn, sourceIPAddress,
        requestParameters.bucketName, recipientAccountId

// Detect EC2 instance launched in unusual region
index=cloudtrail eventName=RunInstances
| stats count by awsRegion, userIdentity.arn
| eventstats avg(count) as avg_launches, stdev(count) as std_launches by userIdentity.arn
| where count > (avg_launches + 2 * std_launches)
| sort -count

// Detect IMDS credential theft (T1552.005)
index=cloudtrail eventName=* userIdentity.type=AssumedRole
| where 'userIdentity.arn' LIKE "%:i-%"
| stats dc(sourceIPAddress) as unique_ips, values(sourceIPAddress) as ips
    by userIdentity.arn
| where unique_ips > 1
| mvexpand ips
| iplocation ips
| stats dc(Country) as countries by userIdentity.arn
| where countries > 1

// Detect security group modification (firewall weakening)
index=cloudtrail eventName IN ("AuthorizeSecurityGroupIngress", 
                                 "AuthorizeSecurityGroupEgress")
| spath requestParameters.ipPermissions.items{}.ipRanges.items{}.cidrIp
| search "requestParameters.ipPermissions.items{}.ipRanges.items{}.cidrIp"="0.0.0.0/0"
| table _time, userIdentity.arn, sourceIPAddress, 
        requestParameters.groupId,
        requestParameters.ipPermissions.items{}.fromPort,
        requestParameters.ipPermissions.items{}.toPort

57.8 Serverless & PaaS Forensics

57.8.1 Serverless Forensic Challenges

Serverless architectures present the most extreme forensic challenges in cloud computing:

| Challenge | Description | Mitigation |
|-----------|-------------|------------|
| No persistent infrastructure | Functions execute on ephemeral compute; no disk, no OS access | Maximize logging; enable X-Ray/Application Insights tracing |
| Short execution time | Functions may run for milliseconds to minutes | Log all inputs, outputs, and state transitions |
| No memory access | Cannot acquire function memory | Instrument code with memory-state logging |
| Provider-managed runtime | Cannot install forensic agents | Use provider-native logging and monitoring |
| Cold start ambiguity | New execution environments may destroy traces from previous invocations | Enable provisioned concurrency for critical functions |
| Event-driven complexity | Trigger chains span multiple services | Implement distributed tracing (correlation IDs) |
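
Because function memory cannot be acquired after the fact, forensic readiness must be built into the code path itself. A minimal sketch of a handler that propagates a correlation ID and emits structured start/end records (the field names are illustrative):

```python
import json
import time
import uuid

def lambda_handler(event, context=None):
    """Emit correlatable, structured records for later forensic reconstruction."""
    # Propagate an upstream correlation ID, or mint one at the chain's start
    correlation_id = event.get("correlation_id") or str(uuid.uuid4())

    # Record what arrived (types only here, to avoid logging sensitive payloads)
    print(json.dumps({
        "correlation_id": correlation_id,
        "ts": time.time(),
        "phase": "invocation_start",
        "event_summary": {k: type(v).__name__ for k, v in event.items()},
    }))

    result = {"status": "processed", "correlation_id": correlation_id}

    print(json.dumps({
        "correlation_id": correlation_id,
        "phase": "invocation_end",
        "result_status": result["status"],
    }))
    return result
```

The structured records land in CloudWatch Logs (or the provider equivalent), where the correlation ID lets an examiner stitch a single trigger chain back together across services.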

57.8.2 AWS Lambda Forensic Investigation

# Get all Lambda function versions (detect code injection)
aws lambda list-versions-by-function \
    --function-name payment-processor-example \
    --query 'Versions[].{Version:Version,LastModified:LastModified,SHA:CodeSha256,Runtime:Runtime}' \
    --output table

# Compare code hashes between versions to detect unauthorized changes
aws lambda get-function \
    --function-name payment-processor-example \
    --qualifier 5 \
    --query 'Configuration.CodeSha256'

# Retrieve CloudWatch Insights for function execution analysis
aws logs start-query \
    --log-group-name "/aws/lambda/payment-processor-example" \
    --start-time 1710400000 \
    --end-time 1710600000 \
    --query-string '
        fields @timestamp, @message
        | filter @message like /ERROR|Exception|Traceback|unauthorized|denied/
        | sort @timestamp asc
        | limit 500
    '

# Check for function URL or resource policy changes
aws lambda get-policy \
    --function-name payment-processor-example \
    --output json

# Review event source mappings (compromised input sources)
aws lambda list-event-source-mappings \
    --function-name payment-processor-example \
    --output json

# Check Lambda environment variables for injected values
aws lambda get-function-configuration \
    --function-name payment-processor-example \
    --query '{EnvVars:Environment.Variables,Layers:Layers[].Arn,Role:Role,Timeout:Timeout}'
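
The per-version CodeSha256 comparison above is tedious by hand. A sketch that automates drift detection across all published versions (function names are illustrative):

```python
def find_code_changes(versions):
    """Flag version transitions where CodeSha256 changed.

    `versions` is the Versions list returned by
    lambda list-versions-by-function (oldest to newest)."""
    changes = []
    numbered = [v for v in versions if v.get("Version") != "$LATEST"]
    for prev, curr in zip(numbered, numbered[1:]):
        if prev["CodeSha256"] != curr["CodeSha256"]:
            changes.append({
                "from_version": prev["Version"],
                "to_version": curr["Version"],
                "modified": curr.get("LastModified"),
                "new_sha256": curr["CodeSha256"],
            })
    return changes

def audit_function_versions(function_name):
    """Fetch all versions via boto3 and report code-hash drift."""
    import boto3  # imported lazily so find_code_changes is testable offline
    paginator = boto3.client("lambda").get_paginator("list_versions_by_function")
    versions = []
    for page in paginator.paginate(FunctionName=function_name):
        versions.extend(page["Versions"])
    return find_code_changes(versions)
```

Any flagged transition whose timestamp does not match a known deployment is a candidate for unauthorized code injection.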

57.8.3 API Gateway Forensics

# Enable API Gateway access logging (if not already enabled)
# Check current stage settings
aws apigateway get-stage \
    --rest-api-id abc123def4 \
    --stage-name production \
    --query '{AccessLog:accessLogSettings,MethodSettings:methodSettings}'

# Query API Gateway access logs in CloudWatch
aws logs filter-log-events \
    --log-group-name "API-Gateway-Access-Logs_abc123def4/production" \
    --start-time 1710400000000 \
    --end-time 1710600000000 \
    --filter-pattern '{ $.status = 403 || $.status = 401 || $.status = 500 }' \
    --output json

# Query for unusual request patterns (injection attempts)
aws logs filter-log-events \
    --log-group-name "API-Gateway-Access-Logs_abc123def4/production" \
    --start-time 1710400000000 \
    --end-time 1710600000000 \
    --filter-pattern '?"SELECT" ?"UNION" ?"../" ?"<script" ?"eval("' \
    --output json

57.8.4 Managed Database Forensics

# AWS RDS -- retrieve database audit logs
aws rds describe-db-log-files \
    --db-instance-identifier production-db-example \
    --filename-contains "audit"

aws rds download-db-log-file-portion \
    --db-instance-identifier production-db-example \
    --log-file-name "audit/audit.log.2026-03-15" \
    --output text > /evidence/IR-2026-0042/rds_audit_log.txt

# Azure SQL -- query auditing logs
az sql db audit-policy show \
    --resource-group production-rg \
    --server sql-server-example \
    --name production-db

# Query Azure SQL audit logs via Log Analytics
# AzureDiagnostics
# | where ResourceProvider == "MICROSOFT.SQL"
# | where Category == "SQLSecurityAuditEvents"
# | where TimeGenerated > ago(24h)
# | where statement_s has_any ("DROP", "ALTER", "GRANT", "xp_cmdshell")
# | project TimeGenerated, server_principal_name_s, 
#           database_name_s, statement_s, client_ip_s

57.8.5 SaaS Application Forensics

SaaS forensics relies entirely on provider APIs and audit logs:

| SaaS Platform | Audit Log Source | Retention | Access Method |
|---------------|------------------|-----------|---------------|
| Microsoft 365 | Unified Audit Log | 90 days (E3) / 1 year (E5) | Search-UnifiedAuditLog cmdlet |
| Google Workspace | Admin Audit Log | 6 months | Reports API / Admin Console |
| Salesforce | Event Monitoring | 30 days (Shield: 1 year) | REST API / Event Log Files |
| Slack | Audit Logs API | Enterprise Grid only | Audit Logs API |
| GitHub | Audit Log | Enterprise: 6 months | REST API / streaming |
| Okta | System Log | 90 days | System Log API |
| Zoom | Activity Reports | 12 months | Dashboard Reports API |

# Microsoft 365 -- search unified audit log (PowerShell)
# Connect-ExchangeOnline -UserPrincipalName testuser@example.com
# Search-UnifiedAuditLog -StartDate "2026-03-14" -EndDate "2026-03-16" \
#     -UserIds "testuser@example.com" \
#     -Operations "FileDownloaded","FileAccessed","FileModified" \
#     -ResultSize 5000 | Export-Csv -Path "m365_audit.csv"

# Google Workspace -- query admin audit logs
# Using Google Admin SDK Reports API
# GET https://admin.googleapis.com/admin/reports/v1/activity/users/all/applications/admin
#     ?startTime=2026-03-14T00:00:00.000Z
#     &endTime=2026-03-16T23:59:59.000Z
#     &eventName=CHANGE_USER_PASSWORD

57.9 Multi-Cloud Investigation

57.9.1 Correlating Evidence Across Providers

Multi-cloud investigations are the most complex forensic scenarios. Each provider exposes evidence in different log formats and timestamp representations (all record UTC, but field names and precision differ), with its own terminology and access methods.

Multi-Cloud Evidence Correlation Framework:

flowchart TD
    subgraph AWS_Evidence["AWS Evidence"]
        A1[CloudTrail Logs]
        A2[VPC Flow Logs]
        A3[GuardDuty Findings]
        A4[S3 Access Logs]
    end

    subgraph Azure_Evidence["Azure Evidence"]
        B1[Activity Log]
        B2[Sign-in Logs]
        B3[NSG Flow Logs]
        B4[Defender Alerts]
    end

    subgraph GCP_Evidence["GCP Evidence"]
        C1[Cloud Audit Logs]
        C2[VPC Flow Logs]
        C3[SCC Findings]
    end

    AWS_Evidence --> D[SIEM / Analysis<br/>Platform]
    Azure_Evidence --> D
    GCP_Evidence --> D

    D --> E[Unified Timeline<br/>Construction]
    E --> F[Correlation by<br/>IP / User / Time]
    F --> G[Attack Chain<br/>Reconstruction]
    G --> H[Impact<br/>Assessment]

57.9.2 Unified Timeline Construction

The unified timeline is the central artifact of multi-cloud investigation. It normalizes events from all providers into a single chronological view:

#!/usr/bin/env python3
"""Multi-cloud forensic timeline builder.

Normalizes events from AWS CloudTrail, Azure Activity Log, and
GCP Audit Logs into a unified investigation timeline.

Case: IR-2026-0042
"""

import json
from datetime import datetime
from typing import List, Dict

class UnifiedTimelineEvent:
    """Normalized event from any cloud provider."""
    def __init__(self, timestamp, provider, event_type, actor,
                 source_ip, action, target, result, raw_event):
        self.timestamp = timestamp
        self.provider = provider
        self.event_type = event_type
        self.actor = actor
        self.source_ip = source_ip
        self.action = action
        self.target = target
        self.result = result
        self.raw_event = raw_event

    def to_dict(self):
        return {
            'timestamp': self.timestamp.isoformat(),
            'provider': self.provider,
            'event_type': self.event_type,
            'actor': self.actor,
            'source_ip': self.source_ip,
            'action': self.action,
            'target': self.target,
            'result': self.result
        }

def parse_cloudtrail_event(event: Dict) -> UnifiedTimelineEvent:
    """Parse AWS CloudTrail event into unified format."""
    identity = event.get('userIdentity', {})
    actor = identity.get('arn', identity.get('userName', 'unknown'))

    return UnifiedTimelineEvent(
        timestamp=datetime.fromisoformat(
            event['eventTime'].replace('Z', '+00:00')
        ),
        provider='AWS',
        event_type=event.get('eventSource', 'unknown').split('.')[0],
        actor=actor,
        source_ip=event.get('sourceIPAddress', 'N/A'),
        action=event.get('eventName', 'unknown'),
        target=json.dumps(event.get('requestParameters', {}))[:200],
        result='SUCCESS' if not event.get('errorCode') else 
               f"FAILED: {event.get('errorCode')}",
        raw_event=event
    )

def parse_azure_activity_event(event: Dict) -> UnifiedTimelineEvent:
    """Parse Azure Activity Log event into unified format."""
    return UnifiedTimelineEvent(
        timestamp=datetime.fromisoformat(
            event['eventTimestamp'].replace('Z', '+00:00')
        ),
        provider='AZURE',
        event_type=event.get('resourceProviderName', {}).get('value', 'unknown'),
        actor=event.get('caller', 'unknown'),
        source_ip=event.get('httpRequest', {}).get('clientIpAddress', 'N/A'),
        action=event.get('operationName', {}).get('value', 'unknown'),
        target=event.get('resourceId', 'unknown')[:200],
        result=event.get('status', {}).get('value', 'unknown'),
        raw_event=event
    )

def parse_gcp_audit_event(event: Dict) -> UnifiedTimelineEvent:
    """Parse GCP Cloud Audit Log event into unified format."""
    payload = event.get('protoPayload', {})
    auth_info = payload.get('authenticationInfo', {})
    request_metadata = payload.get('requestMetadata', {})

    return UnifiedTimelineEvent(
        timestamp=datetime.fromisoformat(
            event['timestamp'].replace('Z', '+00:00')
        ),
        provider='GCP',
        event_type=payload.get('serviceName', 'unknown'),
        actor=auth_info.get('principalEmail', 'unknown'),
        source_ip=request_metadata.get('callerIp', 'N/A'),
        action=payload.get('methodName', 'unknown'),
        target=payload.get('resourceName', 'unknown')[:200],
        result='SUCCESS' if not payload.get('status', {}).get('code') 
               else f"FAILED: {payload.get('status', {}).get('message', '')}",
        raw_event=event
    )

def build_unified_timeline(
    cloudtrail_events: List[Dict],
    azure_events: List[Dict],
    gcp_events: List[Dict]
) -> List[Dict]:
    """Build unified timeline from all provider events."""
    timeline = []

    for event in cloudtrail_events:
        timeline.append(parse_cloudtrail_event(event))

    for event in azure_events:
        timeline.append(parse_azure_activity_event(event))

    for event in gcp_events:
        timeline.append(parse_gcp_audit_event(event))

    # Sort by timestamp
    timeline.sort(key=lambda e: e.timestamp)

    return [e.to_dict() for e in timeline]

def correlate_by_ip(timeline: List[Dict]) -> Dict:
    """Find events correlated by source IP across providers."""
    ip_map = {}
    for event in timeline:
        ip = event['source_ip']
        if ip == 'N/A':
            continue
        if ip not in ip_map:
            ip_map[ip] = {'providers': set(), 'events': []}
        ip_map[ip]['providers'].add(event['provider'])
        ip_map[ip]['events'].append(event)

    # Flag IPs appearing in multiple providers
    cross_provider = {
        ip: {
            'providers': list(data['providers']),
            'event_count': len(data['events']),
            'first_seen': data['events'][0]['timestamp'],
            'last_seen': data['events'][-1]['timestamp']
        }
        for ip, data in ip_map.items()
        if len(data['providers']) > 1
    }

    return cross_provider

if __name__ == '__main__':
    # Example usage with loaded event files
    # cloudtrail = json.load(open('cloudtrail_events.json'))
    # azure = json.load(open('azure_activity_events.json'))
    # gcp = json.load(open('gcp_audit_events.json'))
    # timeline = build_unified_timeline(cloudtrail, azure, gcp)
    # correlations = correlate_by_ip(timeline)
    print("[*] Multi-cloud timeline builder ready")
    print("[*] Load events from each provider and call build_unified_timeline()")

57.9.3 SIEM Integration Patterns

| SIEM | AWS Integration | Azure Integration | GCP Integration |
|------|-----------------|-------------------|-----------------|
| Microsoft Sentinel | S3 connector / SQS | Native connector | Pub/Sub connector |
| Splunk | Splunk Add-on for AWS | Azure Monitor Add-on | GCP Add-on for Splunk |
| Elastic SIEM | AWS module (Filebeat) | Azure module | GCP module |
| Chronicle (Google) | CloudTrail ingestion | Azure AD connector | Native integration |
| IBM QRadar | AWS CloudTrail DSM | Azure Event Hub | GCP Pub/Sub |

SIEM Best Practice for Multi-Cloud

Configure all cloud providers to stream logs to a single SIEM instance. Use consistent field mapping (CIM/ECS) to enable cross-provider correlation. Key fields to normalize: timestamp, source IP, user/principal, action, target resource, and result.
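
As a concrete illustration of that normalization, a minimal sketch mapping provider-specific fields onto ECS-style names (the field choices are assumptions; align them with your SIEM's actual schema):

```python
# Map provider-specific field names onto common ECS-style names
FIELD_MAP = {
    "AWS":   {"eventTime": "@timestamp", "sourceIPAddress": "source.ip",
              "userIdentity.arn": "user.name", "eventName": "event.action"},
    "AZURE": {"eventTimestamp": "@timestamp", "callerIpAddress": "source.ip",
              "caller": "user.name", "operationName": "event.action"},
    "GCP":   {"timestamp": "@timestamp", "callerIp": "source.ip",
              "principalEmail": "user.name", "methodName": "event.action"},
}

def normalize(provider, event):
    """Flatten one provider event into common field names."""
    out = {"cloud.provider": provider.lower()}
    for src, dst in FIELD_MAP[provider].items():
        # Walk dotted source keys like "userIdentity.arn" into nested dicts
        node = event
        for part in src.split("."):
            node = node.get(part, {}) if isinstance(node, dict) else {}
        if node != {}:
            out[dst] = node
    return out
```

With all three providers normalized this way, a single query on source.ip or user.name spans the entire multi-cloud estate.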


57.10 Forensic Automation

57.10.1 Automated Evidence Collection Pipeline

flowchart LR
    A[Security Alert<br/>Triggered] --> B{Severity<br/>Assessment}
    B -->|Critical/High| C[Automated<br/>Collection]
    B -->|Medium| D[Queue for<br/>Analyst Review]
    B -->|Low| E[Log and<br/>Monitor]

    C --> F[Memory<br/>Acquisition]
    C --> G[Disk<br/>Snapshots]
    C --> H[Log<br/>Export]
    C --> I[Config<br/>Capture]

    F --> J[Evidence<br/>Storage Bucket]
    G --> J
    H --> J
    I --> J

    J --> K[Hash<br/>Verification]
    K --> L[Chain of Custody<br/>Record]
    L --> M[Analyst<br/>Notification]

57.10.2 AWS Automated Evidence Collection (Lambda-Based)

#!/usr/bin/env python3
"""AWS automated forensic evidence collection.

Triggered by GuardDuty findings via EventBridge.
Automatically collects disk snapshots, instance metadata,
and relevant logs for forensic investigation.

This function should be deployed as an AWS Lambda function
with appropriate IAM permissions.
"""

import boto3
import json
import hashlib
from datetime import datetime

ec2 = boto3.client('ec2')
ssm = boto3.client('ssm')
s3 = boto3.client('s3')
sns = boto3.client('sns')

EVIDENCE_BUCKET = 'forensic-evidence-automated'
SNS_TOPIC = 'arn:aws:sns:us-east-1:123456789012:forensic-alerts'

def lambda_handler(event, context):
    """Process GuardDuty finding and collect evidence."""
    finding = event['detail']
    finding_type = finding['type']
    severity = finding['severity']

    # Only auto-collect for high/critical findings
    if severity < 7:
        print(f"Severity {severity} below threshold -- skipping auto-collection")
        return {'status': 'skipped', 'reason': 'below_threshold'}

    # Extract instance information
    resource = finding.get('resource', {})
    instance_details = resource.get('instanceDetails', {})
    instance_id = instance_details.get('instanceId', '')

    if not instance_id:
        print("No instance ID in finding -- checking for other resource types")
        return handle_non_instance_finding(finding)

    case_id = f"AUTO-{datetime.utcnow().strftime('%Y%m%d-%H%M%S')}-{instance_id}"
    print(f"Starting automated collection for case {case_id}")

    evidence_manifest = {
        'case_id': case_id,
        'trigger': finding_type,
        'severity': severity,
        'instance_id': instance_id,
        'collection_start': datetime.utcnow().isoformat(),
        'evidence_items': []
    }

    # Step 1: Snapshot all attached EBS volumes
    try:
        instance = ec2.describe_instances(
            InstanceIds=[instance_id]
        )['Reservations'][0]['Instances'][0]

        for block_device in instance.get('BlockDeviceMappings', []):
            volume_id = block_device['Ebs']['VolumeId']
            device_name = block_device['DeviceName']

            snapshot = ec2.create_snapshot(
                VolumeId=volume_id,
                Description=f"Forensic auto-capture - {case_id}",
                TagSpecifications=[{
                    'ResourceType': 'snapshot',
                    'Tags': [
                        {'Key': 'ForensicCase', 'Value': case_id},
                        {'Key': 'SourceInstance', 'Value': instance_id},
                        {'Key': 'SourceDevice', 'Value': device_name},
                        {'Key': 'AutoCollected', 'Value': 'true'},
                        {'Key': 'CollectionTime', 
                         'Value': datetime.utcnow().isoformat()}
                    ]
                }]
            )

            evidence_manifest['evidence_items'].append({
                'type': 'EBS_SNAPSHOT',
                'snapshot_id': snapshot['SnapshotId'],
                'source_volume': volume_id,
                'device': device_name,
                'timestamp': datetime.utcnow().isoformat()
            })
            print(f"Created snapshot {snapshot['SnapshotId']} "
                  f"for volume {volume_id}")
    except Exception as e:
        evidence_manifest['errors'] = evidence_manifest.get('errors', [])
        evidence_manifest['errors'].append(f"Snapshot failed: {str(e)}")

    # Step 2: Capture instance metadata
    try:
        metadata = {
            'instance_details': instance,
            'security_groups': instance.get('SecurityGroups', []),
            'network_interfaces': instance.get('NetworkInterfaces', []),
            'iam_profile': instance.get('IamInstanceProfile', {}),
            'tags': instance.get('Tags', [])
        }

        metadata_key = f"{case_id}/instance_metadata.json"
        metadata_json = json.dumps(metadata, default=str, indent=2)

        s3.put_object(
            Bucket=EVIDENCE_BUCKET,
            Key=metadata_key,
            Body=metadata_json,
            ServerSideEncryption='aws:kms',
            Metadata={
                'sha256': hashlib.sha256(
                    metadata_json.encode()
                ).hexdigest()
            }
        )

        evidence_manifest['evidence_items'].append({
            'type': 'INSTANCE_METADATA',
            's3_key': metadata_key,
            'timestamp': datetime.utcnow().isoformat()
        })
    except Exception as e:
        evidence_manifest.setdefault('errors', []).append(
            f"Metadata capture failed: {e}")

    # Step 3: Isolate the instance (modify security group)
    try:
        # Create forensic isolation security group
        vpc_id = instance['VpcId']

        try:
            sg = ec2.create_security_group(
                GroupName=f'forensic-isolation-{case_id}',
                Description=f'Forensic isolation for case {case_id}',
                VpcId=vpc_id
            )
            isolation_sg_id = sg['GroupId']

            # Remove default outbound rule
            ec2.revoke_security_group_egress(
                GroupId=isolation_sg_id,
                IpPermissions=[{
                    'IpProtocol': '-1',
                    'IpRanges': [{'CidrIp': '0.0.0.0/0'}]
                }]
            )
        except ec2.exceptions.ClientError:
            # Group creation failed (e.g., the name already exists from a
            # prior run) -- skip isolation rather than abort the collection
            isolation_sg_id = None

        if isolation_sg_id:
            # Replace instance security groups with isolation group
            for eni in instance.get('NetworkInterfaces', []):
                ec2.modify_network_interface_attribute(
                    NetworkInterfaceId=eni['NetworkInterfaceId'],
                    Groups=[isolation_sg_id]
                )

            evidence_manifest['isolation'] = {
                'security_group': isolation_sg_id,
                'timestamp': datetime.utcnow().isoformat()
            }
    except Exception as e:
        evidence_manifest.setdefault('errors', []).append(
            f"Isolation failed: {e}")

    # Step 4: Save evidence manifest
    evidence_manifest['collection_end'] = datetime.utcnow().isoformat()
    manifest_json = json.dumps(evidence_manifest, default=str, indent=2)

    s3.put_object(
        Bucket=EVIDENCE_BUCKET,
        Key=f"{case_id}/evidence_manifest.json",
        Body=manifest_json,
        ServerSideEncryption='aws:kms'
    )

    # Step 5: Notify forensic team
    sns.publish(
        TopicArn=SNS_TOPIC,
        Subject=f"[FORENSIC] Automated evidence collected - {case_id}",
        Message=json.dumps({
            'case_id': case_id,
            'finding_type': finding_type,
            'severity': severity,
            'instance_id': instance_id,
            'evidence_count': len(evidence_manifest['evidence_items']),
            'errors': evidence_manifest.get('errors', []),
            'evidence_bucket': EVIDENCE_BUCKET
        }, indent=2)
    )

    return {
        'status': 'collected',
        'case_id': case_id,
        'evidence_items': len(evidence_manifest['evidence_items'])
    }

def handle_non_instance_finding(finding):
    """Handle findings not associated with EC2 instances."""
    # Handle S3, IAM, and other resource types
    resource_type = finding.get('resource', {}).get('resourceType', 'unknown')
    print(f"Non-instance finding for resource type: {resource_type}")
    return {'status': 'logged', 'resource_type': resource_type}

57.10.3 Forensic Pipeline Architecture

Production Forensic Pipeline

A complete forensic automation pipeline integrates with IR playbooks from Chapter 28: Advanced Incident Response:

  1. Detection: GuardDuty/Defender/SCC generates finding
  2. Triage: EventBridge/Event Grid routes based on severity
  3. Collection: Lambda/Functions auto-collects evidence (snapshots, logs, metadata)
  4. Preservation: Evidence stored in locked, encrypted bucket with hash verification
  5. Notification: IR team notified via Slack/Teams/PagerDuty with case ID and evidence inventory
  6. Analysis: Analyst accesses evidence via forensic workstation with full audit trail
  7. Reporting: Findings documented in standardized forensic report format
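
The triage and collection steps above can be sketched as a single routing function. A minimal illustration only: the tier thresholds assume GuardDuty's numeric severity scale (roughly Low below 4.0, Medium 4.0-6.9, High 7.0 and above), and the action names are hypothetical labels, not AWS APIs:

```python
def triage_finding(finding: dict) -> str:
    """Route a detection finding to a response tier by severity.

    Tiers assume GuardDuty's numeric scale: Low < 4.0 <= Medium < 7.0 <= High.
    Returns the action a dispatcher (e.g., EventBridge -> Lambda) would take.
    """
    severity = float(finding.get('severity', 0))
    if severity >= 7.0:
        # High: snapshot volumes, capture metadata, and isolate the instance
        return 'auto_collect_and_isolate'
    if severity >= 4.0:
        # Medium: collect evidence but leave the workload running
        return 'auto_collect_only'
    # Low: record for periodic analyst review
    return 'log_for_review'

print(triage_finding({'severity': 8.0}))  # auto_collect_and_isolate
print(triage_finding({'severity': 2.1}))  # log_for_review
```

In production this logic typically lives in the EventBridge rule pattern itself, with the Lambda handling only the collection work.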

57.11 Case Studies

57.11.1 Case Study: CloudNova Corp -- AWS IAM Credential Compromise

Case Study 1: IAM Access Key Exfiltration and Crypto-Mining

Organization: CloudNova Corp (fictional SaaS provider)
Environment: AWS multi-account organization (12 accounts)
Detection: GuardDuty finding -- CryptoCurrency:EC2/BitcoinTool.B!DNS
Duration: 72 hours from initial compromise to detection

Timeline:

Time (UTC)       | Event                                                     | Evidence Source
2026-03-12 09:14 | Phishing email sent to developer (testuser@example.com)   | Email gateway logs
2026-03-12 09:22 | Developer clicks link, enters AWS console credentials     | Azure AD sign-in logs (federated)
2026-03-12 09:25 | Attacker authenticates to AWS console from 203.0.113.42   | CloudTrail -- ConsoleLogin
2026-03-12 09:31 | Attacker creates access key AKIAIOSFODNN7EXAMPLE          | CloudTrail -- CreateAccessKey
2026-03-12 09:35 | Attacker disables CloudTrail in ap-southeast-1            | CloudTrail -- StopLogging (captured by us-east-1 org trail)
2026-03-12 09:40 | Attacker launches 20 x p3.16xlarge in ap-southeast-1      | CloudTrail -- RunInstances
2026-03-12 09:42 | Instances begin crypto-mining operations                  | VPC Flow Logs -- connections to mining pool 198.51.100.88
2026-03-12 10:00 | Attacker creates IAM user "svc-monitor" with admin policy | CloudTrail -- CreateUser, AttachUserPolicy
2026-03-12 10:05 | Attacker exports 3 S3 buckets to external account         | CloudTrail -- PutBucketPolicy (cross-account access)
2026-03-15 06:14 | GuardDuty detects crypto-mining DNS queries               | GuardDuty -- CryptoCurrency:EC2/BitcoinTool.B!DNS
2026-03-15 06:20 | SOC analyst begins investigation                          | IR ticket created

Key Forensic Findings:

  1. Initial Access: Compromised credentials via phishing (T1078.004). The developer's access key had not been rotated in 180 days and had AdministratorAccess attached.
  2. Defense Evasion: Attacker disabled CloudTrail in the target region (T1562.008), but the organization trail in us-east-1 captured the StopLogging event.
  3. Impact: 20 GPU instances ran for 72 hours -- estimated $47,000 in compute costs. 3 S3 buckets containing customer data were copied to an external account (T1537).
  4. Persistence: Backdoor IAM user "svc-monitor" created with admin privileges (T1098).

Remediation Actions:

  • Disabled compromised access key
  • Deleted backdoor IAM user "svc-monitor"
  • Terminated all unauthorized EC2 instances
  • Restored CloudTrail in ap-southeast-1
  • Revoked cross-account S3 bucket policies
  • Implemented SCP to prevent CloudTrail disabling
  • Enforced MFA on all IAM users
  • Implemented 90-day access key rotation policy
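
The SCP from the remediation list can be sketched as a deny statement on trail-tampering actions. An illustrative policy only -- attach points and exemptions for a break-glass security role are omitted, so validate before deploying organization-wide:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DenyCloudTrailTampering",
      "Effect": "Deny",
      "Action": [
        "cloudtrail:StopLogging",
        "cloudtrail:DeleteTrail",
        "cloudtrail:UpdateTrail",
        "cloudtrail:PutEventSelectors"
      ],
      "Resource": "*"
    }
  ]
}
```

Because SCPs apply even to account root users, this closes the StopLogging gap the attacker exploited in ap-southeast-1.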

57.11.2 Case Study: Stratos Health -- Azure AD Compromise with Data Exfiltration

Case Study 2: Azure AD Business Email Compromise

Organization: Stratos Health (fictional healthcare analytics)
Environment: Azure (3 subscriptions) + Microsoft 365 E5
Detection: Microsoft Defender for Cloud Apps -- anomalous file download
Duration: 14 days from initial compromise to detection

Timeline:

Time (UTC)               | Event                                                          | Evidence Source
2026-03-01 14:22         | Attacker sends targeted phishing to CFO                        | M365 Defender -- email analysis
2026-03-01 14:38         | CFO enters credentials on phishing site                        | Azure AD Sign-in Logs -- login from 203.0.113.55
2026-03-01 14:40         | Attacker registers OAuth app "Document Sync Pro"               | Azure AD Audit Log -- Add application
2026-03-01 14:42         | Attacker grants app Mail.Read, Files.ReadWrite.All             | Azure AD Audit Log -- Consent to application
2026-03-01 15:00         | Attacker creates mail forwarding rule                          | M365 Audit Log -- Set-Mailbox -ForwardingSmtpAddress
2026-03-02 -- 2026-03-14 | App silently reads all CFO email and SharePoint files          | M365 Audit Log -- FileAccessed, MailItemsAccessed
2026-03-10 08:15         | Attacker accesses Azure Key Vault via CFO's credentials        | Key Vault Diagnostic Logs -- SecretGet
2026-03-10 08:20         | Attacker retrieves database connection strings from Key Vault  | Key Vault Diagnostic Logs -- SecretGet x 5
2026-03-10 09:00         | Attacker connects to Azure SQL from external IP                | Azure SQL Audit Log -- connection from 203.0.113.55
2026-03-10 09:15         | Attacker exports patient analytics data (150,000 records)      | Azure SQL Audit Log -- bulk SELECT statements
2026-03-14 11:30         | Defender for Cloud Apps alerts on anomalous download volume    | Defender for Cloud Apps -- Anomaly detection

Key Forensic Findings:

  1. Initial Access: Credential phishing against CFO (T1078.004). No MFA was enforced for the CFO account.
  2. Persistence: OAuth application "Document Sync Pro" with broad permissions (T1098). Mail forwarding rule to attacker-controlled address.
  3. Collection: 14 days of email and file access via OAuth app. Key Vault secrets accessed to reach backend database. 150,000 patient analytics records exfiltrated (T1530).
  4. Impact: HIPAA breach notification required. 150,000 individuals affected. Estimated breach cost: $2.1M (notification, credit monitoring, regulatory fines).

KQL Detection Query (retrospective):

// Detect the OAuth app consent grant
AuditLogs
| where OperationName == "Consent to application"
| where TimeGenerated between (datetime(2026-03-01) .. datetime(2026-03-02))
| extend ConsentedBy = tostring(InitiatedBy.user.userPrincipalName)
| extend AppName = tostring(TargetResources[0].displayName)
| extend Permissions = tostring(TargetResources[0].modifiedProperties)
| where Permissions has_any ("Mail.Read", "Files.ReadWrite.All")
| project TimeGenerated, ConsentedBy, AppName, Permissions
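
The same consent-grant hunt can be reproduced outside Sentinel against exported audit-log JSON. A minimal Python sketch -- the flattened field names (operationName, permissions, and so on) and the sample record are illustrative, not the exact Azure export schema:

```python
RISKY_PERMISSIONS = {"Mail.Read", "Files.ReadWrite.All"}

def risky_consents(records: list) -> list:
    """Flag 'Consent to application' events granting broad Graph permissions."""
    hits = []
    for rec in records:
        if rec.get("operationName") != "Consent to application":
            continue
        granted = rec.get("permissions", "")
        # Substring match mirrors the KQL has_any() behavior
        if any(p in granted for p in RISKY_PERMISSIONS):
            hits.append({
                "time": rec.get("timeGenerated"),
                "user": rec.get("initiatedBy"),
                "app": rec.get("appDisplayName"),
            })
    return hits

sample = [{
    "timeGenerated": "2026-03-01T14:42:00Z",
    "operationName": "Consent to application",
    "initiatedBy": "cfo@example.com",
    "appDisplayName": "Document Sync Pro",
    "permissions": "Mail.Read Files.ReadWrite.All offline_access",
}]
print(risky_consents(sample))
```

Offline replays like this are useful when the Log Analytics retention window has already expired and only archived exports remain.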

57.11.3 Case Study: TerraGrid Systems -- Multi-Cloud Lateral Movement

Case Study 3: GCP-to-AWS Lateral Movement via Service Account Key

Organization: TerraGrid Systems (fictional energy analytics)
Environment: GCP (primary) + AWS (data lake) + Azure (identity)
Detection: AWS GuardDuty -- UnauthorizedAccess:IAMUser/InstanceCredentialExfiltration
Duration: 5 days from initial compromise to detection

Timeline:

Time (UTC)               | Event                                                      | Evidence Source
2026-03-08 03:12         | Attacker exploits public-facing app on GCP (T1190)         | GCP Cloud Audit Log -- anomalous API calls
2026-03-08 03:15         | Attacker accesses GCE instance metadata (T1552.005)        | GCP Audit Log -- compute.instances.get
2026-03-08 03:18         | Attacker finds service account key in instance environment | Container logs -- environment variable access
2026-03-08 03:25         | Attacker uses service account to list GCS buckets          | GCP Audit Log -- storage.buckets.list from 203.0.113.99
2026-03-08 03:30         | Attacker discovers AWS credentials stored in GCS bucket    | GCP Data Access Log -- storage.objects.get
2026-03-08 04:00         | Attacker authenticates to AWS with stolen credentials      | CloudTrail -- API call from 203.0.113.99
2026-03-08 04:10         | Attacker enumerates AWS S3 buckets and IAM roles           | CloudTrail -- ListBuckets, ListRoles
2026-03-08 04:30         | Attacker assumes cross-account role to data lake account   | CloudTrail -- AssumeRole
2026-03-09 -- 2026-03-12 | Attacker exfiltrates data lake contents (2.3 TB)           | VPC Flow Logs -- large outbound transfers
2026-03-12 22:45         | GuardDuty detects credential use from external IP          | GuardDuty -- InstanceCredentialExfiltration
2026-03-13 06:00         | IR team begins multi-cloud investigation                   | IR ticket created

Multi-Cloud Correlation:

The key forensic breakthrough was correlating the source IP 203.0.113.99 across both GCP and AWS logs. The GCP Audit Logs showed the initial exploitation and credential theft, while the AWS CloudTrail logs showed the lateral movement and data exfiltration. Without multi-cloud log correlation, each provider's investigation would have appeared as a separate, unrelated incident.
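
The pivot described above reduces to normalizing events from each provider and merging on the shared indicator. A minimal sketch, assuming each log line has already been parsed into a flat record; the field names and sample values are illustrative and synthetic:

```python
from datetime import datetime

def unified_timeline(events: list, pivot_ip: str) -> list:
    """Merge normalized events from multiple providers, keep only those
    involving the pivot IP, and order them chronologically."""
    related = [e for e in events if e.get("source_ip") == pivot_ip]
    return sorted(related, key=lambda e: datetime.fromisoformat(e["timestamp"]))

events = [
    {"timestamp": "2026-03-08T04:00:00+00:00", "provider": "AWS",
     "event": "API call with stolen credentials", "source_ip": "203.0.113.99"},
    {"timestamp": "2026-03-08T03:25:00+00:00", "provider": "GCP",
     "event": "storage.buckets.list", "source_ip": "203.0.113.99"},
    {"timestamp": "2026-03-08T03:10:00+00:00", "provider": "GCP",
     "event": "unrelated health check", "source_ip": "198.51.100.7"},
]
for e in unified_timeline(events, "203.0.113.99"):
    print(e["timestamp"], e["provider"], e["event"])
```

The hard work in practice is the normalization step -- converting CloudTrail, GCP Audit Log, and Azure log schemas into one record shape with UTC timestamps -- not the merge itself.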

Key Forensic Findings:

  1. Initial Access: Public application vulnerability (T1190) in GCP-hosted web application
  2. Credential Access: Instance metadata service exploitation (T1552.005) + AWS credentials stored in GCS bucket
  3. Lateral Movement: GCP to AWS via stolen credentials -- cross-cloud lateral movement
  4. Exfiltration: 2.3 TB of data lake contents over 4 days (T1537)
  5. Root Cause: AWS credentials stored in a GCS bucket without encryption, accessible to the compromised service account

Lessons Learned:

  • Never store cross-cloud credentials in cloud storage -- use secrets managers with rotation
  • Correlate logs across all cloud providers -- the same attacker IP appeared in both GCP and AWS
  • Instance metadata protection (IMDSv2 in AWS, VM metadata concealment in GCP) would have prevented credential theft
  • Data lake access should require MFA and be monitored for anomalous volume

57.12 Cloud Forensics Program

57.12.1 Readiness Assessment Checklist

Use this checklist to assess your organization's cloud forensic readiness:

Category  | Requirement                              | AWS                      | Azure                     | GCP
Logging   | API audit logs enabled (all regions)     | CloudTrail (all regions) | Activity Log (enabled)    | Admin Activity (always on)
Logging   | Data access logs enabled                 | CloudTrail data events   | Diagnostic settings       | Data Access audit logs
Logging   | Network flow logs enabled                | VPC Flow Logs            | NSG Flow Logs             | VPC Flow Logs
Logging   | DNS query logs enabled                   | Route 53 Query Logs      | DNS Analytics             | Cloud DNS Logging
Retention | Audit log retention >= 1 year            | S3 trail + lifecycle     | Log Analytics workspace   | Cloud Logging + BigQuery export
Retention | Network log retention >= 90 days         | S3 + lifecycle           | Storage account           | Cloud Logging sink
Detection | Threat detection enabled                 | GuardDuty (all regions)  | Defender for Cloud        | SCC Premium
Evidence  | Forensic evidence storage configured     | S3 bucket (Object Lock)  | Immutable Blob Storage    | Locked GCS bucket
Evidence  | Cross-account/project forensic access    | Cross-account IAM role   | Lighthouse / RBAC         | Cross-project IAM
Process   | Evidence collection playbook documented  | Yes/No                   | Yes/No                    | Yes/No
Process   | Forensic workstation available           | Dedicated EC2/account    | Dedicated VM/subscription | Dedicated GCE/project
Process   | Chain of custody template                | Yes/No                   | Yes/No                    | Yes/No
Team      | Trained cloud forensic examiners         | >= 2                     | >= 2                      | >= 2
Legal     | Provider legal process guide obtained    | AWS Law Enforcement      | Azure Compliance          | GCP Law Enforcement
Testing   | Forensic collection drill (annual)       | Yes/No                   | Yes/No                    | Yes/No

57.12.2 Tool Selection

Tool Category      | Open Source Options                 | Commercial Options
Memory Acquisition | AVML, LiME                          | Magnet RAM Capture, Belkasoft
Memory Analysis    | Volatility 3                        | Magnet AXIOM, Cellebrite
Disk Forensics     | Autopsy, Sleuth Kit                 | EnCase, FTK, X-Ways
Log Analysis       | Elasticsearch + Kibana, Timesketch  | Splunk, Sentinel, Chronicle
Timeline           | Timesketch, plaso/log2timeline      | Magnet AXIOM Cyber
Cloud-Native       | Prowler, ScoutSuite, CloudMapper    | Wiz, Orca, Lacework
Container          | Trivy, Falco, Sysdig OSS            | Sysdig Secure, Aqua
Automation         | AWS Lambda, Azure Functions         | SOAR (Cortex XSOAR, Splunk SOAR)
DFIR Platform      | GRR Rapid Response, Velociraptor    | CrowdStrike Falcon Forensics

57.12.3 Team Training Requirements

Role                         | Required Skills                                                              | Recommended Certifications
Cloud Forensic Examiner      | Cloud architecture, API log analysis, evidence handling, memory/disk forensics | GCFA, GCFE, AWS Security Specialty, AZ-500
Cloud IR Lead                | Incident management, cross-provider coordination, legal/regulatory knowledge | GCIH, GCIA, CCSP
Forensic Automation Engineer | Python/Go, cloud APIs, CI/CD, IaC                                            | AWS DevOps, CKA, Terraform Associate
Legal/Compliance Liaison     | Data privacy law, evidence admissibility, regulatory notification            | CIPP, CIPM

57.12.4 Retainer Agreements

DFIR Retainer Considerations

Organizations that lack in-house cloud forensic capability should establish retainer agreements with DFIR firms. Key provisions:

  • Response time SLA: 2-4 hour initial response for critical incidents
  • Cloud provider expertise: Ensure the firm has demonstrated expertise in your specific cloud providers
  • Evidence handling: Confirm the firm follows forensically sound evidence handling procedures
  • Reporting format: Agree on report format and deliverables upfront
  • Data handling: Ensure the firm's data handling practices comply with your regulatory requirements
  • Conflict of interest: Verify the firm does not have conflicts with potential adversaries or opposing counsel
  • Retainer fee: Typically $5,000 -- $25,000/year for guaranteed response time; applied against hourly rates if engaged

57.12.5 Regulatory Requirements

Regulation  | Forensic Requirement                               | Retention Minimum                      | Notification Deadline
GDPR        | Document breach investigation, DPA notification    | Investigation duration + defense period | 72 hours to DPA
HIPAA       | Forensic investigation of PHI breaches             | 6 years                                | 60 days to individuals; annual to HHS
PCI DSS     | Forensic investigation by PFI for Level 1 merchants | 1 year                                | Varies by card brand
SOX         | Preserve evidence related to financial systems     | 7 years                                | Immediate to audit committee
NIST 800-53 | IR-4 (Incident Handling), AU-11 (Audit Retention)  | Per system categorization              | Per agency policy
SEC Rules   | Cybersecurity incident disclosure                  | 5 years                                | 4 business days (Form 8-K)
CCPA/CPRA   | Investigate breaches involving CA residents        | Investigation duration                 | "Expeditious"

57.13 Cloud Forensics Comparison: AWS vs Azure vs GCP

Capability                 | AWS                                      | Azure                                  | GCP
Primary Audit Log          | CloudTrail                               | Activity Log + Azure AD Logs           | Cloud Audit Logs
Always-On Audit Logging    | Management events (90-day lookup)        | Activity Log (90 days)                 | Admin Activity (400 days)
Data Access Logging        | CloudTrail Data Events (S3, Lambda)      | Diagnostic Settings (per resource)     | Data Access Logs (per service)
Network Flow Logs          | VPC Flow Logs                            | NSG Flow Logs                          | VPC Flow Logs
Threat Detection           | GuardDuty                                | Microsoft Defender for Cloud           | Security Command Center
Disk Snapshot              | EBS Snapshot                             | Managed Disk Snapshot                  | Compute Engine Snapshot
Memory Acquisition         | SSM + AVML/LiME                          | Run Command + AVML/LiME                | Startup Script + AVML/LiME
Evidence Storage           | S3 + Object Lock                         | Immutable Blob Storage                 | Locked GCS Bucket
Log Query                  | Athena (SQL) / CloudWatch Insights       | KQL (Log Analytics / Sentinel)         | BigQuery (SQL) / Logs Explorer
SIEM Integration           | S3 / SQS / EventBridge                   | Event Hub / Sentinel connectors        | Pub/Sub / Chronicle
Automation                 | Lambda + EventBridge + Step Functions    | Functions + Event Grid + Logic Apps    | Cloud Functions + Eventarc + Workflows
Configuration History      | AWS Config                               | Azure Resource Graph + Change Analysis | Cloud Asset Inventory
Cost for Forensic Readiness | Moderate (CloudTrail S3 trail, VPC Flow Logs, GuardDuty) | Moderate (Log Analytics, Defender, NSG Flow Logs) | Low-Moderate (Data Access Logs, VPC Flow Logs, SCC)

57.14 Key Takeaways

  1. Cloud forensic readiness must be established before an incident -- logging, retention, evidence storage, and collection procedures cannot be retroactively applied to past events
  2. API audit logs are the most important evidence source in cloud forensics -- CloudTrail, Activity Log, and Cloud Audit Logs are the cloud equivalent of disk evidence in traditional forensics
  3. Memory acquisition must happen first -- before network isolation or instance termination, because memory is lost when instances stop
  4. Chain of custody applies equally to cloud evidence -- hash verification, access logging, immutable storage, and custody documentation are all required
  5. Multi-cloud investigations require cross-provider correlation -- the same attacker IP, timestamp, or credential may appear in multiple providers' logs
  6. Container and serverless forensics are fundamentally different from VM forensics -- evidence is ephemeral, infrastructure is provider-managed, and traditional tools may not apply
  7. Automation reduces evidence collection time from hours to minutes -- invest in Lambda/Functions-based automated collection triggered by detection findings
  8. Legal jurisdiction complexity is unique to cloud -- data residency, provider cooperation, and cross-border evidence requests add dimensions that traditional forensics rarely encounters

Review Questions

  1. Explain how the shared responsibility model affects forensic evidence availability in IaaS vs PaaS vs SaaS environments. What evidence can the customer collect in each model?

  2. An attacker has compromised an AWS IAM access key and is using it from IP 203.0.113.42. Write the CloudTrail Athena query to identify all API calls made by this attacker and determine what resources were accessed.

  3. You discover that CloudTrail was disabled in the ap-southeast-1 region during an incident. How could the organization trail have captured this event, and what detective control should be implemented to prevent this in the future?

  4. Compare memory acquisition procedures for EC2 instances, Azure VMs, and GCE instances. What are the key differences in tooling and access methods?

  5. A Kubernetes pod in your cluster has been compromised. Outline the evidence collection sequence, identifying which evidence is volatile and which is persistent, and the order in which you would collect each.

  6. Design an automated forensic evidence collection pipeline for AWS that triggers on high-severity GuardDuty findings. What evidence should be automatically collected, and how should it be preserved?

  7. In a multi-cloud investigation, an attacker used the same IP address (203.0.113.99) across GCP and AWS. How would you construct a unified timeline from both providers' audit logs, and what correlation techniques would you use?

  8. Your organization uses Azure Key Vault for secret management. Write a KQL query to detect potential bulk secret enumeration by an unauthorized user.

  9. Explain why container forensics requires capturing the writable overlay layer before a container restart, and describe how you would preserve this evidence in a Kubernetes environment.

  10. An incident requires forensic evidence from an Azure VM. Document the complete chain of custody process from disk snapshot acquisition through evidence storage, including integrity verification steps.


References and Further Reading

  • NIST IR 8006 -- NIST Cloud Computing Forensic Science Challenges (2020)
  • NIST SP 800-86 -- Guide to Integrating Forensic Techniques into Incident Response
  • AWS Security Incident Response Guide -- aws.amazon.com/security/incident-response
  • Azure Security Best Practices for Forensics -- learn.microsoft.com/en-us/azure/security
  • GCP Security Investigation Guide -- cloud.google.com/security/investigation
  • SANS Cloud Forensics Poster -- sans.org/posters/cloud-forensics
  • CSA Cloud Forensics Capability Matrix -- cloudsecurityalliance.org
  • MITRE ATT&CK Cloud Matrix -- attack.mitre.org/matrices/enterprise/cloud
  • Volatility 3 Documentation -- volatilityfoundation.org
  • AVML (Acquire Volatile Memory for Linux) -- github.com/microsoft/avml
  • Timesketch -- timesketch.org (open-source collaborative forensic timeline analysis)
  • Related chapters: Chapter 9: Incident Response Lifecycle, Chapter 20: Cloud Attack & Defense, Chapter 27: Digital Forensics, Chapter 28: Advanced Incident Response, Chapter 46: Cloud & Container Red Team, Chapter 51: Kubernetes Security