Lab 10: Threat Hunting — Finding the Unknown¶
Difficulty: ⭐⭐⭐⭐ Expert | Duration: 3–4 hours | Chapter: 38 (Advanced Threat Hunting)
Objectives¶
- Execute five structured hunts using the PEAK methodology on a synthetic log dataset
- Apply stack counting and long-tail analysis to identify anomalous process chains
- Write Python beaconing detection to identify C2 communication in network logs
- Document one hunt finding and convert it to a Sigma rule
- Calculate ATT&CK technique coverage from hunt results
Prerequisites¶
- Python 3.10+ with pandas, numpy, scipy, and matplotlib installed
- A SIEM with the lab dataset loaded (Elastic or Splunk free tier), OR
- Jupyter notebook environment for offline Python-based hunting
Dataset¶
Generate the synthetic hunting dataset:
# Synthetic Windows Event Logs + Sysmon + Network logs
# Contains embedded TTPs: Kerberoasting, LSASS dump, beaconing, WMI lateral movement
python docs/labs/datasets/generate-hunt-dataset.py --output ./hunt-data/
# Dataset includes:
# hunt-data/process_creation.jsonl — 50,000 Sysmon Event 1 records
# hunt-data/network_connections.jsonl — 200,000 network flow records
# hunt-data/auth_events.jsonl — 30,000 Windows auth events
# hunt-data/dns_queries.jsonl — 100,000 DNS query records
Embedded malicious activity (revealed at end of lab):
- 1 Kerberoasting session (3 service account TGS requests)
- 1 LSASS handle acquisition
- 1 C2 beaconing session (60-second intervals to suspicious IP)
- 1 WMI lateral movement chain
- 1 scheduled task persistence
Part 1 — Environment Setup (20 min)¶
# lab10_hunting.py — base setup
import pandas as pd
import numpy as np
from scipy import stats
import json
from pathlib import Path
from collections import Counter
import warnings
warnings.filterwarnings('ignore')
# Load datasets
DATA_DIR = Path("./hunt-data")
proc_df = pd.read_json(DATA_DIR / "process_creation.jsonl", lines=True)
net_df = pd.read_json(DATA_DIR / "network_connections.jsonl", lines=True)
auth_df = pd.read_json(DATA_DIR / "auth_events.jsonl", lines=True)
dns_df = pd.read_json(DATA_DIR / "dns_queries.jsonl", lines=True)
# Convert timestamps
for df in [proc_df, net_df, auth_df, dns_df]:
    df['timestamp'] = pd.to_datetime(df['timestamp'])
print(f"Process events: {len(proc_df):,}")
print(f"Network events: {len(net_df):,}")
print(f"Auth events: {len(auth_df):,}")
print(f"DNS events: {len(dns_df):,}")
# Preview schemas
print("\nProcess creation columns:", proc_df.columns.tolist())
print("Network columns:", net_df.columns.tolist())
Part 2 — Hunt H-009: Unusual Parent→Child Process Chains (45 min)¶
Hypothesis: Attackers use unusual parent-child process relationships to execute malicious code. Legitimate software has consistent parent-child patterns; malware chains are rare.
Step 2.1 — Stack Count Parent→Child¶
def stack_count_process_chains(df: pd.DataFrame, max_occurrences: int = 5) -> pd.DataFrame:
    """
    Stack count parent→child relationships.
    Low-frequency combinations (count <= max_occurrences) are hunting leads.
    """
    chains = df.groupby(['parent_image', 'image']).size().reset_index(name='count')
    total = len(df)
    chains['frequency_pct'] = (chains['count'] / total * 100).round(4)
    chains = chains.sort_values('count')
    return chains[chains['count'] <= max_occurrences]
rare_chains = stack_count_process_chains(proc_df)
print(f"Found {len(rare_chains)} rare parent→child combinations")
print("\nTop 20 rarest (most suspicious):")
print(rare_chains.head(20).to_string(index=False))
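Stack counting surfaces rare chains one row at a time; the long-tail analysis named in the objectives looks at the shape of the whole distribution instead. A minimal, self-contained sketch with toy counts (the chain names and numbers below are invented, standing in for the real output of `stack_count_process_chains`):

```python
import pandas as pd

# Toy parent→child counts (invented), standing in for the real stack count output
counts = pd.Series({'explorer→chrome': 9000, 'services→svchost': 800,
                    'chrome→chrome': 150, 'cmd→ping': 40,
                    'winword→powershell': 7, 'wmiprvse→cmd': 3})
counts = counts.sort_values(ascending=False)

# Cumulative share of all events as we walk down from the most common chain
cum_share = counts.cumsum() / counts.sum()

# Chains needed to cover 95% of events; everything after is the long tail
head_size = int((cum_share < 0.95).sum()) + 1
tail = counts.iloc[head_size:]
print(f"{head_size} chains cover 95% of events; {len(tail)} sit in the long tail")
```

On a real dataset, a handful of chains typically covers well over 90% of events; everything past that knee is where hunting leads concentrate.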
Step 2.2 — Investigate Suspicious Chains¶
def investigate_chain(df: pd.DataFrame, parent: str, child: str) -> pd.DataFrame:
    """Pull all events for a specific parent→child combination."""
    return df[
        (df['parent_image'].str.endswith(parent, na=False)) &
        (df['image'].str.endswith(child, na=False))
    ][['timestamp', 'hostname', 'username', 'image', 'command_line',
       'parent_image', 'parent_command_line']].sort_values('timestamp')
# Exercise: Investigate any suspicious chains you find
# Look for: office apps spawning shells, scripting engines spawning network tools
# Example:
suspicious = investigate_chain(proc_df, 'winword.exe', 'powershell.exe')
if len(suspicious) > 0:
    print("ALERT: Word → PowerShell chain found!")
    print(suspicious.to_string())
Question 2.1: List the three most suspicious parent→child chains you found and explain why each is suspicious.
Question 2.2: What filtering would you add to reduce false positives from legitimate software?
Part 3 — Hunt H-001: Kerberoasting Detection (30 min)¶
Hypothesis: An attacker has requested RC4-encrypted TGS tickets for multiple service accounts in a short window.
def hunt_kerberoasting(auth_df: pd.DataFrame) -> pd.DataFrame:
    """
    Detect Kerberoasting: RC4 TGS requests for service accounts.
    Signature: EventID 4769, EncryptionType 0x17, ServiceName doesn't end in $
    """
    # Filter for TGS requests
    tgs = auth_df[
        (auth_df['event_id'] == 4769) &
        (auth_df['ticket_encryption_type'] == '0x17') &
        (~auth_df['service_name'].str.endswith('$', na=True)) &
        (auth_df['service_name'] != 'krbtgt')
    ].copy()
    if len(tgs) == 0:
        print("No Kerberoasting indicators found")
        return pd.DataFrame()
    # Group by source account and 10-minute windows
    tgs['window'] = tgs['timestamp'].dt.floor('10min')
    grouped = tgs.groupby(['source_account', 'source_ip', 'window']).agg(
        unique_spns=('service_name', 'nunique'),
        spn_list=('service_name', list),
        request_count=('service_name', 'count')
    ).reset_index()
    # Flag: multiple SPNs requested in one window
    suspicious = grouped[grouped['unique_spns'] >= 2]
    return suspicious.sort_values('unique_spns', ascending=False)
results = hunt_kerberoasting(auth_df)
if len(results) > 0:
    print("KERBEROASTING DETECTED:")
    for _, row in results.iterrows():
        print(f"  Account: {row['source_account']} | From: {row['source_ip']}")
        print(f"  SPNs targeted: {row['spn_list']}")
        print(f"  Window: {row['window']}")
Question 3.1: What account performed the Kerberoasting? What service accounts were targeted?
Question 3.2: Why does RC4 (0x17) indicate Kerberoasting? What would AES (0x12) indicate?
Part 4 — Hunt H-BEACON: Beaconing Detection (45 min)¶
Hypothesis: A compromised host is communicating with a C2 server at regular intervals.
Step 4.1 — Implement Beaconing Detector¶
def detect_beaconing(net_df: pd.DataFrame,
                     min_connections: int = 10,
                     max_cv: float = 0.15,
                     min_interval_secs: int = 30,
                     max_interval_secs: int = 7200) -> pd.DataFrame:
    """
    Detect C2 beaconing by analyzing connection interval regularity.
    Low coefficient of variation = suspiciously regular timing.
    """
    results = []
    # Group connections by source IP → destination IP/port
    groups = net_df.groupby(['src_ip', 'dst_ip', 'dst_port'])
    for (src, dst, port), group in groups:
        if len(group) < min_connections:
            continue
        timestamps = group['timestamp'].sort_values()
        intervals = timestamps.diff().dt.total_seconds().dropna()
        intervals = intervals[(intervals >= min_interval_secs) &
                              (intervals <= max_interval_secs)]
        if len(intervals) < 5:
            continue
        mean_interval = intervals.mean()
        std_interval = intervals.std()
        cv = std_interval / mean_interval if mean_interval > 0 else float('inf')
        if cv <= max_cv:
            results.append({
                'src_ip': src,
                'dst_ip': dst,
                'dst_port': port,
                'connection_count': len(group),
                'mean_interval_secs': round(mean_interval, 1),
                'std_interval_secs': round(std_interval, 1),
                'coefficient_of_variation': round(cv, 4),
                'confidence': 'HIGH' if cv < 0.05 else 'MEDIUM',
                'first_seen': group['timestamp'].min(),
                'last_seen': group['timestamp'].max()
            })
    return pd.DataFrame(results).sort_values('coefficient_of_variation') if results else pd.DataFrame()
beaconing_results = detect_beaconing(net_df)
print(f"Found {len(beaconing_results)} potential beaconing connections")
if len(beaconing_results) > 0:
    print(beaconing_results.to_string(index=False))
Step 4.2 — Visualize Beaconing Pattern¶
import matplotlib.pyplot as plt
def plot_connection_intervals(net_df: pd.DataFrame,
                              src_ip: str, dst_ip: str, dst_port: int):
    """Plot connection timing to visualize beaconing pattern."""
    connections = net_df[
        (net_df['src_ip'] == src_ip) &
        (net_df['dst_ip'] == dst_ip) &
        (net_df['dst_port'] == dst_port)
    ].sort_values('timestamp')
    timestamps = connections['timestamp']
    intervals = timestamps.diff().dt.total_seconds().dropna()
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 4))
    fig.suptitle(f'Connection: {src_ip} → {dst_ip}:{dst_port}', fontsize=12)
    # Timeline of individual connections (epoch seconds)
    ax1.eventplot(timestamps.astype(np.int64) / 1e9, orientation='horizontal')
    ax1.set_title('Connection Timeline')
    ax1.set_xlabel('Time')
    # Interval distribution
    ax2.hist(intervals, bins=20, color='#58a6ff', alpha=0.7)
    ax2.axvline(intervals.mean(), color='#ff7b72', linestyle='--',
                label=f'Mean: {intervals.mean():.0f}s')
    ax2.set_title('Inter-Connection Intervals')
    ax2.set_xlabel('Seconds between connections')
    ax2.legend()
    plt.tight_layout()
    plt.savefig('beaconing_analysis.png', dpi=150, bbox_inches='tight')
    print("Plot saved: beaconing_analysis.png")

# Plot the most suspicious beaconing connection
if len(beaconing_results) > 0:
    top = beaconing_results.iloc[0]
    plot_connection_intervals(net_df, top['src_ip'], top['dst_ip'], int(top['dst_port']))
Question 4.1: What is the beaconing interval? What common C2 framework uses this sleep interval?
Question 4.2: The CV for the beaconing connection is extremely low. What does this tell you about the jitter configuration of the C2 framework?
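Question 4.2 turns on how jitter inflates the coefficient of variation. A small, self-contained simulation (toy numbers, not drawn from the lab dataset; `BASE_SLEEP` is an arbitrary illustration) shows why a detector with `max_cv=0.15` starts missing C2 configured with roughly ±26% or more uniform jitter, since for uniform jitter of ±j the CV is approximately j/√3:

```python
import numpy as np

rng = np.random.default_rng(7)
BASE_SLEEP = 60.0  # hypothetical beacon sleep in seconds (illustrative only)

def interval_cv(jitter_frac: float, n: int = 500) -> float:
    """CV of simulated beacon intervals with uniform ±jitter_frac jitter."""
    intervals = BASE_SLEEP * (1 + rng.uniform(-jitter_frac, jitter_frac, n))
    return float(intervals.std() / intervals.mean())

# For uniform jitter of ±j, CV ≈ j / sqrt(3), independent of the base sleep
for j in (0.0, 0.1, 0.3):
    print(f"jitter ±{j:.0%}: CV ≈ {interval_cv(j):.3f}")
```

A zero-jitter beacon produces a CV near 0 (HIGH confidence in the detector above), while heavily jittered implants push the CV past the threshold; that trade-off is exactly what Question 4.2 and graded question 4 are probing.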
Part 5 — Hunt H-005: WMI Lateral Movement (30 min)¶
def hunt_wmi_lateral_movement(proc_df: pd.DataFrame) -> pd.DataFrame:
    """
    Detect lateral WMI execution: WmiPrvSE.exe spawning unusual child processes.
    """
    wmi_children = proc_df[
        proc_df['parent_image'].str.endswith('WmiPrvSE.exe', na=False)
    ].copy()
    # Filter out known-legitimate WMI children
    legitimate = [
        'WmiPrvSE.exe', 'svchost.exe', 'msiexec.exe',
        'TrustedInstaller.exe', 'SearchIndexer.exe', 'wmiadap.exe'
    ]
    suspicious = wmi_children[
        ~wmi_children['image'].apply(
            lambda x: any(x.endswith(name) for name in legitimate) if pd.notna(x) else True
        )
    ]
    return suspicious[['timestamp', 'hostname', 'username', 'image',
                       'command_line', 'parent_command_line']].sort_values('timestamp')
wmi_results = hunt_wmi_lateral_movement(proc_df)
print(f"WMI lateral movement candidates: {len(wmi_results)}")
if len(wmi_results) > 0:
    print(wmi_results.to_string(index=False))
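The dataset also embeds a scheduled-task persistence chain (H-PERSIST in Part 7), which has no worked hunt above. A minimal sketch, using a toy DataFrame in the same shape as `proc_df` (the records below are invented for illustration, not taken from the lab dataset):

```python
import pandas as pd

# Toy records mimicking the proc_df schema used in this lab (invented for illustration)
toy = pd.DataFrame([
    {'timestamp': '2024-01-01T10:00:00', 'hostname': 'WS01', 'username': 'alice',
     'image': 'C:\\Windows\\System32\\schtasks.exe',
     'command_line': 'schtasks /create /tn "Updater" /tr C:\\Users\\Public\\run.exe /sc minute /mo 5'},
    {'timestamp': '2024-01-01T11:00:00', 'hostname': 'WS02', 'username': 'bob',
     'image': 'C:\\Windows\\System32\\schtasks.exe',
     'command_line': 'schtasks /query /fo LIST'},
])

def hunt_scheduled_task_persistence(df: pd.DataFrame) -> pd.DataFrame:
    """Flag schtasks.exe invocations that create tasks; /query is routine noise."""
    schtasks = df[df['image'].str.lower().str.endswith('schtasks.exe', na=False)]
    return schtasks[schtasks['command_line'].str.contains('/create', case=False, na=False)]

hits = hunt_scheduled_task_persistence(toy)
print(f"Scheduled-task creation events: {len(hits)}")
```

Run against the real `proc_df` to hunt T1053.005; tasks that run binaries from user-writable paths are the strongest leads.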
Part 6 — Document and Convert (45 min)¶
Step 6.1 — Hunt Documentation¶
Complete this template for your most significant finding:
# Hunt Report: [HUNT-ID]-[DATE]
## Hypothesis
[What were you hunting for?]
## Data Sources Used
- [List log sources]
## Query / Method
[Paste your Python code or SIEM query]
## Finding
**Severity:** [Critical/High/Medium/Low]
**Affected Host(s):** [List]
**Affected Account(s):** [List]
**Timeframe:** [Start] → [End]
## Evidence
[Key data points supporting the finding]
## ATT&CK Mapping
- Tactic: [e.g., Credential Access]
- Technique: [e.g., T1558.003 — Kerberoasting]
## Recommended Detection Rule
[Describe rule logic or paste Sigma]
Step 6.2 — Write a Sigma Rule¶
Convert your most significant finding into a Sigma rule using the template from Chapter 36:
title: [Your Title]
id: [Generate UUID: python -c "import uuid; print(uuid.uuid4())"]
status: experimental
description: [What does this detect?]
author: [Your Name] — Nexus SecOps Lab 10
date: [Today]
tags:
  - attack.[tactic]
  - attack.[technique_id]
logsource:
  category: [category]
  product: windows
detection:
  selection:
    [Your detection logic]
  condition: selection
falsepositives:
  - [List false positives you observed]
level: [critical/high/medium/low]
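For reference, this is one way a Kerberoasting finding might look as a filled-in rule. The logic mirrors Part 3's Python hunt; the UUID, dates, and field names are placeholders to replace with your own (field names here follow the Windows `security` logsource convention, and similar public rules exist in the Sigma community repository):

```yaml
title: Potential Kerberoasting via RC4-Encrypted TGS Requests
id: 00000000-0000-0000-0000-000000000000   # placeholder: replace with your own uuid4
status: experimental
description: Detects TGS ticket requests using RC4 (0x17) for non-machine service accounts
author: Your Name, Nexus SecOps Lab 10
date: 2024/01/01   # placeholder: today's date
tags:
  - attack.credential_access
  - attack.t1558.003
logsource:
  product: windows
  service: security
detection:
  selection:
    EventID: 4769
    TicketEncryptionType: '0x17'
  filter_machine_accounts:
    ServiceName|endswith: '$'
  condition: selection and not filter_machine_accounts
falsepositives:
  - Legacy applications that still negotiate RC4 Kerberos tickets
level: high
```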
Part 7 — Coverage Measurement (20 min)¶
HUNTS_EXECUTED = {
    'H-009': {'technique': 'T1059', 'result': 'TBD', 'rule_written': False},
    'H-001': {'technique': 'T1558.003', 'result': 'TBD', 'rule_written': False},
    'H-BEACON': {'technique': 'T1071', 'result': 'TBD', 'rule_written': False},
    'H-005': {'technique': 'T1047', 'result': 'TBD', 'rule_written': False},
    'H-PERSIST': {'technique': 'T1053.005', 'result': 'TBD', 'rule_written': False},
}
# Fill in your results
# result: 'FOUND' | 'NOT_FOUND' | 'FALSE_POSITIVE'
# rule_written: True | False
findings = sum(1 for h in HUNTS_EXECUTED.values() if h['result'] == 'FOUND')
rules = sum(1 for h in HUNTS_EXECUTED.values() if h['rule_written'])
coverage = len({h['technique'] for h in HUNTS_EXECUTED.values()}) / 196 * 100  # count unique techniques; 196 Enterprise techniques (ATT&CK v13), adjust for your version
print(f"Hunts executed: {len(HUNTS_EXECUTED)}")
print(f"Findings: {findings} ({findings/len(HUNTS_EXECUTED)*100:.0f}% finding rate)")
print(f"Rules written: {rules}")
print(f"ATT&CK coverage added: {coverage:.1f}%")
Graded Questions¶
1. (3 pts) What was the mean beaconing interval? Based on this, which specific C2 framework sleep configuration does this resemble?
2. (2 pts) You find 847 process creation events from winword.exe spawning a child process. This appears in your stack count as a low-frequency chain. Why does "low frequency" still require investigation when the absolute count is 847?
3. (3 pts) Describe the difference between hunting with a TTP hypothesis and hunting with an IOC. Give one example of each approach for detecting the same attack.
4. (2 pts) Your beaconing detector has a false positive rate of 20% (flagging legitimate software update mechanisms). What two parameters would you tune to reduce this, and what is the trade-off?
5. (3 pts) Describe the complete lifecycle for the Kerberoasting finding: from hunt to remediation. Include the Sigma rule logic, the IR actions, and the AD remediation steps.
6. (2 pts) Calculate the ROI of this hunt session: 4 analyst hours at $75/hour, one incident found that would have cost $150,000 if discovered later. Show your calculation.