import asyncio import json from typing import Optional, Tuple from aioquic.asyncio.client import connect from aioquic.h3.connection import H3_ALPN from aioquic.h3.events import HeadersReceived, DataReceived, H3Event from aioquic.quic.configuration import QuicConfiguration from aioquic.tls import SessionTicket class InMemoryTicketStore: def __init__(self): self.ticket: Optional[SessionTicket] = None def save_ticket(self, ticket: SessionTicket): self.ticket = ticket def get_ticket(self) -> Optional[SessionTicket]: return self.ticket async def fetch_http3( host: str, port: int, path: str, method: str = "GET", data: Optional[bytes] = None, sni: Optional[str] = None, configuration: Optional[QuicConfiguration] = None, wait_connected: bool = True, ) -> Tuple[int, bytes, bool, bool]: """ Führt eine einzelne HTTP/3-Anfrage aus. Rückgabe: (status_code, body, early_data_accepted, session_resumed) """ assert method in ("GET", "POST"), "Nur GET/POST unterstützt" if configuration is None: configuration = QuicConfiguration(is_client=True, alpn_protocols=H3_ALPN) # SNI korrekt über die QuicConfiguration setzen (kein server_name-Arg für connect) if not configuration.server_name: configuration.server_name = sni or host async with connect( host, port, configuration=configuration, wait_connected=wait_connected, ) as client: http = client.http headers = [ (b":method", method.encode()), (b":scheme", b"https"), (b":authority", f"{host}:{port}".encode()), (b":path", path.encode()), (b"user-agent", b"quic-0rtt-tester/0.2"), ] stream_id = http.get_next_available_stream_id() http.send_headers(stream_id, headers, end_stream=(data is None)) if data is not None: http.send_data(stream_id, data, end_stream=True) await client.wait_pending() status_code = 0 body = b"" # Flags aus TLS/QUIC (Early-Data angenommen? Resumption?) early_data_accepted = bool(getattr(client._quic.tls, "early_data_accepted", False)) session_resumed = bool(getattr(client._quic.tls, "session_resumed", False)) while True: event: Optional[H3Event] = await client._http_event() if event is None: break if isinstance(event, HeadersReceived) and event.stream_id == stream_id: for name, value in event.headers: if name == b":status": try: status_code = int(value.decode()) except ValueError: status_code = 0 elif isinstance(event, DataReceived) and event.stream_id == stream_id: body += event.data if event.stream_ended: break return status_code, body, early_data_accepted, session_resumed async def get_session_ticket(host: str, port: int, sni: Optional[str], store: InMemoryTicketStore): """ Erste 1-RTT-Verbindung aufbauen, um ein Session Ticket zu erhalten (ohne 0-RTT). """ configuration = QuicConfiguration(is_client=True, alpn_protocols=H3_ALPN) configuration.server_name = sni or host def save_ticket_cb(ticket: SessionTicket): store.save_ticket(ticket) configuration.session_ticket_handler = save_ticket_cb # Voll verbinden -> Ticket erhalten status, body, early, resumed = await fetch_http3( host, port, "/", "GET", None, sni, configuration, wait_connected=True ) return status, body, early, resumed async def test_0rtt( host: str, port: int = 443, sni: Optional[str] = None, idempotent_path: str = "/", non_idempotent_path: str = "/unsafe-test", non_idempotent_payload: Optional[bytes] = None, ): """ Ablauf: 1) Initiale Verbindung (Ticket holen) 2) Resumption mit 0-RTT: GET 3) Resumption mit 0-RTT: POST (nicht-idempotent) """ ticket_store = InMemoryTicketStore() print(f"[1/3] Baue Initialverbindung auf, um Session Ticket zu erhalten ...") s0, b0, early0, resumed0 = await get_session_ticket(host, port, sni, ticket_store) print(f" Initial: status={s0}, early_data_accepted={early0}, session_resumed={resumed0}") ticket = ticket_store.get_ticket() if not ticket: print(" Kein Session Ticket erhalten – 0-RTT Resumption nicht möglich.") return configuration = QuicConfiguration(is_client=True, alpn_protocols=H3_ALPN) configuration.server_name = sni or host configuration.session_ticket = ticket # Ticket für Resumption/0‑RTT # 0‑RTT Early-Data wird bei wait_connected=False versucht (wenn Server 0‑RTT erlaubt) print(f"[2/3] Teste 0-RTT: idempotenter GET {idempotent_path}") s1, b1, early1, resumed1 = await fetch_http3( host, port, idempotent_path, "GET", None, sni, configuration, wait_connected=False, ) print(f" 0-RTT GET: status={s1}, early_data_accepted={early1}, session_resumed={resumed1}") if non_idempotent_payload is None: non_idempotent_payload = json.dumps( {"action": "create", "ts": asyncio.get_event_loop().time()} ).encode() print(f"[3/3] Teste 0-RTT: nicht-idempotenter POST {non_idempotent_path}") s2, b2, early2, resumed2 = await fetch_http3( host, port, non_idempotent_path, "POST", non_idempotent_payload, sni, configuration, wait_connected=False, ) print(f" 0-RTT POST: status={s2}, early_data_accepted={early2}, session_resumed={resumed2}") print("\nErgebnisse und Bewertung:") if not early1 and not early2: print("- 0‑RTT wurde abgelehnt oder nicht akzeptiert: konservativ, geringeres Replay-Risiko.[1]") else: if early1: print("- 0‑RTT Early Data für GET akzeptiert: ok, sofern idempotent.[1][3]") if early2: print("- WARNUNG: 0‑RTT Early Data für POST akzeptiert – serverseitige Anti‑Replay‑Mitigationen prüfen (z.B. nur sichere Methoden, Anwendungs‑Anti‑Replay).[1][3]") else: print("- POST nicht als Early Data akzeptiert oder auf 1‑RTT verzögert – sicherer bzgl. Replay.[1][3]") print("\nHinweis:") print("- QUIC/TLS 1.3: 0‑RTT ist replay‑anfällig; Anwendungen müssen mitigieren.[1][3]") print("- HTTP/3 verlangt Anti‑Replay‑Mitigationen für 0‑RTT; nur idempotente Aktionen in Early Data.[1][3]") if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="QUIC/HTTP-3 0-RTT Tester (fixed SNI handling)") parser.add_argument("host", help="Server hostname") parser.add_argument("--port", type=int, default=443, help="Server port (default: 443)") parser.add_argument("--sni", help="SNI override", default=None) parser.add_argument("--get-path", default="/", help="Idempotenter GET-Pfad für 0-RTT") parser.add_argument("--post-path", default="/unsafe-test", help="Nicht-idempotenter POST-Pfad für 0-RTT") args = parser.parse_args() asyncio.run( test_0rtt( host=args.host, port=args.port, sni=args.sni, idempotent_path=args.get_path, non_idempotent_path=args.post_path, ) )