
Data Pipeline Patterns for Security Operations

Security data pipeline reliability directly determines SOC capability. A SIEM that misses 20% of events due to pipeline failures is operating with a 20% detection blind spot. This document describes the canonical patterns for building reliable, low-latency log ingestion pipelines.


Complete Log Ingestion Pipeline

flowchart LR
    SRC["Log Source\n(OS / App / Network)"]
    AGT["Agent /\nCollector"]
    BUF["Message Buffer\n(Kafka / Queue)"]
    PARSE["Parser\n(Field extraction)"]
    NORM["Normalizer\n(Schema mapping)"]
    ENRICH["Enricher\n(Asset / User / TI)"]
    SIEM["SIEM / Data Lake\n(Storage + Index)"]
    DLQ["Dead Letter Queue\n(Failed events)"]

    SRC -->|Raw logs| AGT
    AGT -->|Compressed / TLS| BUF
    BUF --> PARSE
    PARSE -->|Parse error| DLQ
    PARSE --> NORM
    NORM --> ENRICH
    ENRICH --> SIEM
    DLQ -->|Manual review| SIEM

Latency Budget (target end-to-end):

| Stage | Target Latency (P95) | Failure Mode |
|---|---|---|
| Source → Agent | < 1 second | Event lost on crash if no local buffer |
| Agent → Buffer | < 5 seconds | Agent disk buffer absorbs bursts |
| Buffer → Parser | < 10 seconds | Buffer provides backpressure |
| Parser → Normalizer | < 2 seconds | CPU-bound; scale horizontally |
| Normalizer → Enricher | < 3 seconds | I/O-bound; cache enrichment data |
| Enricher → SIEM | < 15 seconds | Index write latency |
| Total (event to queryable) | < 60 seconds | Nexus SecOps-002 target |
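
The stage targets above can be sanity-checked with a few lines: summing the per-stage P95 budgets shows how much headroom remains against the 60-second end-to-end target. A minimal sketch; the stage keys are illustrative names, not an official schema:

```python
# Stage-level P95 latency budgets in seconds, taken from the table above.
STAGE_BUDGETS = {
    "source_to_agent": 1,
    "agent_to_buffer": 5,
    "buffer_to_parser": 10,
    "parser_to_normalizer": 2,
    "normalizer_to_enricher": 3,
    "enricher_to_siem": 15,
}

END_TO_END_TARGET = 60  # Nexus SecOps-002: event occurrence to queryable

def budget_headroom() -> int:
    """Seconds of slack between the summed stage budgets and the target."""
    return END_TO_END_TARGET - sum(STAGE_BUDGETS.values())
```

The summed stage budgets come to 36 seconds, leaving 24 seconds of slack to absorb retries and queueing bursts before the end-to-end target is at risk.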

Collection Patterns

Pattern 1: Agent-Based Collection

An agent (software process) runs on the monitored host and collects logs locally before forwarding.

How it works:

[Host OS]
  ├── Windows Event Log / /var/log
  ├── Application logs
  └── [Agent Process]
       ├── Read local log sources
       ├── Buffer locally to disk
       ├── Compress
       └── Forward via TLS to collector/buffer

Advantages:
- Reliable: local disk buffer survives network interruption
- Efficient: can filter/compress before transmission
- Rich collection: process execution, file events, and network events are available only locally
- No inbound firewall rule changes needed on the source

Disadvantages:
- Agent must be deployed and maintained on every host
- Agent consumes local CPU/memory (typically 1–3%)
- Agent software is an additional attack surface
- Deployment at scale requires automation (SCCM, Ansible, etc.)

When to use: Managed endpoints (workstations, servers). Default choice for endpoint telemetry collection.

Nexus SecOps control: Nexus SecOps-006 (≥98% agent coverage), Nexus SecOps-010 (endpoint process logging)


Pattern 2: Agentless API Collection

A central collector periodically calls an API to retrieve logs from a cloud service or SaaS platform.

How it works:

[Central Collector / Lambda]
  └── Every N minutes:
       ├── Call cloud API (REST / GraphQL)
       ├── Request events since last checkpoint
       ├── Store checkpoint for next run
       └── Forward to buffer/SIEM

Advantages:
- No agent deployment on source systems
- Works for SaaS and cloud services where agents cannot be installed
- Centrally managed

Disadvantages:
- Pull interval creates inherent latency (an event may be 5–15 minutes old on arrival)
- API rate limits can cause gaps during high-volume periods
- Checkpoint management is required to avoid duplicate or missed events
- If the collector fails, gaps occur until it is manually recovered

When to use: Cloud services (Azure AD, AWS CloudTrail, Office 365, Salesforce). Any system where agent installation is not permitted.

Nexus SecOps control: Nexus SecOps-008 (cloud API log collection)

Example — AWS CloudTrail pull:

# Pseudo-code for the API pull pattern
from datetime import datetime, timezone

import boto3

def pull_cloudtrail_events(last_checkpoint: datetime) -> list:
    client = boto3.client('cloudtrail')
    # Fix the window end before paginating so the saved checkpoint matches
    # exactly what was pulled (no gap between the pull and the checkpoint).
    end_time = datetime.now(timezone.utc)
    events = []
    paginator = client.get_paginator('lookup_events')
    for page in paginator.paginate(
        StartTime=last_checkpoint,
        EndTime=end_time
    ):
        events.extend(page['Events'])
    # Advance the checkpoint only after a successful pull, so a failed
    # run is retried from the same point instead of leaving a gap.
    save_checkpoint(end_time)
    return events


Pattern 3: Syslog-Based Collection

Network devices and legacy systems forward events using the syslog protocol (UDP/TCP port 514, or TLS on 6514).

How it works:

[Network Device / Legacy System]
  └── Sends syslog datagrams → [Syslog Receiver]
                                    └── Parse → Forward

Advantages:
- Universal: nearly every network device supports syslog
- No agent needed
- Low overhead

Disadvantages:
- UDP syslog has no delivery confirmation — events can be silently lost
- No TLS in legacy syslog (use TLS syslog over TCP on port 6514)
- High-volume syslog floods can cause buffer loss
- Syslog messages are unstructured — parsing is complex and brittle

When to use: Network devices (firewalls, switches, routers, load balancers), legacy Unix systems, appliances without API.

Nexus SecOps control: Nexus SecOps-003 (encrypted transport — use TLS syslog), Nexus SecOps-001

Use TCP syslog with TLS (port 6514), not UDP syslog (port 514). UDP syslog provides no delivery guarantees and no encryption.
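
Even before full parsing, the leading PRI field of a syslog message is structured: facility = PRI // 8 and severity = PRI % 8 (per RFC 3164/5424). A minimal decoding sketch; the regex and function name are illustrative:

```python
import re

# PRI is the leading "<N>" of a syslog message.
# facility = N // 8, severity = N % 8 (RFC 3164 / RFC 5424).
_PRI_RE = re.compile(r"^<(\d{1,3})>")

SEVERITIES = ["emerg", "alert", "crit", "err",
              "warning", "notice", "info", "debug"]

def decode_pri(message: str) -> dict:
    """Extract facility and severity from a raw syslog message."""
    m = _PRI_RE.match(message)
    if not m:
        raise ValueError("no PRI field: " + message[:32])
    pri = int(m.group(1))
    return {
        "facility": pri // 8,
        "severity": pri % 8,
        "severity_name": SEVERITIES[pri % 8],
    }
```

For example, `<134>` decodes to facility 16 (local0) and severity 6 (info). The rest of the message is the unstructured part where parsing gets brittle.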


Pattern 4: Cloud-Native Forwarding

Cloud providers offer native log export capabilities that push logs to a centralized target.

Examples:
- AWS: CloudWatch Logs → Kinesis → Lambda/Firehose → SIEM
- Azure: Diagnostic Settings → Event Hub → SIEM connector
- GCP: Cloud Logging → Pub/Sub → SIEM

Advantages:
- Managed by cloud provider; no infrastructure to maintain
- Native integration with cloud services
- Scales automatically with event volume
- High reliability (cloud-managed)

Disadvantages:
- Vendor lock-in for pipeline components
- Cost at scale (data egress charges)
- Less control over pipeline behavior

Nexus SecOps control: Nexus SecOps-008, Nexus SecOps-121 (cloud security operations)


Normalization Pipeline Stages

Raw logs from heterogeneous sources must be transformed into a consistent schema before correlation is possible.

Stage 1: Raw (Original)

The log exactly as emitted by the source. Preserve this as the canonical record.

{
  "EventID": 4625,
  "TimeCreated": "2024-11-15T09:22:41.1234567Z",
  "SubjectUserName": "JSMITH",
  "IpAddress": "192.168.1.45",
  "FailureReason": "Unknown user name or bad password"
}

Stage 2: Parsed (Fields Extracted)

Discrete fields extracted from raw format (splitting syslog messages, parsing JSON, extracting CEF fields).

{
  "raw_event_id": "4625",
  "raw_timestamp": "2024-11-15T09:22:41.1234567Z",
  "raw_user": "JSMITH",
  "raw_src_ip": "192.168.1.45",
  "raw_failure": "Unknown user name or bad password",
  "parse_status": "success",
  "log_source": "Windows Security"
}

Stage 3: Normalized (Schema Mapped)

Fields mapped to a canonical schema (OCSF, ECS, CIM, or internal standard).

{
  "timestamp": "2024-11-15T09:22:41Z",  // UTC-enforced
  "event_type": "authentication",
  "event_outcome": "failure",
  "user_name": "jsmith",               // lowercased
  "src_ip": "192.168.1.45",
  "auth_protocol": "NTLM",
  "failure_reason": "bad_credentials",
  "log_source_type": "windows_security",
  "schema_version": "1.2"
}
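
The Stage 2 → Stage 3 mapping can be sketched as a small function. The field names come from the examples above; the event-ID mapping table is an illustrative subset, not a complete Windows mapping:

```python
# Maps Windows event IDs to canonical type/outcome (illustrative subset).
EVENT_ID_MAP = {
    "4624": ("authentication", "success"),
    "4625": ("authentication", "failure"),
}

def normalize(parsed: dict) -> dict:
    """Map a Stage 2 parsed event onto the Stage 3 canonical schema."""
    event_type, outcome = EVENT_ID_MAP[parsed["raw_event_id"]]
    return {
        # Truncate to whole seconds and keep the UTC "Z" suffix.
        "timestamp": parsed["raw_timestamp"][:19] + "Z",
        "event_type": event_type,
        "event_outcome": outcome,
        "user_name": parsed["raw_user"].lower(),  # lowercased per schema
        "src_ip": parsed["raw_src_ip"],
        "log_source_type": "windows_security",
        "schema_version": "1.2",
    }
```

An unknown event ID raises a `KeyError` here; in the real pipeline that failure is exactly what gets routed to the dead letter queue.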

Stage 4: Enriched (Context Added)

Asset and user context added from CMDB and directory.

{
  // ... all normalized fields ...
  "user_department": "Finance",
  "user_admin": false,
  "user_manager": "dmiller",
  "src_asset_name": "FINANCE-WS-042",
  "src_asset_criticality": "high",
  "src_asset_owner": "Finance Dept",
  "src_ip_geo_country": "US",
  "src_ip_asn": "AS7922",
  "src_ip_reputation": "clean"
}
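
Because enrichment is I/O-bound (per the latency table, the mitigation is to cache enrichment data), lookups are typically memoized. A sketch assuming a hypothetical in-memory asset table; in production `lookup_asset` would call the CMDB or directory:

```python
from functools import lru_cache

# Hypothetical CMDB snapshot; in production this would be a service call.
ASSET_DB = {
    "192.168.1.45": {"src_asset_name": "FINANCE-WS-042",
                     "src_asset_criticality": "high"},
}

@lru_cache(maxsize=65536)
def lookup_asset(ip: str) -> tuple:
    # Cache the slow lookup; return a tuple so the result is hashable.
    return tuple(sorted(ASSET_DB.get(ip, {}).items()))

def enrich(event: dict) -> dict:
    """Return a copy of the normalized event with asset context merged in."""
    return {**event, **dict(lookup_asset(event["src_ip"]))}
```

Events from unknown IPs pass through unchanged; a real pipeline would also bound the cache TTL so stale CMDB data does not mislabel assets.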

Stage 5: Correlated (Multi-Event Context)

Detection engine correlates enriched events into alerts.

{
  "alert_id": "ALERT-2024-11-15-00042",
  "rule_id": "DET-089",
  "rule_name": "Brute Force — Failed Auth Threshold",
  "severity": "high",
  "trigger_count": 47,
  "trigger_window_minutes": 5,
  "subject_events": ["event-001", "event-002", "..."],
  "user_name": "jsmith",
  "src_ip": "192.168.1.45",
  "mitre_tactic": "TA0006",
  "mitre_technique": "T1110"
}

Worked Example: Windows Security Event Log

Trace Event ID 4625 (failed logon) through the full pipeline:

| Stage | Action | Output |
|---|---|---|
| Source | Windows logs Event ID 4625 to Security event log | Binary EVTX |
| Agent | Elastic Agent reads event log; serializes to JSON | JSON string |
| Buffer | Published to Kafka topic windows.security | Kafka message |
| Parser | event_id = 4625 → category = authentication_failure | Parsed JSON |
| Normalizer | event.type = authentication; event.outcome = failure | ECS-compliant event |
| Enricher | IP lookup: 192.168.1.45 → FINANCE-WS-042 (critical asset) | Enriched event |
| SIEM Index | Written to SIEM index authentication-* | Searchable |
| Detection | 47 failures in 5 min from same IP → ALERT fired | Alert record |

Total pipeline latency: ~35 seconds from event occurrence to searchable alert


Error Handling

Dead Letter Queue (DLQ)

Events that fail parsing or normalization are routed to a DLQ instead of being silently dropped.

DLQ operations:
- Monitor DLQ volume; a spike indicates a parsing problem that needs a fix
- Alert if DLQ volume exceeds [1000 events/hour] — Nexus SecOps-025
- Replay from the DLQ after fixing the parser
- Retain DLQ events for [7 days] to enable replay
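
The routing decision itself is simple: attempt the parse, and on failure send the raw event to the DLQ with error context so it can be replayed later. A minimal sketch; the list-based queues stand in for real Kafka topics:

```python
import json

def route(raw: str, parsed_out: list, dlq: list) -> None:
    """Parse a raw event; on failure, route it to the DLQ with context
    so it can be replayed after the parser is fixed."""
    try:
        parsed_out.append(json.loads(raw))
    except json.JSONDecodeError as exc:
        # Never silently drop: keep the raw payload plus the parse error.
        dlq.append({"raw": raw, "error": str(exc)})
```

Keeping the original raw payload alongside the error is what makes replay possible once the parser is corrected.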

Backpressure

When the SIEM cannot keep up with ingest volume, the buffer absorbs the excess:

Volume spike → Buffer fills → Backpressure signal → Agent slows send rate
              SIEM catches up → Buffer drains → Normal rate resumes

If the buffer fills completely, events will be dropped. Monitor buffer consumer lag and alert when lag exceeds [X minutes] — Nexus SecOps-007.
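
The lag alert reduces to simple arithmetic: backlog in events divided by the current consume rate gives the catch-up time. A sketch; the 5-minute default stands in for the [X minutes] threshold above:

```python
def lag_minutes(produced_offset: int, consumed_offset: int,
                consume_rate_per_sec: float) -> float:
    """Estimated minutes for the consumer to catch up at the current rate."""
    backlog = produced_offset - consumed_offset
    return backlog / consume_rate_per_sec / 60

def should_alert(produced: int, consumed: int,
                 rate: float, threshold_min: float = 5.0) -> bool:
    # threshold_min is a placeholder for the [X minutes] value above.
    return lag_minutes(produced, consumed, rate) > threshold_min
```

For example, a 6,000-event backlog at 20 events/second is a 5-minute lag: right at a 5-minute threshold, and well past it if the backlog keeps growing.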


Schema Evolution

When a log source changes its format or fields:

1. Detect: Parse error rate increases for the log source
2. Identify: Compare a new log sample to the current parser rule
3. Update: Modify the parser (non-breaking: add new fields; breaking: schema migration)
4. Version: Tag the parser with a version; maintain backward compatibility for the retention period
5. Test: Validate against 100 historical events before deploying
6. Deploy: Via change control process (Nexus SecOps-202, Nexus SecOps-203)

Schema versioning: Include schema_version field in all normalized events. Allows querying to distinguish events parsed with different schema versions.
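
One pitfall when querying by schema_version: versions must compare numerically, not lexicographically ("1.10" sorts before "1.9" as a string). A small sketch of version-aware selection; the function names are illustrative:

```python
def parse_version(v: str) -> tuple:
    """Turn "1.2" into (1, 2) so versions compare numerically, not
    lexicographically ("1.10" must sort after "1.9")."""
    return tuple(int(part) for part in v.split("."))

def events_at_or_above(events: list, minimum: str) -> list:
    """Select normalized events parsed with at least the given schema."""
    floor = parse_version(minimum)
    return [e for e in events
            if parse_version(e["schema_version"]) >= floor]
```

This lets detection content state the minimum schema version it needs and skip events parsed under older rules.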


See Reference Architecture | Integration Patterns
Nexus SecOps controls: TEL domain (Nexus SecOps-001–015), DQN domain (Nexus SecOps-016–030)