Data Pipeline Patterns for Security Operations¶
Security data pipeline reliability directly determines SOC capability. A SIEM that misses 20% of events due to pipeline failures is operating with a 20% detection blind spot. This document describes the canonical patterns for building reliable, low-latency log ingestion pipelines.
Complete Log Ingestion Pipeline¶
flowchart LR
SRC["Log Source\n(OS / App / Network)"]
AGT["Agent /\nCollector"]
BUF["Message Buffer\n(Kafka / Queue)"]
PARSE["Parser\n(Field extraction)"]
NORM["Normalizer\n(Schema mapping)"]
ENRICH["Enricher\n(Asset / User / TI)"]
SIEM["SIEM / Data Lake\n(Storage + Index)"]
DLQ["Dead Letter Queue\n(Failed events)"]
SRC -->|Raw logs| AGT
AGT -->|Compressed / TLS| BUF
BUF --> PARSE
PARSE -->|Parse error| DLQ
PARSE --> NORM
NORM --> ENRICH
ENRICH --> SIEM
DLQ -->|Manual review| SIEM
Latency Budget (target end-to-end):
| Stage | Target Latency (P95) | Failure Mode |
|---|---|---|
| Source → Agent | < 1 second | Event lost on crash if no local buffer |
| Agent → Buffer | < 5 seconds | Agent disk buffer absorbs bursts |
| Buffer → Parser | < 10 seconds | Buffer provides backpressure |
| Parser → Normalizer | < 2 seconds | CPU-bound; scale horizontally |
| Normalizer → Enricher | < 3 seconds | I/O-bound; cache enrichment data |
| Enricher → SIEM | < 15 seconds | Index write latency |
| Total (event to queryable) | < 60 seconds | Nexus SecOps-002 target |
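The budget above is only useful if each stage boundary is measured. A minimal sketch of per-stage latency and P95 computation, assuming events carry illustrative stage-timestamp fields (the field names are not from this document):

```python
from datetime import datetime
from statistics import quantiles

def stage_latency_seconds(event: dict, start_field: str, end_field: str) -> float:
    """Latency between two pipeline-stage timestamps stamped onto the event."""
    start = datetime.fromisoformat(event[start_field])
    end = datetime.fromisoformat(event[end_field])
    return (end - start).total_seconds()

def p95(latencies: list[float]) -> float:
    """P95 via 100 cut points; index 94 is the 95th percentile."""
    return quantiles(latencies, n=100)[94]
```

Comparing the resulting P95 per stage against the table above turns the budget into an alertable SLO rather than a design aspiration.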
Collection Patterns¶
Pattern 1: Agent-Based Collection¶
An agent (software process) runs on the monitored host and collects logs locally before forwarding.
How it works:
[Host OS]
├── Windows Event Log / /var/log
├── Application logs
└── [Agent Process]
├── Read local log sources
├── Buffer locally to disk
├── Compress
└── Forward via TLS to collector/buffer
Advantages:
- Reliable: local disk buffer survives network interruption
- Efficient: can filter/compress before transmission
- Rich collection: process execution, file events, and network events are available only locally
- No firewall rule changes needed on the source host
Disadvantages:
- Agent must be deployed and maintained on every host
- Agent consumes local CPU/memory (typically 1–3%)
- Agent software is an additional attack surface
- Deployment at scale requires automation (SCCM, Ansible, etc.)
When to use: Managed endpoints (workstations, servers). Default choice for endpoint telemetry collection.
Nexus SecOps control: Nexus SecOps-006 (≥98% agent coverage), Nexus SecOps-010 (endpoint process logging)
Pattern 2: Agentless API Collection¶
A central collector periodically calls an API to retrieve logs from a cloud service or SaaS platform.
How it works:
[Central Collector / Lambda]
└── Every N minutes:
├── Call cloud API (REST / GraphQL)
├── Request events since last checkpoint
├── Store checkpoint for next run
└── Forward to buffer/SIEM
Advantages:
- No agent deployment on source systems
- Works for SaaS and cloud services where agents cannot be installed
- Centrally managed
Disadvantages:
- Pull interval creates inherent latency (an event may be 5–15 minutes old by arrival)
- API rate limits can cause gaps during high-volume periods
- Checkpoint management required to avoid duplicate or missed events
- If the collector fails, gaps occur until manually recovered
When to use: Cloud services (Azure AD, AWS CloudTrail, Office 365, Salesforce). Any system where agent installation is not permitted.
Nexus SecOps control: Nexus SecOps-008 (cloud API log collection)
Example — AWS CloudTrail pull:
# Pseudo-code for the API pull pattern (boto3 lookup_events)
import boto3
from datetime import datetime, timezone

def pull_cloudtrail_events(last_checkpoint: datetime) -> list:
    client = boto3.client('cloudtrail')
    end_time = datetime.now(timezone.utc)  # capture the window end once
    events = []
    paginator = client.get_paginator('lookup_events')
    for page in paginator.paginate(
        StartTime=last_checkpoint,
        EndTime=end_time
    ):
        events.extend(page['Events'])
    # Checkpoint the queried window end only after every page arrived, so a
    # failed run retries from the previous checkpoint and events that landed
    # during pagination are picked up by the next window (no gap)
    save_checkpoint(end_time)
    return events
Pattern 3: Syslog-Based Collection¶
Network devices and legacy systems forward events using the syslog protocol (UDP/TCP port 514, or TLS on 6514).
How it works:
[Network Device / Legacy System]
└── Forward via syslog (UDP 514 / TCP-TLS 6514)
    └── [Syslog Collector]
        ├── Receive and timestamp
        └── Forward to buffer/SIEM
Advantages:
- Universal: nearly every network device supports syslog
- No agent needed
- Low overhead
Disadvantages:
- UDP syslog has no delivery confirmation — events can be silently lost
- No TLS in legacy syslog (use TLS syslog / TCP on port 6514)
- High-volume syslog floods can cause buffer loss
- Syslog messages are unstructured — parsing is complex and brittle
When to use: Network devices (firewalls, switches, routers, load balancers), legacy Unix systems, appliances without API.
Nexus SecOps control: Nexus SecOps-003 (encrypted transport — use TLS syslog), Nexus SecOps-001
Use TCP syslog with TLS (port 6514), not UDP syslog (port 514). UDP syslog provides no delivery guarantees and no encryption.
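For senders that can be scripted, TLS syslog is a plain TCP socket wrapped in TLS carrying RFC 5424 messages. A minimal sketch; the collector hostname, port, and app name are illustrative assumptions:

```python
# RFC 5424 message builder plus a TLS sender sketch. PRI = facility*8 + severity.
import socket
import ssl
from datetime import datetime, timezone

def rfc5424_message(hostname: str, app: str, msg: str,
                    facility: int = 4, severity: int = 5) -> bytes:
    pri = facility * 8 + severity          # e.g. auth(4)*8 + notice(5) = 37
    ts = datetime.now(timezone.utc).isoformat()
    # <PRI>VERSION TIMESTAMP HOSTNAME APP-NAME PROCID MSGID SD MSG
    return f"<{pri}>1 {ts} {hostname} {app} - - - {msg}".encode()

def send_tls_syslog(message: bytes, host: str = "collector.example.com",
                    port: int = 6514) -> None:
    ctx = ssl.create_default_context()     # verifies the collector certificate
    with socket.create_connection((host, port)) as sock:
        with ctx.wrap_socket(sock, server_hostname=host) as tls:
            tls.sendall(message + b"\n")
```

Certificate verification is what distinguishes this from merely "encrypted" syslog: without it, events could be redirected to an attacker-controlled collector.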
Pattern 4: Cloud-Native Forwarding¶
Cloud providers offer native log export capabilities that push logs to a centralized target.
Examples:
- AWS: CloudWatch Logs → Kinesis → Lambda/Firehose → SIEM
- Azure: Diagnostic Settings → Event Hub → SIEM connector
- GCP: Cloud Logging → Pub/Sub → SIEM
Advantages:
- Managed by the cloud provider; no infrastructure to maintain
- Native integration with cloud services
- Scales automatically with event volume
- High reliability (cloud-managed)
Disadvantages:
- Vendor lock-in for pipeline components
- Cost at scale (data egress charges)
- Less control over pipeline behavior
Nexus SecOps control: Nexus SecOps-008, Nexus SecOps-121 (cloud security operations)
Normalization Pipeline Stages¶
Raw logs from heterogeneous sources must be transformed into a consistent schema before correlation is possible.
Stage 1: Raw (Original)¶
The log exactly as emitted by the source. Preserve this as the canonical record.
{
"EventID": 4625,
"TimeCreated": "2024-11-15T09:22:41.1234567Z",
"SubjectUserName": "JSMITH",
"IpAddress": "192.168.1.45",
"FailureReason": "Unknown user name or bad password"
}
Stage 2: Parsed (Fields Extracted)¶
Discrete fields extracted from raw format (splitting syslog messages, parsing JSON, extracting CEF fields).
{
"raw_event_id": "4625",
"raw_timestamp": "2024-11-15T09:22:41.1234567Z",
"raw_user": "JSMITH",
"raw_src_ip": "192.168.1.45",
"raw_failure": "Unknown user name or bad password",
"parse_status": "success",
"log_source": "Windows Security"
}
Stage 3: Normalized (Schema Mapped)¶
Fields mapped to a canonical schema (OCSF, ECS, CIM, or internal standard).
{
"timestamp": "2024-11-15T09:22:41Z", // UTC-enforced
"event_type": "authentication",
"event_outcome": "failure",
"user_name": "jsmith", // lowercased
"src_ip": "192.168.1.45",
"auth_protocol": "NTLM",
"failure_reason": "bad_credentials",
"log_source_type": "windows_security",
"schema_version": "1.2"
}
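The parsed-to-normalized step is essentially a translation table plus a few value transforms (UTC truncation, lowercasing, reason mapping). A sketch assuming the field names shown in Stages 2 and 3; the failure-reason table is illustrative:

```python
# Maps Stage 2 parsed fields onto the Stage 3 canonical schema.
FAILURE_REASONS = {"Unknown user name or bad password": "bad_credentials"}

def normalize_windows_auth(parsed: dict) -> dict:
    return {
        "timestamp": parsed["raw_timestamp"][:19] + "Z",  # truncate to whole seconds, UTC
        "event_type": "authentication",
        "event_outcome": "failure" if parsed["raw_event_id"] == "4625" else "success",
        "user_name": parsed["raw_user"].lower(),          # lowercased per schema
        "src_ip": parsed["raw_src_ip"],
        "failure_reason": FAILURE_REASONS.get(parsed["raw_failure"], "other"),
        "log_source_type": "windows_security",
        "schema_version": "1.2",
    }
```

Keeping the value maps as data (rather than branching code) makes schema updates reviewable as table diffs under change control.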
Stage 4: Enriched (Context Added)¶
Asset and user context added from CMDB and directory.
{
// ... all normalized fields ...
"user_department": "Finance",
"user_admin": false,
"user_manager": "dmiller",
"src_asset_name": "FINANCE-WS-042",
"src_asset_criticality": "high",
"src_asset_owner": "Finance Dept",
"src_ip_geo_country": "US",
"src_ip_asn": "AS7922",
"src_ip_reputation": "clean"
}
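Because enrichment is I/O-bound (per the latency budget above), the lookups are typically served from in-memory caches refreshed periodically from the CMDB and directory. A minimal sketch; the cache contents are sample data, not a real integration:

```python
# Enrichment via cached lookups; misses simply add no fields.
ASSET_CACHE = {
    "192.168.1.45": {"src_asset_name": "FINANCE-WS-042",
                     "src_asset_criticality": "high"},
}
USER_CACHE = {
    "jsmith": {"user_department": "Finance", "user_admin": False},
}

def enrich(event: dict) -> dict:
    enriched = dict(event)                 # never mutate the normalized record
    enriched.update(ASSET_CACHE.get(event.get("src_ip"), {}))
    enriched.update(USER_CACHE.get(event.get("user_name"), {}))
    return enriched
```

Returning the event unchanged on a cache miss keeps the pipeline flowing; missing enrichment is then itself a data-quality signal worth monitoring.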
Stage 5: Correlated (Multi-Event Context)¶
Detection engine correlates enriched events into alerts.
{
"alert_id": "ALERT-2024-11-15-00042",
"rule_id": "DET-089",
"rule_name": "Brute Force — Failed Auth Threshold",
"severity": "high",
"trigger_count": 47,
"trigger_window_minutes": 5,
"subject_events": ["event-001", "event-002", "..."],
"user_name": "jsmith",
"src_ip": "192.168.1.45",
"mitre_tactic": "TA0006",
"mitre_technique": "T1110"
}
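The alert above is a threshold-over-window rule. Its core can be sketched as a sliding window per (user, source IP) pair; the thresholds match the sample alert, and the in-memory state is a simplification of what a detection engine would persist:

```python
# Threshold correlation: fire when >= 47 failures from one (user, ip)
# land within a 5-minute window.
from collections import defaultdict, deque

WINDOW_SECONDS = 5 * 60
THRESHOLD = 47

windows: dict[tuple, deque] = defaultdict(deque)

def correlate(event: dict, epoch: float) -> bool:
    """Return True when this event pushes its (user, ip) key over the threshold."""
    key = (event["user_name"], event["src_ip"])
    q = windows[key]
    q.append(epoch)
    while q and epoch - q[0] > WINDOW_SECONDS:
        q.popleft()                        # evict events outside the window
    return len(q) >= THRESHOLD
```

Note that this only works on normalized, enriched events: the (user, ip) key exists precisely because the earlier stages produced consistent `user_name` and `src_ip` fields across heterogeneous sources.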
Worked Example: Windows Security Event Log¶
Trace Event ID 4625 (failed logon) through the full pipeline:
| Stage | Action | Output |
|---|---|---|
| Source | Windows logs Event ID 4625 to Security event log | Binary EVTX |
| Agent | Elastic Agent reads event log; serializes to JSON | JSON string |
| Buffer | Published to Kafka topic windows.security | Kafka message |
| Parser | event_id = 4625 → category = authentication_failure | Parsed JSON |
| Normalizer | event.type = authentication; event.outcome = failure | ECS-compliant event |
| Enricher | IP lookup: 192.168.1.45 → Finance-WS-042 (critical asset) | Enriched event |
| SIEM Index | Written to SIEM index authentication-* | Searchable |
| Detection | 47 failures in 5 min from same IP → ALERT fired | Alert record |
Total pipeline latency: ~35 seconds from event occurrence to searchable alert
Error Handling¶
Dead Letter Queue (DLQ)¶
Events that fail parsing or normalization are routed to a DLQ instead of being silently dropped.
DLQ operations:
- Monitor DLQ volume; a spike = a parsing problem needing a fix
- Alert if DLQ volume exceeds [1000 events/hour] — Nexus SecOps-025
- Replay from the DLQ after fixing the parser
- Retain DLQ events for [7 days] to enable replay
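The routing itself is a try/except around the parser that wraps failures with diagnostic metadata; a sketch with illustrative topic names:

```python
# Parse failures are wrapped and published to a dead-letter topic
# instead of being dropped.
import json
from datetime import datetime, timezone

def route(raw: str, parse, publish) -> None:
    try:
        publish("events.parsed", parse(raw))
    except Exception as exc:
        publish("events.dlq", {
            "raw_event": raw,              # preserved verbatim for replay
            "error": str(exc),
            "failed_at": datetime.now(timezone.utc).isoformat(),
        })
```

Preserving the raw event verbatim is what makes replay possible: once the parser is fixed, DLQ contents are re-fed through `route` unchanged.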
Backpressure¶
When the SIEM cannot keep up with ingest volume, the buffer absorbs the excess:
Volume spike → Buffer fills → Backpressure signal → Agent slows send rate
↓
SIEM catches up → Buffer drains → Normal rate resumes
If the buffer fills completely, events will be dropped. Monitor buffer consumer lag and alert when lag exceeds [X minutes] — Nexus SecOps-007.
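The lag check reduces to comparing the newest produced timestamp against the consumer's position; a sketch with the bracketed [X minutes] left as a parameter:

```python
# Consumer-lag alert: fire when the buffer's newest message is more than
# max_lag_minutes ahead of what the consumer has processed.
def lag_exceeded(newest_produced_ts: float, last_consumed_ts: float,
                 max_lag_minutes: float) -> bool:
    return (newest_produced_ts - last_consumed_ts) > max_lag_minutes * 60
```

Alerting on lag (time behind) rather than raw queue depth makes the threshold independent of event volume, which varies by orders of magnitude across sources.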
Schema Evolution¶
When a log source changes its format or fields:
- Detect: Parse error rate increases for the log source
- Identify: Compare new log sample to current parser rule
- Update: Modify parser (non-breaking: add new fields; breaking: schema migration)
- Version: Tag parser with version; maintain backward compatibility for retention period
- Test: Validate against 100 historical events before deploying
- Deploy: Via change control process (Nexus SecOps-202, Nexus SecOps-203)
Schema versioning: Include schema_version field in all normalized events. Allows querying to distinguish events parsed with different schema versions.
See Reference Architecture | Integration Patterns
Nexus SecOps controls: TEL domain (Nexus SecOps-001–015), DQN domain (Nexus SecOps-016–030)