Files
Praxisarbeit-1/Code/venv/test.py
mofixx e6afb9123e test
2025-08-08 13:39:29 +02:00

209 lines
7.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import json
from typing import Optional, Tuple

from aioquic.asyncio.client import connect
from aioquic.asyncio.protocol import QuicConnectionProtocol
from aioquic.h3.connection import H3_ALPN, H3Connection
from aioquic.h3.events import HeadersReceived, DataReceived, H3Event
from aioquic.quic.configuration import QuicConfiguration
from aioquic.quic.events import ConnectionTerminated, QuicEvent
from aioquic.tls import SessionTicket
class InMemoryTicketStore:
    """Hold a single TLS session ticket in process memory.

    Only the most recently saved ticket is kept; nothing is persisted.
    """

    def __init__(self):
        # Empty until save_ticket() has been called at least once.
        self.ticket: Optional[SessionTicket] = None

    def save_ticket(self, ticket: SessionTicket):
        """Store *ticket*, overwriting whatever was saved before."""
        self.ticket = ticket

    def get_ticket(self) -> Optional[SessionTicket]:
        """Return the stored ticket, or ``None`` if nothing was saved yet."""
        return self.ticket
class _H3ClientProtocol(QuicConnectionProtocol):
    """QUIC client protocol that runs an HTTP/3 connection and queues its events.

    aioquic's stock ``QuicConnectionProtocol`` has no HTTP layer, so this
    subclass owns the ``H3Connection`` and pushes every HTTP/3 event it
    produces onto an ``asyncio.Queue``. A ``None`` sentinel is queued once the
    QUIC connection terminates so a consumer never waits forever.
    """

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self._http = H3Connection(self._quic)
        self._h3_events: asyncio.Queue = asyncio.Queue()

    def quic_event_received(self, event: QuicEvent) -> None:
        """Translate a QUIC event into HTTP/3 events and queue them."""
        for h3_event in self._http.handle_event(event):
            self._h3_events.put_nowait(h3_event)
        if isinstance(event, ConnectionTerminated):
            self._h3_events.put_nowait(None)


async def fetch_http3(
    host: str,
    port: int,
    path: str,
    method: str = "GET",
    data: Optional[bytes] = None,
    sni: Optional[str] = None,
    configuration: Optional[QuicConfiguration] = None,
    wait_connected: bool = True,
) -> Tuple[int, bytes, bool, bool]:
    """Perform a single HTTP/3 request over a fresh QUIC connection.

    Args:
        host: Server host name or address to connect to.
        port: Server UDP port.
        path: Request path sent as the ``:path`` pseudo-header.
        method: ``"GET"`` or ``"POST"``.
        data: Optional request body; when ``None`` the stream is closed
            right after the headers.
        sni: Server name to use when *configuration* has none set.
        configuration: QUIC configuration to use; a default HTTP/3 client
            configuration is created when omitted. Its ``server_name`` is
            filled in if empty. If it carries a ``session_ticket_handler``
            attribute, that callback is forwarded to ``connect()``.
        wait_connected: Passed to ``connect()``; ``False`` lets the request be
            sent before the handshake has completed (required for 0-RTT).

    Returns:
        ``(status_code, body, early_data_accepted, session_resumed)``.
        ``status_code`` is 0 when no parsable ``:status`` header was received.

    Raises:
        ValueError: If *method* is neither ``"GET"`` nor ``"POST"``.
    """
    # Explicit check instead of ``assert`` so it is not stripped under -O.
    if method not in ("GET", "POST"):
        raise ValueError("Nur GET/POST unterstützt")

    if configuration is None:
        configuration = QuicConfiguration(is_client=True, alpn_protocols=H3_ALPN)
    # SNI is set through the QuicConfiguration (connect() has no server_name arg).
    if not configuration.server_name:
        configuration.server_name = sni or host

    # Callers in this file stash the ticket callback on the configuration
    # object, but aioquic only invokes it when it is passed to connect().
    ticket_handler = getattr(configuration, "session_ticket_handler", None)

    headers = [
        (b":method", method.encode()),
        (b":scheme", b"https"),
        (b":authority", f"{host}:{port}".encode()),
        (b":path", path.encode()),
        (b"user-agent", b"quic-0rtt-tester/0.2"),
    ]

    async with connect(
        host,
        port,
        configuration=configuration,
        create_protocol=_H3ClientProtocol,
        session_ticket_handler=ticket_handler,
        wait_connected=wait_connected,
    ) as client:
        # Stream ids are allocated by the QUIC connection, not the H3 layer.
        stream_id = client._quic.get_next_available_stream_id()
        client._http.send_headers(stream_id, headers, end_stream=data is None)
        if data is not None:
            client._http.send_data(stream_id, data, end_stream=True)
        # Flush the queued datagrams (including the handshake when
        # wait_connected is False).
        client.transmit()

        status_code = 0
        chunks = []
        while True:
            event: Optional[H3Event] = await client._h3_events.get()
            if event is None:
                # Connection terminated before the stream finished.
                break
            if getattr(event, "stream_id", None) != stream_id:
                continue
            if isinstance(event, HeadersReceived):
                for name, value in event.headers:
                    if name == b":status":
                        try:
                            status_code = int(value.decode())
                        except ValueError:
                            status_code = 0
            elif isinstance(event, DataReceived):
                chunks.append(event.data)
            else:
                continue
            # A response without a body ends on the headers frame.
            if event.stream_ended:
                break

        # Read the TLS flags only after the response: with
        # wait_connected=False the handshake may not have completed earlier.
        tls = client._quic.tls
        early_data_accepted = bool(getattr(tls, "early_data_accepted", False))
        session_resumed = bool(getattr(tls, "session_resumed", False))

    return status_code, b"".join(chunks), early_data_accepted, session_resumed
async def get_session_ticket(host: str, port: int, sni: Optional[str], store: InMemoryTicketStore):
    """Make an initial full-handshake request so a session ticket can be captured.

    A fresh HTTP/3 client configuration is built, a callback that saves
    tickets into *store* is attached to it, and ``GET /`` is issued with
    ``wait_connected=True`` (no 0-RTT attempt).

    Returns:
        The ``(status, body, early_data_accepted, session_resumed)`` tuple
        produced by ``fetch_http3``.
    """
    configuration = QuicConfiguration(is_client=True, alpn_protocols=H3_ALPN)
    configuration.server_name = sni or host

    # NOTE(review): the callback is attached as an attribute of the
    # configuration object; whether it is ever invoked depends on how
    # fetch_http3 / aioquic pick it up - confirm.
    configuration.session_ticket_handler = lambda ticket: store.save_ticket(ticket)

    # Connect fully -> receive the ticket.
    return await fetch_http3(
        host=host,
        port=port,
        path="/",
        method="GET",
        data=None,
        sni=sni,
        configuration=configuration,
        wait_connected=True,
    )
async def test_0rtt(
    host: str,
    port: int = 443,
    sni: Optional[str] = None,
    idempotent_path: str = "/",
    non_idempotent_path: str = "/unsafe-test",
    non_idempotent_payload: Optional[bytes] = None,
):
    """Run the three-step 0-RTT probe against a server and print the findings.

    Steps:
        1. Initial connection to obtain a session ticket.
        2. Resumption with 0-RTT: GET on *idempotent_path*.
        3. Resumption with 0-RTT: POST on *non_idempotent_path*.

    If no ticket was stored after step 1 the function prints a message and
    returns early. When *non_idempotent_payload* is ``None`` a small JSON body
    containing the event-loop clock value is generated for the POST.
    """
    store = InMemoryTicketStore()

    print("[1/3] Baue Initialverbindung auf, um Session Ticket zu erhalten ...")
    status0, _, early0, resumed0 = await get_session_ticket(host, port, sni, store)
    print(f" Initial: status={status0}, early_data_accepted={early0}, session_resumed={resumed0}")

    ticket = store.get_ticket()
    if not ticket:
        print(" Kein Session Ticket erhalten 0-RTT Resumption nicht möglich.")
        return

    # One configuration, carrying the ticket, is shared by both resumption attempts.
    configuration = QuicConfiguration(is_client=True, alpn_protocols=H3_ALPN)
    configuration.server_name = sni or host
    configuration.session_ticket = ticket

    # Early data is attempted with wait_connected=False (if the server allows 0-RTT).
    print(f"[2/3] Teste 0-RTT: idempotenter GET {idempotent_path}")
    status1, _, early1, resumed1 = await fetch_http3(
        host, port, idempotent_path, "GET", None, sni, configuration, wait_connected=False
    )
    print(f" 0-RTT GET: status={status1}, early_data_accepted={early1}, session_resumed={resumed1}")

    if non_idempotent_payload is None:
        default_body = {"action": "create", "ts": asyncio.get_event_loop().time()}
        non_idempotent_payload = json.dumps(default_body).encode()

    print(f"[3/3] Teste 0-RTT: nicht-idempotenter POST {non_idempotent_path}")
    status2, _, early2, resumed2 = await fetch_http3(
        host,
        port,
        non_idempotent_path,
        "POST",
        non_idempotent_payload,
        sni,
        configuration,
        wait_connected=False,
    )
    print(f" 0-RTT POST: status={status2}, early_data_accepted={early2}, session_resumed={resumed2}")

    print("\nErgebnisse und Bewertung:")
    if early1 or early2:
        if early1:
            print("- 0RTT Early Data für GET akzeptiert: ok, sofern idempotent.[1][3]")
        if early2:
            print("- WARNUNG: 0RTT Early Data für POST akzeptiert serverseitige AntiReplayMitigationen prüfen (z.B. nur sichere Methoden, AnwendungsAntiReplay).[1][3]")
        else:
            print("- POST nicht als Early Data akzeptiert oder auf 1RTT verzögert sicherer bzgl. Replay.[1][3]")
    else:
        print("- 0RTT wurde abgelehnt oder nicht akzeptiert: konservativ, geringeres Replay-Risiko.[1]")

    print("\nHinweis:")
    print("- QUIC/TLS 1.3: 0RTT ist replayanfällig; Anwendungen müssen mitigieren.[1][3]")
    print("- HTTP/3 verlangt AntiReplayMitigationen für 0RTT; nur idempotente Aktionen in Early Data.[1][3]")
if __name__ == "__main__":
    import argparse

    # Command-line entry point: parse the target and paths, then run the probe.
    cli = argparse.ArgumentParser(
        description="QUIC/HTTP-3 0-RTT Tester (fixed SNI handling)"
    )
    cli.add_argument("host", help="Server hostname")
    cli.add_argument("--port", default=443, type=int, help="Server port (default: 443)")
    cli.add_argument("--sni", default=None, help="SNI override")
    cli.add_argument("--get-path", default="/", help="Idempotenter GET-Pfad für 0-RTT")
    cli.add_argument(
        "--post-path",
        default="/unsafe-test",
        help="Nicht-idempotenter POST-Pfad für 0-RTT",
    )
    options = cli.parse_args()

    probe = test_0rtt(
        host=options.host,
        port=options.port,
        sni=options.sni,
        idempotent_path=options.get_path,
        non_idempotent_path=options.post_path,
    )
    asyncio.run(probe)