Chapter 38: Advanced Threat Hunting¶
Overview¶
Threat hunting is the proactive, hypothesis-driven search for adversary activity that has evaded automated detection. This chapter elevates hunting from basic indicator searching to advanced behavioral analytics, stack counting, frequency analysis, long-tail analysis, and graph-based hunting. It covers structured hunting methodologies, building a hunting program, automating hunts into detections, and measuring hunting effectiveness.
Learning Objectives¶
- Apply the PEAK and TaHiTI hunting methodologies to structure hunts
- Perform stack counting, frequency analysis, and long-tail analysis for anomaly discovery
- Write advanced hunting queries in KQL, SPL, and EQL targeting specific TTPs
- Use graph analysis to identify lateral movement and privilege escalation paths
- Automate successful hunts into production detection rules
- Measure hunting program maturity and effectiveness
Prerequisites¶
- Chapter 5 (Detection Engineering)
- Chapter 7 (Threat Intelligence)
- Chapter 36 (Purple Team Operations)
- Proficiency in at least one SIEM query language (KQL or SPL)
Hunting Is Not Searching for IOCs
Scanning logs for known bad IPs and file hashes is threat intelligence enrichment, not threat hunting. Real hunting starts with a hypothesis about attacker behavior and searches for evidence of that behavior regardless of whether indicators are known. The value of hunting comes from finding the unknown — TTPs that have no existing detection signature.
38.1 Hunting Methodologies¶
PEAK Framework¶
PEAK (Prepare, Execute, Act with Knowledge) is a structured hunting methodology:
flowchart TD
P[PREPARE\nHypothesis formation\nData source inventory\nQuery design] --> E
E[EXECUTE\nData collection\nAnalysis\nAnomalies documented] --> A
A[ACT WITH KNOWLEDGE\nConvert to detections\nUpdate threat model\nShare intelligence]
A --> P
P1[Technique hypothesis\nfrom ATT&CK/CTI] --> P
P2[Available data\nlog sources inventory] --> P
P3[Success criteria\ndefined before hunting] --> P
style P fill:#58a6ff22,stroke:#58a6ff
style E fill:#f0883e22,stroke:#f0883e
style A fill:#3fb95022,stroke:#3fb950
TaHiTI Framework¶
TaHiTI (Targeted Hunting integrating Threat Intelligence) links hunts directly to threat intelligence:
| Phase | Activity |
|---|---|
| 1. Intelligence | Consume CTI; identify actor TTPs applicable to your environment |
| 2. Focus | Select specific technique to hunt; define data requirements |
| 3. Detect | Hunt for behavioral evidence of technique |
| 4. Report | Document findings, FPs, and detection gaps |
| 5. Improve | Convert hunt to detection rule; update threat model |
Hunt Hypothesis Types¶
| Type | Example Hypothesis | Data Source |
|---|---|---|
| Intelligence-driven | "APT29 uses ADFS token forgery — do we have evidence?" | Azure AD, ADFS logs |
| TTP-driven | "Living-off-the-land execution via certutil — any downloads?" | Process creation, Sysmon |
| Analytics-driven | "Which parent processes spawn cmd.exe least frequently?" | Process creation |
| Situational | "Post-breach: are there any connections to the C2 we found?" | Network, DNS, proxy |
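Whatever its type, a hypothesis is easier to track, register, and later promote if it is captured as a structured record. A minimal sketch (the `HuntHypothesis` class and its field names are illustrative, not from any standard):

```python
from dataclasses import dataclass, field

@dataclass
class HuntHypothesis:
    """Minimal hunt record tying a hypothesis to its type and data needs."""
    hunt_id: str
    hypothesis: str
    hypothesis_type: str  # intelligence / ttp / analytics / situational
    mitre_technique: str
    data_sources: list = field(default_factory=list)

# Intelligence-driven example from the table above
h = HuntHypothesis(
    hunt_id="H-ADFS-01",
    hypothesis="APT29 uses ADFS token forgery — do we have evidence?",
    hypothesis_type="intelligence-driven",
    mitre_technique="T1606.002",  # Forge Web Credentials: SAML Tokens
    data_sources=["Azure AD", "ADFS logs"],
)
```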
38.2 Statistical Hunting Techniques¶
Stack Counting¶
Stack counting aggregates a field and looks for anomalies in the distribution. Legitimate activity produces consistent, high-count stacks; attackers produce small stacks (rare values).
// KQL: Stack count parent→child process relationships
// Low-frequency parent→child combinations are suspicious
let timeframe = 30d;
SecurityEvent
| where TimeGenerated > ago(timeframe)
| where EventID == 4688
| summarize occurrences = count() by ParentProcessName, NewProcessName
| sort by occurrences asc
// Examine bottom 1% — rare combinations are hunting leads
// "svchost.exe → cmd.exe" appearing once when you have 50K svchost events = hunt lead
// SPL: Stack count with relative frequency
index=wineventlog EventCode=4688
| stats count by ParentProcessName, NewProcessName
| eventstats sum(count) as total
| eval pct=round(count/total*100,2)
| sort count
| where count < 5
| table ParentProcessName, NewProcessName, count, pct
What to look for:
| Low-frequency Combination | Why Suspicious |
|---|---|
| winword.exe → powershell.exe | Macro execution |
| excel.exe → cmd.exe | Malicious spreadsheet |
| msiexec.exe → rundll32.exe | MSI-based malware |
| svchost.exe → net.exe | Service-based recon |
| explorer.exe → regsvr32.exe | Living off the land |
| wscript.exe → powershell.exe | Script-based dropper |
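The same stack-counting idea can be prototyped outside the SIEM with pandas, which is useful when iterating on a hypothesis against an exported event sample. A sketch over synthetic data (the column names are illustrative):

```python
import pandas as pd

# Synthetic process-creation sample: mostly routine pairs, one rare pair
events = pd.DataFrame({
    "parent": ["svchost.exe"] * 6 + ["explorer.exe"] * 3 + ["winword.exe"],
    "child":  ["wuauclt.exe"] * 6 + ["chrome.exe"] * 3 + ["powershell.exe"],
})

# Stack count: aggregate parent→child pairs, rarest first
stacks = (events.groupby(["parent", "child"])
                .size()
                .reset_index(name="count")
                .sort_values("count"))

print(stacks.head(1))  # winword.exe → powershell.exe appears once: a hunt lead
```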
Long-Tail Analysis¶
Long-tail analysis identifies values that appear very rarely in a field — the "long tail" of a frequency distribution. Attackers often create unique artifacts.
import pandas as pd

def long_tail_analysis(df: pd.DataFrame, field: str,
                       percentile_threshold: float = 0.01) -> pd.DataFrame:
    """
    Identify values in the bottom percentile of frequency.
    These are hunting leads — statistically unusual activity.

    Args:
        df: DataFrame with log data
        field: Column to analyze
        percentile_threshold: Bottom X% of frequency (default 1%)
    """
    counts = df[field].value_counts()
    total = len(df)
    threshold = counts.quantile(percentile_threshold)
    rare_values = counts[counts <= threshold]
    result = pd.DataFrame({
        'value': rare_values.index,
        'count': rare_values.values,
        'frequency_pct': (rare_values.values / total * 100).round(4)
    })
    return result.sort_values('count')
# Example: hunt for rare UserAgent strings in proxy logs
# Common: Chrome, Firefox, Edge = thousands of occurrences
# Rare: curl/7.68.0, python-requests/2.28 = 1-5 occurrences
# Curl/python in enterprise = likely attacker tool or misconfigured app
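A quick toy run of the same rare-value logic on the user-agent example (synthetic data):

```python
import pandas as pd

# Hypothetical proxy log: browsers dominate, scripted clients are rare
agents = (["Chrome/120"] * 500 + ["Firefox/121"] * 300
          + ["python-requests/2.28"] * 2)
df = pd.DataFrame({"user_agent": agents})

counts = df["user_agent"].value_counts()
rare = counts[counts <= counts.quantile(0.01)]  # bottom 1% of stack sizes
print(list(rare.index))  # ['python-requests/2.28'] is the hunting lead
```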
Frequency Analysis — Time Series Beaconing Detection¶
import numpy as np

def detect_beaconing(timestamps: list, threshold_jitter: float = 0.1) -> dict:
    """
    Detect C2 beaconing by analyzing inter-arrival time regularity.
    Legitimate traffic is irregular; C2 beacons are periodic.

    Args:
        timestamps: List of Unix timestamps for connections to a destination
        threshold_jitter: Coefficient of variation below which beaconing is suspected
    """
    if len(timestamps) < 10:
        return {"beaconing": False, "reason": "Insufficient data points"}
    intervals = np.diff(sorted(timestamps))
    mean_interval = np.mean(intervals)
    std_interval = np.std(intervals)
    cv = std_interval / mean_interval if mean_interval > 0 else float('inf')
    # Low coefficient of variation = regular timing = beaconing
    is_beaconing = cv < threshold_jitter
    return {
        "beaconing": is_beaconing,
        "mean_interval_seconds": round(mean_interval, 1),
        "std_interval_seconds": round(std_interval, 1),
        "coefficient_of_variation": round(cv, 3),
        "connection_count": len(timestamps),
        "confidence": "HIGH" if cv < 0.05 else "MEDIUM" if cv < 0.1 else "LOW",
        "likely_period": (f"~{int(mean_interval)}s (~{int(mean_interval / 60)}min)"
                          if mean_interval > 60 else f"~{int(mean_interval)}s")
    }
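To make the coefficient-of-variation threshold concrete, compare a jittered 60-second beacon against irregular human browsing (synthetic timestamps):

```python
import numpy as np

def interval_cv(timestamps: list) -> float:
    """CV of inter-arrival times: std/mean. Low CV = periodic = beacon-like."""
    intervals = np.diff(sorted(timestamps))
    return float(np.std(intervals) / np.mean(intervals))

# Beacon: one connection every ~60s with about 1s of jitter
beacon = [i * 60 + j for i, j in zip(range(20), [0, 1, -1] * 7)]
# Human browsing: bursty, irregular gaps
human = [0, 5, 90, 95, 400, 410, 1200, 1250, 3000, 3600]

assert interval_cv(beacon) < 0.1  # flagged as likely beaconing
assert interval_cv(human) > 0.1   # passes the regularity check
```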
# KQL equivalent — beaconing detection in Sentinel
BEACONING_QUERY = """
// Detect regular connection intervals to external IPs (C2 beaconing)
let timeframe = 24h;
let min_connections = 10;
let max_cv = 0.1; // Coefficient of variation threshold
CommonSecurityLog
| where TimeGenerated > ago(timeframe)
| where DeviceAction != "block"
| summarize
timestamps = make_list(TimeGenerated),
connection_count = count()
by SourceIP, DestinationIP, DestinationPort
| where connection_count >= min_connections
| mv-expand timestamps to typeof(datetime)
| sort by SourceIP, DestinationIP, timestamps asc
| serialize
| extend interval = datetime_diff('second', timestamps, prev(timestamps))
| where prev(SourceIP) == SourceIP and prev(DestinationIP) == DestinationIP
| summarize
    mean_interval = avg(interval),
    std_interval = stdev(interval),
    total_connections = count()
    by SourceIP, DestinationIP, DestinationPort
| extend cv = std_interval / mean_interval
| where cv < max_cv and mean_interval between (30 .. 3600)
| sort by cv asc
| project SourceIP, DestinationIP, DestinationPort,
    mean_interval_secs = round(mean_interval, 0),
    cv = round(cv, 3),
    connection_count = total_connections,
    threat_level = iff(cv < 0.05, "HIGH", "MEDIUM")
"""
38.3 Advanced KQL Hunting Queries¶
Hunt 1 — LSASS Handle Acquisition (Credential Dumping)¶
// Hunt for processes opening suspicious handles to lsass.exe
// Covers multiple dumping techniques beyond basic Mimikatz
DeviceEvents
| where ActionType == "OpenProcessApiCall"
| where FileName =~ "lsass.exe"
| where InitiatingProcessFileName !in~ (
"MsMpEng.exe", // Windows Defender
"MsSense.exe", // Defender for Endpoint
"csrss.exe",
"werfault.exe",
"taskmgr.exe",
"svchost.exe"
)
| extend AccessMask = tostring(parse_json(AdditionalFields).DesiredAccess)
| where AccessMask in (
"0x1010", // PROCESS_VM_READ | PROCESS_QUERY_LIMITED_INFORMATION
"0x1410", // Read + QueryInfo
"0x1fffff" // PROCESS_ALL_ACCESS
)
| project Timestamp, DeviceName, InitiatingProcessFileName,
InitiatingProcessCommandLine, AccountName, AccessMask
| sort by Timestamp desc
Hunt 2 — Kerberos Ticket Anomalies¶
// Hunt for Kerberoasting indicators: bursts of RC4 TGS requests (Event 4769)
// Joining endpoint identity tables on device name vs. client IP is unreliable,
// so hunt directly in the domain controller logs
SecurityEvent
| where EventID == 4769
| where TicketEncryptionType == "0x17" // RC4; a downgrade from AES is suspicious
| where ServiceName !endswith "$" // Exclude machine-account SPNs
| summarize
    unique_services = dcount(ServiceName),
    service_list = make_set(ServiceName, 20),
    rc4_requests = count()
    by AccountName, ClientAddress
| where unique_services > 3 // Multiple SPNs in a short window = Kerberoasting
| sort by unique_services desc
Hunt 3 — DNS-over-HTTPS Tunnel Detection¶
// Hunt for DoH being used to bypass DNS monitoring
// Legitimate DoH: browser to known providers; Attacker: unusual destinations
DeviceNetworkEvents
| where RemotePort == 443
| where RemoteUrl has_any (
"8.8.8.8", // Google DoH
"1.1.1.1", // Cloudflare DoH
"9.9.9.9", // Quad9 DoH
"dns.google",
"cloudflare-dns.com",
"doh.opendns.com"
)
// Known legitimate - these are fine:
| where InitiatingProcessFileName !in~ ("chrome.exe", "firefox.exe", "msedge.exe", "brave.exe")
// Flag: non-browser processes using DoH = likely attacker tool
| project Timestamp, DeviceName, InitiatingProcessFileName,
InitiatingProcessCommandLine, RemoteUrl, RemoteIP
| sort by Timestamp desc
Hunt 4 — Lateral Movement via WMI¶
// Hunt for WMI remote execution (living off the land lateral movement)
// Attacker pattern: wmic.exe /node:TARGET process call create "cmd.exe /c ..."
DeviceProcessEvents
| where InitiatingProcessFileName =~ "WmiPrvSE.exe"
| where FileName !in~ (
"WmiPrvSE.exe", "svchost.exe", "msiexec.exe",
"TrustedInstaller.exe", "SearchIndexer.exe"
)
// Filter known scheduled automation
| where InitiatingProcessCommandLine !has "SCCM"
| project Timestamp, DeviceName, FileName, ProcessCommandLine,
InitiatingProcessFileName, AccountName, AccountDomain
// Join to network to confirm remote origin
| join kind=leftouter (
DeviceNetworkEvents
| where RemotePort == 135 // DCOM
| where ActionType == "InboundConnectionAccepted"
| project NetworkTime = Timestamp, DeviceName, RemoteIP
) on DeviceName
| where isnotempty(RemoteIP)
| sort by Timestamp desc
Hunt 5 — DCSync Detection¶
// Hunt for DCSync — replication protocol abuse to dump all password hashes
// Requires DS-Replication rights; appears as domain replication events
SecurityEvent
| where EventID == 4662
| where ObjectType contains "domainDNS"
| where Properties has_any (
"1131f6aa-9c07-11d1-f79f-00c04fc2dcd2", // DS-Replication-Get-Changes
"1131f6ab-9c07-11d1-f79f-00c04fc2dcd2", // DS-Replication-Get-Changes-All
"89e95b76-444d-4c62-991a-0facbeda640c" // DS-Replication-Get-Changes-In-Filtered-Set
)
// Exclude legitimate DC-to-DC replication
| where SubjectDomainName != "NT AUTHORITY"
| where SubjectUserName !endswith "$" // Domain controller machine accounts
// Flag: user accounts performing DS-Replication = DCSync attack
| project TimeGenerated, SubjectUserName, SubjectDomainName,
SubjectLogonId, Computer, ObjectName
| sort by TimeGenerated desc
38.4 Graph-Based Hunting for Lateral Movement¶
Graph analysis reveals attack paths invisible in flat log analysis by modeling relationships between entities.
import networkx as nx
import pandas as pd

def build_lateral_movement_graph(auth_logs: pd.DataFrame) -> nx.DiGraph:
    """
    Build directed graph of authentication events.
    Nodes: hosts/users. Edges: authentication events with metadata.
    """
    G = nx.DiGraph()
    for _, row in auth_logs.iterrows():
        source = row['source_host']
        target = row['target_host']
        user = row['username']
        logon_type = row['logon_type']
        timestamp = row['timestamp']
        # Add/update edge with authentication metadata
        if G.has_edge(source, target):
            edge = G[source][target]
            edge['count'] += 1
            edge['users'].add(user)
            edge['logon_types'].add(logon_type)
            edge['last_seen'] = timestamp
        else:
            G.add_edge(source, target,
                       count=1,
                       users={user},
                       first_seen=timestamp,
                       last_seen=timestamp,
                       logon_types={logon_type})
    return G

def find_unusual_paths(G: nx.DiGraph, normal_graph: nx.DiGraph) -> list:
    """
    Find paths present in current window but absent from baseline.
    New edges = potential lateral movement.
    """
    suspicious = []
    for u, v, data in G.edges(data=True):
        if not normal_graph.has_edge(u, v):
            # New authentication path — not seen in baseline
            suspicious.append({
                'source': u,
                'target': v,
                'count': data['count'],
                'users': list(data['users']),
                'first_seen': data['first_seen'],
                'risk': 'HIGH' if len(data['users']) == 1 else 'MEDIUM'
            })
    return sorted(suspicious, key=lambda x: x['count'])

def find_high_degree_nodes(G: nx.DiGraph, baseline_avg_degree: float) -> list:
    """
    Identify hosts with unusually high connectivity (scanning/spreading).
    """
    suspicious = []
    for node in G.nodes():
        out_degree = G.out_degree(node)
        if out_degree > baseline_avg_degree * 3:
            suspicious.append({
                'host': node,
                'connections_to': out_degree,
                'target_hosts': list(G.successors(node)),
                'indicator': 'Possible C2 spreading or port scanning'
            })
    return suspicious

def detect_pass_the_hash_pattern(auth_logs: pd.DataFrame) -> pd.DataFrame:
    """
    PtH pattern: same user, NTLM logon type 3, multiple target hosts,
    short time window, no interactive logon preceding.
    """
    ntlm_logons = auth_logs[
        (auth_logs['logon_type'] == 3) &
        (auth_logs['auth_package'] == 'NTLM')
    ].copy()
    # Group by user and 5-minute windows
    ntlm_logons['window'] = ntlm_logons['timestamp'].dt.floor('5min')
    spread = ntlm_logons.groupby(['username', 'window']).agg(
        unique_targets=('target_host', 'nunique'),
        connection_count=('target_host', 'count'),
        target_list=('target_host', list)
    ).reset_index()
    # Flag: same user authenticating to many hosts in 5 minutes
    return spread[spread['unique_targets'] >= 5].sort_values(
        'unique_targets', ascending=False
    )
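The baseline comparison inside `find_unusual_paths` reduces to a set difference over edges. A stripped-down sketch with hypothetical host names:

```python
# Authentication edges (source_host, target_host) learned over a 30-day baseline
baseline = {("WKS-01", "FILE-01"), ("WKS-02", "FILE-01")}
# Current window: WKS-01 suddenly authenticates to a domain controller
current = {("WKS-01", "FILE-01"), ("WKS-01", "DC-01")}

# Edges absent from the baseline are lateral-movement leads
new_paths = current - baseline
print(new_paths)  # {('WKS-01', 'DC-01')}
```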
38.5 Hunt Automation Pipeline¶
Successful hunts should be converted to automated detections. The hunt lifecycle:
flowchart LR
H[Hunt\nHypothesis] --> Q[Query\nDevelopment]
Q --> R[Manual\nReview]
R --> V{Finding?}
V -->|Yes| D[Document\nIncident]
V -->|No — but useful| S[Save as\nScheduled Hunt]
V -->|No — noise| X[Discard]
D --> C[Convert to\nDetection Rule]
C --> T[Tune FP rate\n< 5%]
T --> P[Production\nDetection]
S --> |Run weekly| Q
P --> M[Monitor\nMetrics]
style H fill:#58a6ff22,stroke:#58a6ff
style P fill:#3fb95022,stroke:#3fb950
style D fill:#ffa65722,stroke:#ffa657

from datetime import datetime, timezone

class HuntAutomationPipeline:
    """
    Pipeline converting successful hunts into scheduled detections.
    Integrates with SIEM API for rule deployment.
    """

    def __init__(self, siem_client, sigma_converter):
        self.siem = siem_client
        self.converter = sigma_converter
        self.hunt_registry = {}  # Track hunt history

    def register_hunt(self, hunt_id: str, hypothesis: str,
                      query: str, data_sources: list, mitre_techniques: list):
        """Register a new hunt hypothesis."""
        self.hunt_registry[hunt_id] = {
            'hypothesis': hypothesis,
            'query': query,
            'data_sources': data_sources,
            'mitre_techniques': mitre_techniques,
            'runs': 0,
            'findings': 0,
            'false_positives': 0,
            'status': 'active'
        }

    def run_hunt(self, hunt_id: str, lookback_hours: int = 24) -> dict:
        """Execute hunt and analyze results."""
        hunt = self.hunt_registry[hunt_id]
        results = self.siem.query(hunt['query'], lookback_hours=lookback_hours)
        hunt['runs'] += 1
        hunt['last_run'] = datetime.now(timezone.utc).isoformat()
        hunt['last_result_count'] = len(results)
        return {
            'hunt_id': hunt_id,
            'result_count': len(results),
            'results': results,
            'auto_promote': self._should_promote(hunt)
        }

    def _should_promote(self, hunt: dict) -> bool:
        """Determine if hunt should be promoted to detection."""
        # Promote if: consistent findings, low FP rate, confirmed malicious
        if hunt['runs'] < 3:
            return False
        fp_rate = hunt['false_positives'] / max(hunt['findings'], 1)
        return hunt['findings'] > 0 and fp_rate < 0.1

    def promote_to_detection(self, hunt_id: str) -> str:
        """Convert successful hunt to Sigma rule and deploy."""
        hunt = self.hunt_registry[hunt_id]
        sigma_rule = self.converter.query_to_sigma(
            query=hunt['query'],
            title=f"Automated: {hunt['hypothesis'][:50]}",
            techniques=hunt['mitre_techniques']
        )
        rule_id = self.siem.deploy_rule(sigma_rule)
        hunt['status'] = 'promoted'
        return rule_id
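The promotion decision is simple enough to check by hand. A standalone restatement of the rule with illustrative numbers:

```python
def should_promote(runs: int, findings: int, false_positives: int) -> bool:
    """Promote a hunt after at least 3 runs with findings and a FP rate under 10%."""
    if runs < 3:
        return False
    fp_rate = false_positives / max(findings, 1)
    return findings > 0 and fp_rate < 0.1

assert should_promote(runs=5, findings=20, false_positives=1)      # 5% FP: promote
assert not should_promote(runs=2, findings=10, false_positives=0)  # too few runs
assert not should_promote(runs=4, findings=10, false_positives=5)  # 50% FP: too noisy
```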
38.6 Hunting Program Metrics¶
HUNT_PROGRAM_METRICS = {
    # Volume metrics
    "hunts_per_month": {
        "target": 8,
        "description": "Minimum hunts executed per month",
        "maturity": {"L1": 1, "L2": 4, "L3": 8, "L4": 15, "L5": "continuous"}
    },
    "techniques_covered_pct": {
        "target": 0.30,
        "description": "ATT&CK techniques with at least one hunt completed",
        "maturity": {"L1": 0.05, "L2": 0.15, "L3": 0.30, "L4": 0.50, "L5": 0.75}
    },
    # Quality metrics
    "hunt_to_detection_rate": {
        "target": 0.25,
        "description": "Hunts that result in a new production detection rule",
        "note": "Higher is better — shows hunts are finding real gaps"
    },
    "finding_rate": {
        "target": 0.15,
        "description": "Hunts that find genuine suspicious activity",
        "note": "15% finding rate is healthy — too high = noisy hunts"
    },
    "time_to_hypothesis": {
        "target_hours": 4,
        "description": "Time from CTI report to first hunt execution",
        "note": "Measures intelligence-to-action pipeline speed"
    },
    # Impact metrics
    "mtth": {
        "name": "Mean Time to Hunt (MTTH)",
        "description": "Average time for a new threat report to generate a hunt",
        "target_days": 2
    },
    "detection_coverage_delta": {
        "description": "Change in ATT&CK coverage from hunts vs. other means",
        "note": "Hunting should account for >30% of new coverage"
    }
}
def calculate_hunt_roi(hunts_this_quarter: list, incidents_found: int,
                       avg_incident_cost: float) -> dict:
    """
    Calculate ROI of hunting program.
    Incidents found early = lower remediation cost.
    """
    total_hunt_hours = sum(h['analyst_hours'] for h in hunts_this_quarter)
    analyst_cost_per_hour = 75  # Fully-loaded cost
    program_cost = total_hunt_hours * analyst_cost_per_hour
    # Early detection savings: 60% cost reduction vs. post-detection
    incident_savings = incidents_found * avg_incident_cost * 0.60
    return {
        "program_cost_usd": program_cost,
        "incident_savings_usd": incident_savings,
        "roi_multiple": round(incident_savings / program_cost, 1),
        "cost_per_hunt_hour": analyst_cost_per_hour,
        "breakeven_incidents": round(program_cost / (avg_incident_cost * 0.60), 1)
    }
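Worked through with hypothetical numbers (12 hunts at 16 analyst-hours each, 2 incidents found early, $150K average incident cost), the ROI arithmetic looks like this:

```python
hunt_hours = 12 * 16            # 192 analyst-hours this quarter
program_cost = hunt_hours * 75  # $14,400 at the $75/hr fully-loaded rate
savings = 2 * 150_000 * 0.60    # $180,000 in early-detection savings
roi = round(savings / program_cost, 1)
print(roi)  # 12.5x return on the hunting program
```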
38.7 Structured Hunt Library¶
| Hunt ID | Hypothesis | ATT&CK | Data Source | Query Language |
|---|---|---|---|---|
| H-001 | Kerberoasting via RC4 TGS requests | T1558.003 | Security Event 4769 | KQL/SPL |
| H-002 | LSASS handle access by non-system | T1003.001 | Sysmon Event 10 | KQL/SPL |
| H-003 | Living-off-the-land binary execution | T1218 | Process creation | KQL/SPL |
| H-004 | C2 beaconing via regular DNS intervals | T1071.004 | DNS logs | Python/KQL |
| H-005 | WMI lateral movement from non-system | T1047 | Sysmon/WinEvent | KQL/SPL |
| H-006 | New persistent scheduled task | T1053.005 | Security 4698 | KQL/SPL |
| H-007 | Pass-the-hash via NTLM logon type 3 | T1550.002 | Security 4624 | Python/KQL |
| H-008 | DCSync DS-Replication abuse | T1003.006 | Security 4662 | KQL/SPL |
| H-009 | Unusual parent→child process chains | T1059 | Process creation | Python |
| H-010 | DoH from non-browser process | T1071.004 | Network logs | KQL |
| H-011 | Lateral movement via unusual admin shares | T1021.002 | Security 5140 | KQL/SPL |
| H-012 | Token impersonation via privilege events | T1134 | Security 4672 | KQL/SPL |
| H-013 | ADCS ESC1 certificate request anomaly | T1649 | ADCS/Security | KQL |
| H-014 | PowerShell download cradle | T1059.001 | Sysmon 1/3 | KQL/SPL |
| H-015 | AWS credential exfil via unusual API | T1528 | CloudTrail | Athena/KQL |
Nexus SecOps Benchmark Controls¶
| Control ID | Description | Validation |
|---|---|---|
| Nexus SecOps-TH-01 | Minimum 4 structured hunts per month executed and documented | Hunt registry with timestamps; VECTR records |
| Nexus SecOps-TH-02 | Hunts triggered by CTI reports within 48 hours | CTI-to-hunt pipeline time log |
| Nexus SecOps-TH-03 | Hunt findings tracked; ≥20% converted to detection rules | Hunt register; Sigma rule repo commits |
| Nexus SecOps-TH-04 | ATT&CK hunt coverage documented and reported quarterly | Coverage heatmap; tactic coverage ≥ 30% |
| Nexus SecOps-TH-05 | Beaconing detection hunt run weekly on all egress traffic | Automated hunt scheduler; results log |
| Nexus SecOps-TH-06 | Hunt program ROI reported to leadership semi-annually | ROI calculation; incidents found attribution |
Key Terms¶
Beaconing — Regular, periodic C2 communication pattern with consistent intervals and low jitter; detectable via coefficient of variation analysis.
Coefficient of Variation (CV) — Standard deviation divided by mean; used in beaconing detection — CV < 0.1 indicates suspicious regularity.
Long-Tail Analysis — Examining the infrequent tail of a frequency distribution; rare values are hunting leads.
PEAK — Prepare, Execute, Act with Knowledge; structured threat hunting methodology.
Stack Counting — Aggregating field values and sorting by frequency; used to identify rare (suspicious) parent-child process combinations.
TaHiTI — Targeted Hunting integrating Threat Intelligence; methodology linking hunts to specific CTI-sourced actor TTPs.
Threat Hunting — Proactive, hypothesis-driven search for adversary activity that evaded automated detection; distinguished from IOC-based indicator searching.