Teil 7 – Automatisierung & Scripting: eigene Tools, Pipelines und sichere Automatisierung

Automatisierung ist der Hebel, mit dem du repetitive Recon- und Test-Aufgaben skalierst: Mehr Daten, weniger Hantieren, schnellere Iterationen. Gleichzeitig ist Automatisierung gefährlich, wenn sie unbedacht eingesetzt wird — falsche Rate, falsche Targets oder schlecht geparste Ergebnisse führen zu Lärm, falschen Reports oder rechtlichen Problemen. In diesem Teil bekommst du konkrete Prinzipien, Architektur-Patterns, sichere Praktiken und ein direkt nutzbares Starter-Skript in Python.


Grundprinzipien vor dem Automatisieren

  1. Frag dich: Was genau soll die Automation erreichen?
    Definiere ein klares Ziel (z. B. Subdomain-Sammlung + Alive-Check + CSV-Export), keine „Alles-auf-einen-Schlag“-Pipelines.
  2. Respect the scope
    Automatisierte Aktionen sind oft invasiver als man denkt. Limitiere Targets strikt auf erlaubte Domains/IPs.
  3. Rate-Limits & Backoff
    Setze conservative Rate-Limits, benutze Exponential Backoff bei Fehlern und einstellbare Concurrency-Parameter.
  4. Dry-run & Safe mode
    Jede Pipeline sollte einen Dry-Run-Mode haben, der nur Logs erzeugt, aber keine Requests an Live-Targets sendet.
  5. Idempotenz & Reproduzierbarkeit
    Ergebnisse sollten deterministisch sein — gleiche Eingabe = gleiche Ausgabe. Versioniere Skripte, nutze timestamps.
  6. Logging & Audit Trail
    Protokolliere Eingaben, genutzte APIs, Zeiten, Konfigurationen. Bewahre Logs sicher auf (nicht mit sensiblen Secrets).
  7. Secrets Management
    Keine API-Keys oder Passwörter im Code. Nutze Environment-Variablen, Vaults, oder verschlüsselte Secrets in CI.
  8. Deduplication & Normalisierung
    Daten aus mehreren Tools (amass, subfinder, crt.sh) vereinigen und normalisieren, bevor du weiter testest.

Architektur-Pattern für Recon-Pipelines

  • Collector Layer: Führt passive/aktive Tools aus (amass, subfinder, crt.sh API) und sammelt Rohdaten.
  • Normalizer: Bereinigt, dedupliziert, normalisiert (IDN, lowercase, trailing dots entfernen).
  • Enricher: Fügt Informationen hinzu — IPs, CDN-Erkennung, Wappalyzer-Resultate, Shodan/Censys-Metadaten (nur lesend).
  • Verifier: Alive-Checks, Header-Checks, TLS-Checks.
  • Reporter / Storage: CSV/JSON/DB-Export, Markierung nach Priorität, Dashboard/Notebooks.
  • Scheduler: Cron / Queue (Celery / RQ) oder GitHub Actions für regelmäßige Läufe.

Dieses Schichtenmodell macht Komponenten austauschbar und einfacher testbar.


Tools orchestration — kurze Bash-Beispiele

Ein einfacher Start: subfinder → amass → dedupe → ffuf-Queue (nur auf erlaubten Hosts).

# subfinder (save)
subfinder -d example.com -o subfinder.txt

# amass passive
amass enum -passive -d example.com -o amass.txt

# merge & dedupe
cat subfinder.txt amass.txt | sort -u > subs_merged.txt

# ffuf directory discovery (nur gegen subs_merged.txt list)
while read host; do
  ffuf -u "https://$host/FUZZ" -w wordlist.txt -mc 200 -t 20 -o "ffuf_${host}.json"
done < subs_merged.txt

Wichtig: ffuf/Scanner nur gegen Hosts die im Scope sind. Rate-Limits und -t (Threads) anpassen.


Praktisches Beispiel: Python-Starter (Subdomains dedupen + Alive-Check → CSV)

Das folgende, einfach gehaltene Script sammelt Domains aus Dateien, dedupliziert, prüft https-Alive-Status (HEAD/GET) asynchron mit aiohttp, und exportiert ein CSV. Es ist ein Starter – in Produktion solltest du Logging, retries und bessere Fehlerbehandlung hinzufügen.

# file: recon_checker.py
# usage: python recon_checker.py --input subs_1.txt subs_2.txt --output subs_alive.csv --concurrency 20 --dryrun

import asyncio
import aiohttp
import argparse
import csv
from urllib.parse import urlparse

async def check_host(session, host, timeout=10):
    """Check if host responds on HTTPS (HEAD, fallback GET)."""
    url = f"https://{host}"
    try:
        async with session.head(url, timeout=timeout, allow_redirects=True) as resp:
            return host, resp.status, resp.headers.get('server', '')
    except Exception:
        try:
            async with session.get(url, timeout=timeout, allow_redirects=True) as resp:
                return host, resp.status, resp.headers.get('server', '')
        except Exception as e:
            return host, None, None

async def worker(name, queue, results, sem, session):
    while True:
        host = await queue.get()
        if host is None:
            queue.task_done()
            break
        async with sem:
            host, status, server = await check_host(session, host)
            results.append({'host': host, 'status': status, 'server': server})
        queue.task_done()

async def main(inputs, output, concurrency, dryrun):
    # Read + normalize + dedupe
    hosts = set()
    for f in inputs:
        with open(f, 'r', encoding='utf-8') as fh:
            for line in fh:
                h = line.strip()
                if not h:
                    continue
                # normalize: if line is a URL, extract hostname
                parsed = urlparse(h if '://' in h else f"//{h}")
                host = parsed.hostname or h
                hosts.add(host.lower().rstrip('.'))

    if dryrun:
        print(f"Would check {len(hosts)} hosts (dry-run).")
        return

    queue = asyncio.Queue()
    for h in hosts:
        queue.put_nowait(h)

    results = []
    sem = asyncio.Semaphore(concurrency)

    async with aiohttp.ClientSession() as session:
        tasks = []
        for i in range(concurrency):
            tasks.append(asyncio.create_task(worker(f"w{i}", queue, results, sem, session)))
        # sentinel to stop workers
        for _ in range(concurrency):
            queue.put_nowait(None)
        await asyncio.gather(*tasks)

    # write CSV
    with open(output, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=['host', 'status', 'server'])
        writer.writeheader()
        for r in sorted(results, key=lambda x: (x['status'] is None, x['host'])):
            writer.writerow(r)

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', nargs='+', required=True)
    parser.add_argument('--output', default='subs_alive.csv')
    parser.add_argument('--concurrency', type=int, default=10)
    parser.add_argument('--dryrun', action='store_true')
    args = parser.parse_args()

    asyncio.run(main(args.input, args.output, args.concurrency, args.dryrun))

Erläuterungen / Erweiterungen:

  • Ergänze Retries (backoff) und Timeouts.
  • Ersetze headget je nach Ziel. Manche Hosts blocken HEAD.
  • Führe TLS-Fingerprint-Checks oder HSTS-Prüfungen im Enricher durch.
  • Integriere mit amass/subfinder-Output oder speichere in DB (SQLite / Postgres) für Historie.

CI / Scheduler & Beobachtung

  • Scheduler: Für regelmäßige Läufe: GitHub Actions (zeitgesteuert), cronjobs oder Celery-Worker.
  • Monitoring: Alerts per Slack/Email nur bei neuen/hochprioritären Funden.
  • Versioning: Commit Skripte in ein Repo; releasemanagement für Änderrungen an der Pipeline.

Beispiel-Aktion (GitHub Actions) läuft täglich (nur als Idee, nicht gegen Live-Targets ohne Erlaubnis).


Ethik & Legal Reminder (nochmal kurz)

Automatisierung bringt Reichweite — aber auch Verantwortung. Bevor du Pipeline-runs gegen Programme konfigurierst, prüfe die Regeln des jeweiligen Bug-Bounty-Programms und hol ggf. explizite Erlaubnis. Wenn du unsicher bist: Dry-run, Kontakt zum Triage-Team, oder nur lokal üben.


Kommentare

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert