Skip to content

Lab 10: Threat Hunting — Finding the Unknown

Difficulty: ⭐⭐⭐⭐ Expert | Duration: 3–4 hours | Chapter: 38 (Advanced Threat Hunting)


Objectives

  1. Execute five structured hunts using the PEAK methodology on a synthetic log dataset
  2. Apply stack counting and long-tail analysis to identify anomalous process chains
  3. Write Python beaconing detection to identify C2 communication in network logs
  4. Document one hunt finding and convert it to a Sigma rule
  5. Calculate ATT&CK technique coverage from hunt results

Prerequisites

  • Python 3.10+ with pandas, numpy, scipy installed
  • A SIEM with the lab dataset loaded (Elastic or Splunk free tier), OR
  • Jupyter notebook environment for offline Python-based hunting
pip install pandas numpy scipy matplotlib jupyter

Dataset

Download the synthetic hunting dataset:

# Synthetic Windows Event Logs + Sysmon + Network logs
# Contains embedded TTPs: Kerberoasting, LSASS dump, beaconing, WMI lateral movement
python docs/labs/datasets/generate-hunt-dataset.py --output ./hunt-data/

# Dataset includes:
# hunt-data/process_creation.jsonl  — 50,000 Sysmon Event 1 records
# hunt-data/network_connections.jsonl — 200,000 network flow records
# hunt-data/auth_events.jsonl       — 30,000 Windows auth events
# hunt-data/dns_queries.jsonl       — 100,000 DNS query records

Embedded malicious activity (revealed at end of lab): - 1 Kerberoasting session (3 service account TGS requests) - 1 LSASS handle acquisition - 1 C2 beaconing session (60-second intervals to suspicious IP) - 1 WMI lateral movement chain - 1 scheduled task persistence


Part 1 — Environment Setup (20 min)

# lab10_hunting.py — base setup
import pandas as pd
import numpy as np
from scipy import stats
import json
from pathlib import Path
from collections import Counter
import warnings
warnings.filterwarnings('ignore')

# Load datasets
DATA_DIR = Path("./hunt-data")

proc_df = pd.read_json(DATA_DIR / "process_creation.jsonl", lines=True)
net_df  = pd.read_json(DATA_DIR / "network_connections.jsonl", lines=True)
auth_df = pd.read_json(DATA_DIR / "auth_events.jsonl", lines=True)
dns_df  = pd.read_json(DATA_DIR / "dns_queries.jsonl", lines=True)

# Convert timestamps
for df in [proc_df, net_df, auth_df, dns_df]:
    df['timestamp'] = pd.to_datetime(df['timestamp'])

print(f"Process events: {len(proc_df):,}")
print(f"Network events: {len(net_df):,}")
print(f"Auth events:    {len(auth_df):,}")
print(f"DNS events:     {len(dns_df):,}")

# Preview schemas
print("\nProcess creation columns:", proc_df.columns.tolist())
print("Network columns:", net_df.columns.tolist())

Part 2 — Hunt H-009: Unusual Parent→Child Process Chains (45 min)

Hypothesis: Attackers use unusual parent-child process relationships to execute malicious code. Legitimate software has consistent parent-child patterns; malware chains are rare.

Step 2.1 — Stack Count Parent→Child

def stack_count_process_chains(df: pd.DataFrame, min_occurrences: int = 5) -> pd.DataFrame:
    """
    Stack count parent→child relationships.
    Low-frequency combinations are hunting leads.
    """
    chains = df.groupby(['parent_image', 'image']).size().reset_index(name='count')
    total = len(df)
    chains['frequency_pct'] = (chains['count'] / total * 100).round(4)
    chains = chains.sort_values('count')
    return chains[chains['count'] <= min_occurrences]

rare_chains = stack_count_process_chains(proc_df)
print(f"Found {len(rare_chains)} rare parent→child combinations")
print("\nTop 20 rarest (most suspicious):")
print(rare_chains.head(20).to_string(index=False))

Step 2.2 — Investigate Suspicious Chains

def investigate_chain(df: pd.DataFrame, parent: str, child: str) -> pd.DataFrame:
    """Pull all events for a specific parent→child combination."""
    return df[
        (df['parent_image'].str.endswith(parent, na=False)) &
        (df['image'].str.endswith(child, na=False))
    ][['timestamp', 'hostname', 'username', 'image', 'command_line',
       'parent_image', 'parent_command_line']].sort_values('timestamp')

# Exercise: Investigate any suspicious chains you find
# Look for: office apps spawning shells, scripting engines spawning network tools
# Example:
suspicious = investigate_chain(proc_df, 'winword.exe', 'powershell.exe')
if len(suspicious) > 0:
    print("ALERT: Word → PowerShell chain found!")
    print(suspicious.to_string())

Question 2.1: List the three most suspicious parent→child chains you found and explain why each is suspicious.

Question 2.2: What filtering would you add to reduce false positives from legitimate software?


Part 3 — Hunt H-001: Kerberoasting Detection (30 min)

Hypothesis: An attacker has requested RC4-encrypted TGS tickets for multiple service accounts in a short window.

def hunt_kerberoasting(auth_df: pd.DataFrame) -> pd.DataFrame:
    """
    Detect Kerberoasting: RC4 TGS requests for service accounts.
    Signature: EventID 4769, EncryptionType 0x17, ServiceName doesn't end in $
    """
    # Filter for TGS requests
    tgs = auth_df[
        (auth_df['event_id'] == 4769) &
        (auth_df['ticket_encryption_type'] == '0x17') &
        (~auth_df['service_name'].str.endswith('$', na=True)) &
        (auth_df['service_name'] != 'krbtgt')
    ].copy()

    if len(tgs) == 0:
        print("No Kerberoasting indicators found")
        return pd.DataFrame()

    # Group by source account and 10-minute windows
    tgs['window'] = tgs['timestamp'].dt.floor('10min')
    grouped = tgs.groupby(['source_account', 'source_ip', 'window']).agg(
        unique_spns=('service_name', 'nunique'),
        spn_list=('service_name', list),
        request_count=('service_name', 'count')
    ).reset_index()

    # Flag: multiple SPNs in one window
    suspicious = grouped[grouped['unique_spns'] >= 2]
    return suspicious.sort_values('unique_spns', ascending=False)

results = hunt_kerberoasting(auth_df)
if len(results) > 0:
    print("KERBEROASTING DETECTED:")
    for _, row in results.iterrows():
        print(f"  Account: {row['source_account']} | From: {row['source_ip']}")
        print(f"  SPNs targeted: {row['spn_list']}")
        print(f"  Window: {row['window']}")

Question 3.1: What account performed the Kerberoasting? What service accounts were targeted?

Question 3.2: Why does RC4 (0x17) indicate Kerberoasting? What would AES (0x12) indicate?


Part 4 — Hunt H-005: Beaconing Detection (45 min)

Hypothesis: A compromised host is communicating with a C2 server at regular intervals.

Step 4.1 — Implement Beaconing Detector

def detect_beaconing(net_df: pd.DataFrame,
                     min_connections: int = 10,
                     max_cv: float = 0.15,
                     min_interval_secs: int = 30,
                     max_interval_secs: int = 7200) -> pd.DataFrame:
    """
    Detect C2 beaconing by analyzing connection interval regularity.
    Low coefficient of variation = suspiciously regular timing.
    """
    results = []

    # Group connections by source IP → destination IP/port
    groups = net_df.groupby(['src_ip', 'dst_ip', 'dst_port'])

    for (src, dst, port), group in groups:
        if len(group) < min_connections:
            continue

        timestamps = group['timestamp'].sort_values()
        intervals = timestamps.diff().dt.total_seconds().dropna()
        intervals = intervals[(intervals >= min_interval_secs) &
                              (intervals <= max_interval_secs)]

        if len(intervals) < 5:
            continue

        mean_interval = intervals.mean()
        std_interval = intervals.std()
        cv = std_interval / mean_interval if mean_interval > 0 else float('inf')

        if cv <= max_cv:
            results.append({
                'src_ip': src,
                'dst_ip': dst,
                'dst_port': port,
                'connection_count': len(group),
                'mean_interval_secs': round(mean_interval, 1),
                'std_interval_secs': round(std_interval, 1),
                'coefficient_of_variation': round(cv, 4),
                'confidence': 'HIGH' if cv < 0.05 else 'MEDIUM',
                'first_seen': group['timestamp'].min(),
                'last_seen': group['timestamp'].max()
            })

    return pd.DataFrame(results).sort_values('coefficient_of_variation') if results else pd.DataFrame()

beaconing_results = detect_beaconing(net_df)
print(f"Found {len(beaconing_results)} potential beaconing connections")
if len(beaconing_results) > 0:
    print(beaconing_results.to_string(index=False))

Step 4.2 — Visualize Beaconing Pattern

import matplotlib.pyplot as plt

def plot_connection_intervals(net_df: pd.DataFrame,
                               src_ip: str, dst_ip: str, dst_port: int):
    """Plot connection timing to visualize beaconing pattern."""
    connections = net_df[
        (net_df['src_ip'] == src_ip) &
        (net_df['dst_ip'] == dst_ip) &
        (net_df['dst_port'] == dst_port)
    ].sort_values('timestamp')

    timestamps = connections['timestamp']
    intervals = timestamps.diff().dt.total_seconds().dropna()

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 4))
    fig.suptitle(f'Connection: {src_ip}{dst_ip}:{dst_port}', fontsize=12)

    # Timeline
    ax1.eventplot(timestamps.astype(np.int64) / 1e9, orientation='horizontal')
    ax1.set_title('Connection Timeline')
    ax1.set_xlabel('Time')

    # Interval distribution
    ax2.hist(intervals, bins=20, color='#58a6ff', alpha=0.7)
    ax2.axvline(intervals.mean(), color='#ff7b72', linestyle='--',
                label=f'Mean: {intervals.mean():.0f}s')
    ax2.set_title('Inter-Connection Intervals')
    ax2.set_xlabel('Seconds between connections')
    ax2.legend()

    plt.tight_layout()
    plt.savefig('beaconing_analysis.png', dpi=150, bbox_inches='tight')
    print("Plot saved: beaconing_analysis.png")

# Plot the most suspicious beaconing connection
if len(beaconing_results) > 0:
    top = beaconing_results.iloc[0]
    plot_connection_intervals(net_df, top['src_ip'], top['dst_ip'], int(top['dst_port']))

Question 4.1: What is the beaconing interval? What common C2 framework uses this sleep interval?

Question 4.2: The CV for the beaconing connection is extremely low. What does this tell you about the jitter configuration of the C2 framework?


Part 5 — Hunt H-005: WMI Lateral Movement (30 min)

def hunt_wmi_lateral_movement(proc_df: pd.DataFrame) -> pd.DataFrame:
    """
    Detect lateral WMI execution: WmiPrvSE.exe spawning unusual child processes.
    """
    wmi_children = proc_df[
        proc_df['parent_image'].str.endswith('WmiPrvSE.exe', na=False)
    ].copy()

    # Filter known legitimate WMI children
    legitimate = [
        'WmiPrvSE.exe', 'svchost.exe', 'msiexec.exe',
        'TrustedInstaller.exe', 'SearchIndexer.exe', 'wmiadap.exe'
    ]

    suspicious = wmi_children[
        ~wmi_children['image'].apply(
            lambda x: any(x.endswith(l) for l in legitimate) if pd.notna(x) else True
        )
    ]

    return suspicious[['timestamp', 'hostname', 'username', 'image',
                       'command_line', 'parent_command_line']].sort_values('timestamp')

wmi_results = hunt_wmi_lateral_movement(proc_df)
print(f"WMI lateral movement candidates: {len(wmi_results)}")
if len(wmi_results) > 0:
    print(wmi_results.to_string(index=False))

Part 6 — Document and Convert (45 min)

Step 6.1 — Hunt Documentation

Complete this template for your most significant finding:

# Hunt Report: [HUNT-ID]-[DATE]

## Hypothesis
[What were you hunting for?]

## Data Sources Used
- [List log sources]

## Query / Method
[Paste your Python code or SIEM query]

## Finding
**Severity:** [Critical/High/Medium/Low]
**Affected Host(s):** [List]
**Affected Account(s):** [List]
**Timeframe:** [Start] → [End]

## Evidence
[Key data points supporting the finding]

## ATT&CK Mapping
- Tactic: [e.g., Credential Access]
- Technique: [e.g., T1558.003 — Kerberoasting]

## Recommended Detection Rule
[Describe rule logic or paste Sigma]

Step 6.2 — Write a Sigma Rule

Convert your most significant finding into a Sigma rule using the template from Chapter 36:

title: [Your Title]
id: [Generate UUID: python -c "import uuid; print(uuid.uuid4())"]
status: experimental
description: [What does this detect?]
author: [Your Name] — Nexus SecOps Lab 10
date: [Today]
tags:
  - attack.[tactic]
  - attack.[technique_id]
logsource:
  category: [category]
  product: windows
detection:
  selection:
    [Your detection logic]
  condition: selection
falsepositives:
  - [List false positives you observed]
level: [critical/high/medium/low]

Part 7 — Coverage Measurement (20 min)

HUNTS_EXECUTED = {
    'H-009': {'technique': 'T1059', 'result': 'TBD', 'rule_written': False},
    'H-001': {'technique': 'T1558.003', 'result': 'TBD', 'rule_written': False},
    'H-BEACON': {'technique': 'T1071', 'result': 'TBD', 'rule_written': False},
    'H-005': {'technique': 'T1047', 'result': 'TBD', 'rule_written': False},
    'H-PERSIST': {'technique': 'T1053.005', 'result': 'TBD', 'rule_written': False},
}

# Fill in your results
# result: 'FOUND' | 'NOT_FOUND' | 'FALSE_POSITIVE'
# rule_written: True | False

findings = sum(1 for h in HUNTS_EXECUTED.values() if h['result'] == 'FOUND')
rules = sum(1 for h in HUNTS_EXECUTED.values() if h['rule_written'])
coverage = len(HUNTS_EXECUTED) / 196 * 100  # 196 Enterprise sub-techniques

print(f"Hunts executed: {len(HUNTS_EXECUTED)}")
print(f"Findings: {findings} ({findings/len(HUNTS_EXECUTED)*100:.0f}% finding rate)")
print(f"Rules written: {rules}")
print(f"ATT&CK coverage added: {coverage:.1f}%")

Graded Questions

  1. (3 pts) What was the mean beaconing interval? Based on this, which specific C2 framework sleep configuration does this resemble?

  2. (2 pts) You find 847 process creation events from winword.exe spawning a child process. This appears in your stack count as a low-frequency chain. Why does "low frequency" still require investigation when the absolute count is 847?

  3. (3 pts) Describe the difference between hunting with a TTP hypothesis and hunting with an IOC. Give one example of each approach for detecting the same attack.

  4. (2 pts) Your beaconing detector has a false positive rate of 20% (flagging legitimate software update mechanisms). What two parameters would you tune to reduce this, and what is the trade-off?

  5. (3 pts) Describe the complete lifecycle for the Kerberoasting finding: from hunt to remediation. Include the Sigma rule logic, the IR actions, and the AD remediation steps.

  6. (2 pts) Calculate the ROI of this hunt session: 4 analyst hours at $75/hour, one incident found that would have cost $150,000 if discovered later. Show your calculation.


Cleanup

# Remove generated datasets
rm -rf ./hunt-data/
rm -f beaconing_analysis.png