Automatisierung ist der Hebel, mit dem du repetitive Recon- und Test-Aufgaben skalierst: Mehr Daten, weniger Hantieren, schnellere Iterationen. Gleichzeitig ist Automatisierung gefährlich, wenn sie unbedacht eingesetzt wird — falsche Rate, falsche Targets oder schlecht geparste Ergebnisse führen zu Lärm, falschen Reports oder rechtlichen Problemen. In diesem Teil bekommst du konkrete Prinzipien, Architektur-Patterns, sichere Praktiken und ein direkt nutzbares Starter-Skript in Python.
Grundprinzipien vor dem Automatisieren
- Frag dich: Was genau soll die Automation erreichen?
Definiere ein klares Ziel (z. B. Subdomain-Sammlung + Alive-Check + CSV-Export), keine „Alles-auf-einen-Schlag“-Pipelines. - Respect the scope
Automatisierte Aktionen sind oft invasiver als man denkt. Limitiere Targets strikt auf erlaubte Domains/IPs. - Rate-Limits & Backoff
Setze conservative Rate-Limits, benutze Exponential Backoff bei Fehlern und einstellbare Concurrency-Parameter. - Dry-run & Safe mode
Jede Pipeline sollte einen Dry-Run-Mode haben, der nur Logs erzeugt, aber keine Requests an Live-Targets sendet. - Idempotenz & Reproduzierbarkeit
Ergebnisse sollten deterministisch sein — gleiche Eingabe = gleiche Ausgabe. Versioniere Skripte, nutze timestamps. - Logging & Audit Trail
Protokolliere Eingaben, genutzte APIs, Zeiten, Konfigurationen. Bewahre Logs sicher auf (nicht mit sensiblen Secrets). - Secrets Management
Keine API-Keys oder Passwörter im Code. Nutze Environment-Variablen, Vaults, oder verschlüsselte Secrets in CI. - Deduplication & Normalisierung
Daten aus mehreren Tools (amass, subfinder, crt.sh) vereinigen und normalisieren, bevor du weiter testest.
Architektur-Pattern für Recon-Pipelines
- Collector Layer: Führt passive/aktive Tools aus (amass, subfinder, crt.sh API) und sammelt Rohdaten.
- Normalizer: Bereinigt, dedupliziert, normalisiert (IDN, lowercase, trailing dots entfernen).
- Enricher: Fügt Informationen hinzu — IPs, CDN-Erkennung, Wappalyzer-Resultate, Shodan/Censys-Metadaten (nur lesend).
- Verifier: Alive-Checks, Header-Checks, TLS-Checks.
- Reporter / Storage: CSV/JSON/DB-Export, Markierung nach Priorität, Dashboard/Notebooks.
- Scheduler: Cron / Queue (Celery / RQ) oder GitHub Actions für regelmäßige Läufe.
Dieses Schichtenmodell macht Komponenten austauschbar und einfacher testbar.
Tools orchestration — kurze Bash-Beispiele
Ein einfacher Start: subfinder → amass → dedupe → ffuf-Queue (nur auf erlaubten Hosts).
# subfinder (save)
subfinder -d example.com -o subfinder.txt
# amass passive
amass enum -passive -d example.com -o amass.txt
# merge & dedupe
cat subfinder.txt amass.txt | sort -u > subs_merged.txt
# ffuf directory discovery (nur gegen subs_merged.txt list)
while read host; do
ffuf -u "https://$host/FUZZ" -w wordlist.txt -mc 200 -t 20 -o "ffuf_${host}.json"
done < subs_merged.txt
Wichtig: ffuf/Scanner nur gegen Hosts die im Scope sind. Rate-Limits und -t
(Threads) anpassen.
Praktisches Beispiel: Python-Starter (Subdomains dedupen + Alive-Check → CSV)
Das folgende, einfach gehaltene Script sammelt Domains aus Dateien, dedupliziert, prüft https
-Alive-Status (HEAD/GET) asynchron mit aiohttp
, und exportiert ein CSV. Es ist ein Starter – in Produktion solltest du Logging, retries und bessere Fehlerbehandlung hinzufügen.
# file: recon_checker.py
# usage: python recon_checker.py --input subs_1.txt subs_2.txt --output subs_alive.csv --concurrency 20 --dryrun
import asyncio
import aiohttp
import argparse
import csv
from urllib.parse import urlparse
async def check_host(session, host, timeout=10):
"""Check if host responds on HTTPS (HEAD, fallback GET)."""
url = f"https://{host}"
try:
async with session.head(url, timeout=timeout, allow_redirects=True) as resp:
return host, resp.status, resp.headers.get('server', '')
except Exception:
try:
async with session.get(url, timeout=timeout, allow_redirects=True) as resp:
return host, resp.status, resp.headers.get('server', '')
except Exception as e:
return host, None, None
async def worker(name, queue, results, sem, session):
while True:
host = await queue.get()
if host is None:
queue.task_done()
break
async with sem:
host, status, server = await check_host(session, host)
results.append({'host': host, 'status': status, 'server': server})
queue.task_done()
async def main(inputs, output, concurrency, dryrun):
# Read + normalize + dedupe
hosts = set()
for f in inputs:
with open(f, 'r', encoding='utf-8') as fh:
for line in fh:
h = line.strip()
if not h:
continue
# normalize: if line is a URL, extract hostname
parsed = urlparse(h if '://' in h else f"//{h}")
host = parsed.hostname or h
hosts.add(host.lower().rstrip('.'))
if dryrun:
print(f"Would check {len(hosts)} hosts (dry-run).")
return
queue = asyncio.Queue()
for h in hosts:
queue.put_nowait(h)
results = []
sem = asyncio.Semaphore(concurrency)
async with aiohttp.ClientSession() as session:
tasks = []
for i in range(concurrency):
tasks.append(asyncio.create_task(worker(f"w{i}", queue, results, sem, session)))
# sentinel to stop workers
for _ in range(concurrency):
queue.put_nowait(None)
await asyncio.gather(*tasks)
# write CSV
with open(output, 'w', newline='', encoding='utf-8') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=['host', 'status', 'server'])
writer.writeheader()
for r in sorted(results, key=lambda x: (x['status'] is None, x['host'])):
writer.writerow(r)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('--input', nargs='+', required=True)
parser.add_argument('--output', default='subs_alive.csv')
parser.add_argument('--concurrency', type=int, default=10)
parser.add_argument('--dryrun', action='store_true')
args = parser.parse_args()
asyncio.run(main(args.input, args.output, args.concurrency, args.dryrun))
Erläuterungen / Erweiterungen:
- Ergänze Retries (backoff) und Timeouts.
- Ersetze
head
→get
je nach Ziel. Manche Hosts blockenHEAD
. - Führe TLS-Fingerprint-Checks oder HSTS-Prüfungen im Enricher durch.
- Integriere mit
amass
/subfinder
-Output oder speichere in DB (SQLite / Postgres) für Historie.
CI / Scheduler & Beobachtung
- Scheduler: Für regelmäßige Läufe: GitHub Actions (zeitgesteuert), cronjobs oder Celery-Worker.
- Monitoring: Alerts per Slack/Email nur bei neuen/hochprioritären Funden.
- Versioning: Commit Skripte in ein Repo; releasemanagement für Änderrungen an der Pipeline.
Beispiel-Aktion (GitHub Actions) läuft täglich (nur als Idee, nicht gegen Live-Targets ohne Erlaubnis).
Ethik & Legal Reminder (nochmal kurz)
Automatisierung bringt Reichweite — aber auch Verantwortung. Bevor du Pipeline-runs gegen Programme konfigurierst, prüfe die Regeln des jeweiligen Bug-Bounty-Programms und hol ggf. explizite Erlaubnis. Wenn du unsicher bist: Dry-run, Kontakt zum Triage-Team, oder nur lokal üben.
Schreibe einen Kommentar