From e6afb9123e18ff7553079dd43ee7e69a5468d5d5 Mon Sep 17 00:00:00 2001 From: mofixx Date: Fri, 8 Aug 2025 13:39:29 +0200 Subject: [PATCH] test --- Code/venv/scheißprojekt.txt | 1 + Code/venv/test.py | 426 +++++++++++++++--------------------- 2 files changed, 175 insertions(+), 252 deletions(-) create mode 100644 Code/venv/scheißprojekt.txt diff --git a/Code/venv/scheißprojekt.txt b/Code/venv/scheißprojekt.txt new file mode 100644 index 0000000..6a52a6d --- /dev/null +++ b/Code/venv/scheißprojekt.txt @@ -0,0 +1 @@ +Ich hasse meine Praxisarbeit \ No newline at end of file diff --git a/Code/venv/test.py b/Code/venv/test.py index 0472e99..f20216d 100644 --- a/Code/venv/test.py +++ b/Code/venv/test.py @@ -1,286 +1,208 @@ -#!/usr/bin/env python3 -import argparse import asyncio -import time -from dataclasses import dataclass +import json from typing import Optional, Tuple -from aioquic.quic.configuration import QuicConfiguration -from aioquic.quic.connection import QuicConnection -from aioquic.tls import SessionTicket from aioquic.asyncio.client import connect -from aioquic.quic.events import QuicEvent, HandshakeCompleted, ConnectionTerminated, ProtocolNegotiated, DatagramFrameReceived -from aioquic.h0.connection import H0_ALPN from aioquic.h3.connection import H3_ALPN +from aioquic.h3.events import HeadersReceived, DataReceived, H3Event +from aioquic.quic.configuration import QuicConfiguration +from aioquic.tls import SessionTicket -# RFC references: -# - RFC 9000 QUIC transport: anti-amplification (<=3x before address validation), Initial >=1200B[1] -# - RFC 9001 TLS over QUIC: handshake, tokens, address validation via Retry/token acceptance[2] -# - RFC 8446 TLS 1.3 (used by QUIC): handshake and tokens at TLS layer carried in QUIC[3] -@dataclass -class FlowReport: - host: str - port: int - alpn: str - used_retry: bool - token_present: bool - address_validated_ts: Optional[float] - client_bytes_sent_before_av: int - server_bytes_recv_before_av: int - client_bytes_sent_total: int - server_bytes_recv_total: int - compliant_before_av: Optional[bool] - error: Optional[str] - handshake_completed: bool - protocol: Optional[str] +class InMemoryTicketStore: + def __init__(self): + self.ticket: Optional[SessionTicket] = None -class ByteMeteringTransport(asyncio.DatagramTransport): - def __init__(self, inner: asyncio.DatagramTransport): - self._inner = inner - self.bytes_sent = 0 + def save_ticket(self, ticket: SessionTicket): + self.ticket = ticket - def sendto(self, data: bytes, addr=None): - # UDP payload bytes we send - self.bytes_sent += len(data) - self._inner.sendto(data, addr) + def get_ticket(self) -> Optional[SessionTicket]: + return self.ticket - # Delegate all other methods - def __getattr__(self, item): - return getattr(self._inner, item) -class MeteringProtocol(asyncio.DatagramProtocol): - def __init__(self, quic: QuicConnection, on_datagram_recv): - self.quic = quic - self.on_datagram_recv = on_datagram_recv - self.transport: Optional[ByteMeteringTransport] = None +async def fetch_http3( + host: str, + port: int, + path: str, + method: str = "GET", + data: Optional[bytes] = None, + sni: Optional[str] = None, + configuration: Optional[QuicConfiguration] = None, + wait_connected: bool = True, +) -> Tuple[int, bytes, bool, bool]: + """ + Führt eine einzelne HTTP/3-Anfrage aus. + Rückgabe: (status_code, body, early_data_accepted, session_resumed) + """ + assert method in ("GET", "POST"), "Nur GET/POST unterstützt" - def connection_made(self, transport): - # Wrap real transport with metering - self.transport = ByteMeteringTransport(transport) - self.quic._network_path.send_datagram = self.transport.sendto # ensure quic uses our wrapper + if configuration is None: + configuration = QuicConfiguration(is_client=True, alpn_protocols=H3_ALPN) - def datagram_received(self, data, addr): - # Count server->client bytes (payload) - self.on_datagram_recv(len(data)) - self.quic.receive_datagram(data, time.time(), addr) + # SNI korrekt über die QuicConfiguration setzen (kein server_name-Arg für connect) + if not configuration.server_name: + configuration.server_name = sni or host - def error_received(self, exc): - pass + async with connect( + host, + port, + configuration=configuration, + wait_connected=wait_connected, + ) as client: + http = client.http - def connection_lost(self, exc): - pass + headers = [ + (b":method", method.encode()), + (b":scheme", b"https"), + (b":authority", f"{host}:{port}".encode()), + (b":path", path.encode()), + (b"user-agent", b"quic-0rtt-tester/0.2"), + ] + stream_id = http.get_next_available_stream_id() + http.send_headers(stream_id, headers, end_stream=(data is None)) + if data is not None: + http.send_data(stream_id, data, end_stream=True) -async def quic_attempt(host: str, port: int, alpn: str, sni: Optional[str], timeout: float, token: Optional[bytes]) -> FlowReport: - cfg = QuicConfiguration( - is_client=True, - alpn_protocols=[alpn], - verify_mode=None, # for probing; set to CERT_REQUIRED for strict TLS - server_name=sni or host, - ) - # Inject token if provided (address validation token) - if token: - cfg.retry = True # signal client can handle retry/token - cfg.token = token + await client.wait_pending() - # Bookkeeping - server_recv_total = 0 - server_recv_before_av = 0 - client_sent_before_av = 0 - client_sent_total = 0 - address_validated_ts = None - handshake_completed = False - proto = None - used_retry = False - token_present = token is not None - error = None + status_code = 0 + body = b"" + # Flags aus TLS/QUIC (Early-Data angenommen? Resumption?) + early_data_accepted = bool(getattr(client._quic.tls, "early_data_accepted", False)) + session_resumed = bool(getattr(client._quic.tls, "session_resumed", False)) - # Handler to count per-datagram bytes from server - def on_dgram_recv(n: int): - nonlocal server_recv_total - server_recv_total += n - - loop = asyncio.get_event_loop() - - # Low-level connect to intercept transport - try: - async with connect( - host, - port, - configuration=cfg, - create_protocol=None, - wait_connected=False, # we want to observe early phases - ) as client: - # Access underlying connection and transport - quic: QuicConnection = client._quic - protocol = MeteringProtocol(quic, on_dgram_recv) - - # Create custom UDP endpoint so we can wrap transport - transport, _ = await loop.create_datagram_endpoint( - lambda: protocol, - remote_addr=(host, port), - ) - client._transport = protocol.transport # swap client transport to our wrapper - - start = time.time() - deadline = start + timeout - - # Kick off handshake by sending Initial - quic.connect() - # Ensure Initial datagram >=1200B (aioquic does this by spec[1]) - for datagram, addr in quic.datagrams_to_send(now=time.time()): - protocol.transport.sendto(datagram, addr) - - # Event loop - while time.time() < deadline: - await asyncio.sleep(0.001) - - # Pump outbound - for datagram, addr in quic.datagrams_to_send(now=time.time()): - protocol.transport.sendto(datagram, addr) - - # Process events - for event in quic.poll_events(): - # Negotiate protocol (ALPN) - if isinstance(event, ProtocolNegotiated): - proto = event.alpn_protocol - # Handshake completed -> address validated by definition[2] - if isinstance(event, HandshakeCompleted): - handshake_completed = True - if address_validated_ts is None: - address_validated_ts = time.time() - if isinstance(event, ConnectionTerminated): - # End loop gracefully - deadline = time.time() - break - - # Track bytes sent by client - client_sent_total = protocol.transport.bytes_sent - - # Address Validation moment: - if address_validated_ts is None: - # Before AV, snapshot budgets - client_sent_before_av = client_sent_total - server_recv_before_av = server_recv_total - - # Give QUIC timers a tick - quic.handle_timer(time.time()) - - # Heuristic: detect Retry packet - # In aioquic, quic._retry_sent/_retry_received are internal; fallback: token becomes set after Retry - if quic._retry_received: # type: ignore[attr-defined] - used_retry = True - - # Exit if handshake done and settled - if handshake_completed and (time.time() - address_validated_ts) > 0.1: + while True: + event: Optional[H3Event] = await client._http_event() + if event is None: + break + if isinstance(event, HeadersReceived) and event.stream_id == stream_id: + for name, value in event.headers: + if name == b":status": + try: + status_code = int(value.decode()) + except ValueError: + status_code = 0 + elif isinstance(event, DataReceived) and event.stream_id == stream_id: + body += event.data + if event.stream_ended: break - # Close connection - quic.close(error_code=0x0) - for datagram, addr in quic.datagrams_to_send(now=time.time()): - protocol.transport.sendto(datagram, addr) - await asyncio.sleep(0.05) - transport.close() + return status_code, body, early_data_accepted, session_resumed - except Exception as e: - error = str(e) - compliant = None - if address_validated_ts is None: - # If AV never happened, we still evaluate budget until timeout - if client_sent_before_av > 0: - compliant = server_recv_before_av <= 3 * client_sent_before_av # RFC 9000 3× rule[1] - else: - # Evaluate until AV moment - if client_sent_before_av > 0: - compliant = server_recv_before_av <= 3 * client_sent_before_av # RFC 9000 3× rule[1] +async def get_session_ticket(host: str, port: int, sni: Optional[str], store: InMemoryTicketStore): + """ + Erste 1-RTT-Verbindung aufbauen, um ein Session Ticket zu erhalten (ohne 0-RTT). + """ + configuration = QuicConfiguration(is_client=True, alpn_protocols=H3_ALPN) + configuration.server_name = sni or host - return FlowReport( - host=host, - port=port, - alpn=alpn, - used_retry=used_retry, - token_present=token_present, - address_validated_ts=address_validated_ts, - client_bytes_sent_before_av=client_sent_before_av, - server_bytes_recv_before_av=server_recv_before_av, - client_bytes_sent_total=client_sent_total, - server_bytes_recv_total=server_recv_total, - compliant_before_av=compliant, - error=error, - handshake_completed=handshake_completed, - protocol=proto, + def save_ticket_cb(ticket: SessionTicket): + store.save_ticket(ticket) + + configuration.session_ticket_handler = save_ticket_cb + + # Voll verbinden -> Ticket erhalten + status, body, early, resumed = await fetch_http3( + host, port, "/", "GET", None, sni, configuration, wait_connected=True ) + return status, body, early, resumed -def print_report(title: str, report: FlowReport): - print(f"=== {title} ===") - print(f"Target: {report.host}:{report.port}, ALPN={report.alpn}") - print(f"Protocol negotiated: {report.protocol}") - print(f"Retry observed: {report.used_retry}, Token provided: {report.token_present}") - print(f"Handshake completed: {report.handshake_completed}, AddressValidated: {report.address_validated_ts is not None}") - print(f"Client bytes sent before AV: {report.client_bytes_sent_before_av}") - print(f"Server bytes received before AV: {report.server_bytes_recv_before_av}") - budget = 3 * report.client_bytes_sent_before_av - print(f"Anti-Amplification budget (3× client-before-AV): {budget}") - if report.compliant_before_av is None: - print("Compliance before AV: Unknown (insufficient data)") - else: - status = "COMPLIANT" if report.compliant_before_av else "VIOLATION" - print(f"Compliance before AV: {status}") - print(f"Totals: client_sent={report.client_bytes_sent_total}, server_recv={report.server_bytes_recv_total}") - if report.error: - print(f"Error: {report.error}") - print() -async def main(): - parser = argparse.ArgumentParser(description="QUIC Anti-Amplification & Retry Probe") - parser.add_argument("--host", required=True) - parser.add_argument("--port", type=int, default=443) - parser.add_argument("--alpn", default="h3") - parser.add_argument("--sni", default=None) - parser.add_argument("--timeout", type=float, default=5.0) - parser.add_argument("--no-retry", action="store_true") - args = parser.parse_args() +async def test_0rtt( + host: str, + port: int = 443, + sni: Optional[str] = None, + idempotent_path: str = "/", + non_idempotent_path: str = "/unsafe-test", + non_idempotent_payload: Optional[bytes] = None, +): + """ + Ablauf: + 1) Initiale Verbindung (Ticket holen) + 2) Resumption mit 0-RTT: GET + 3) Resumption mit 0-RTT: POST (nicht-idempotent) + """ + ticket_store = InMemoryTicketStore() - # First attempt: no token - report1 = await quic_attempt( - host=args.host, - port=args.port, - alpn=args.alpn, - sni=args.sni, - timeout=args.timeout, - token=None, - ) - print_report("Attempt 1 (no token)", report1) + print(f"[1/3] Baue Initialverbindung auf, um Session Ticket zu erhalten ...") + s0, b0, early0, resumed0 = await get_session_ticket(host, port, sni, ticket_store) + print(f" Initial: status={s0}, early_data_accepted={early0}, session_resumed={resumed0}") - if report1.error: + ticket = ticket_store.get_ticket() + if not ticket: + print(" Kein Session Ticket erhalten – 0-RTT Resumption nicht möglich.") return - if args.no_retry or not report1.used_retry: - # Either server didn't send Retry or user skipped token attempt - return + configuration = QuicConfiguration(is_client=True, alpn_protocols=H3_ALPN) + configuration.server_name = sni or host + configuration.session_ticket = ticket # Ticket für Resumption/0‑RTT + # 0‑RTT Early-Data wird bei wait_connected=False versucht (wenn Server 0‑RTT erlaubt) - # If Retry observed, retrieve token from the connection state - # aioquic stores token in connection._token after Retry; we must run a second connection using it - # For simplicity, we reuse the token observed in first attempt if accessible; otherwise we try a second connect which should present the stored token via session ticket. - # Note: Access to private member is implementation-dependent; guarded with getattr. - retry_token: Optional[bytes] = None - try: - # Not ideal, but for probing we may access private attr - retry_token = getattr(report1, "retry_token", None) - except Exception: - retry_token = None - - # Second attempt: with token if we have one (many servers encode address validation in Retry, token echoed by client) - report2 = await quic_attempt( - host=args.host, - port=args.port, - alpn=args.alpn, - sni=args.sni, - timeout=args.timeout, - token=retry_token, + print(f"[2/3] Teste 0-RTT: idempotenter GET {idempotent_path}") + s1, b1, early1, resumed1 = await fetch_http3( + host, + port, + idempotent_path, + "GET", + None, + sni, + configuration, + wait_connected=False, ) - print_report("Attempt 2 (with token if available)", report2) + print(f" 0-RTT GET: status={s1}, early_data_accepted={early1}, session_resumed={resumed1}") + + if non_idempotent_payload is None: + non_idempotent_payload = json.dumps( + {"action": "create", "ts": asyncio.get_event_loop().time()} + ).encode() + + print(f"[3/3] Teste 0-RTT: nicht-idempotenter POST {non_idempotent_path}") + s2, b2, early2, resumed2 = await fetch_http3( + host, + port, + non_idempotent_path, + "POST", + non_idempotent_payload, + sni, + configuration, + wait_connected=False, + ) + print(f" 0-RTT POST: status={s2}, early_data_accepted={early2}, session_resumed={resumed2}") + + print("\nErgebnisse und Bewertung:") + if not early1 and not early2: + print("- 0‑RTT wurde abgelehnt oder nicht akzeptiert: konservativ, geringeres Replay-Risiko.[1]") + else: + if early1: + print("- 0‑RTT Early Data für GET akzeptiert: ok, sofern idempotent.[1][3]") + if early2: + print("- WARNUNG: 0‑RTT Early Data für POST akzeptiert – serverseitige Anti‑Replay‑Mitigationen prüfen (z.B. nur sichere Methoden, Anwendungs‑Anti‑Replay).[1][3]") + else: + print("- POST nicht als Early Data akzeptiert oder auf 1‑RTT verzögert – sicherer bzgl. Replay.[1][3]") + + print("\nHinweis:") + print("- QUIC/TLS 1.3: 0‑RTT ist replay‑anfällig; Anwendungen müssen mitigieren.[1][3]") + print("- HTTP/3 verlangt Anti‑Replay‑Mitigationen für 0‑RTT; nur idempotente Aktionen in Early Data.[1][3]") + if __name__ == "__main__": - asyncio.run(main()) + import argparse + + parser = argparse.ArgumentParser(description="QUIC/HTTP-3 0-RTT Tester (fixed SNI handling)") + parser.add_argument("host", help="Server hostname") + parser.add_argument("--port", type=int, default=443, help="Server port (default: 443)") + parser.add_argument("--sni", help="SNI override", default=None) + parser.add_argument("--get-path", default="/", help="Idempotenter GET-Pfad für 0-RTT") + parser.add_argument("--post-path", default="/unsafe-test", help="Nicht-idempotenter POST-Pfad für 0-RTT") + args = parser.parse_args() + + asyncio.run( + test_0rtt( + host=args.host, + port=args.port, + sni=args.sni, + idempotent_path=args.get_path, + non_idempotent_path=args.post_path, + ) + )