209 lines
7.4 KiB
Python
209 lines
7.4 KiB
Python
import asyncio
|
||
import json
|
||
from typing import Optional, Tuple
|
||
|
||
from aioquic.asyncio.client import connect
|
||
from aioquic.h3.connection import H3_ALPN
|
||
from aioquic.h3.events import HeadersReceived, DataReceived, H3Event
|
||
from aioquic.quic.configuration import QuicConfiguration
|
||
from aioquic.tls import SessionTicket
|
||
|
||
|
||
class InMemoryTicketStore:
|
||
def __init__(self):
|
||
self.ticket: Optional[SessionTicket] = None
|
||
|
||
def save_ticket(self, ticket: SessionTicket):
|
||
self.ticket = ticket
|
||
|
||
def get_ticket(self) -> Optional[SessionTicket]:
|
||
return self.ticket
|
||
|
||
|
||
async def fetch_http3(
|
||
host: str,
|
||
port: int,
|
||
path: str,
|
||
method: str = "GET",
|
||
data: Optional[bytes] = None,
|
||
sni: Optional[str] = None,
|
||
configuration: Optional[QuicConfiguration] = None,
|
||
wait_connected: bool = True,
|
||
) -> Tuple[int, bytes, bool, bool]:
|
||
"""
|
||
Führt eine einzelne HTTP/3-Anfrage aus.
|
||
Rückgabe: (status_code, body, early_data_accepted, session_resumed)
|
||
"""
|
||
assert method in ("GET", "POST"), "Nur GET/POST unterstützt"
|
||
|
||
if configuration is None:
|
||
configuration = QuicConfiguration(is_client=True, alpn_protocols=H3_ALPN)
|
||
|
||
# SNI korrekt über die QuicConfiguration setzen (kein server_name-Arg für connect)
|
||
if not configuration.server_name:
|
||
configuration.server_name = sni or host
|
||
|
||
async with connect(
|
||
host,
|
||
port,
|
||
configuration=configuration,
|
||
wait_connected=wait_connected,
|
||
) as client:
|
||
http = client.http
|
||
|
||
headers = [
|
||
(b":method", method.encode()),
|
||
(b":scheme", b"https"),
|
||
(b":authority", f"{host}:{port}".encode()),
|
||
(b":path", path.encode()),
|
||
(b"user-agent", b"quic-0rtt-tester/0.2"),
|
||
]
|
||
stream_id = http.get_next_available_stream_id()
|
||
http.send_headers(stream_id, headers, end_stream=(data is None))
|
||
if data is not None:
|
||
http.send_data(stream_id, data, end_stream=True)
|
||
|
||
await client.wait_pending()
|
||
|
||
status_code = 0
|
||
body = b""
|
||
# Flags aus TLS/QUIC (Early-Data angenommen? Resumption?)
|
||
early_data_accepted = bool(getattr(client._quic.tls, "early_data_accepted", False))
|
||
session_resumed = bool(getattr(client._quic.tls, "session_resumed", False))
|
||
|
||
while True:
|
||
event: Optional[H3Event] = await client._http_event()
|
||
if event is None:
|
||
break
|
||
if isinstance(event, HeadersReceived) and event.stream_id == stream_id:
|
||
for name, value in event.headers:
|
||
if name == b":status":
|
||
try:
|
||
status_code = int(value.decode())
|
||
except ValueError:
|
||
status_code = 0
|
||
elif isinstance(event, DataReceived) and event.stream_id == stream_id:
|
||
body += event.data
|
||
if event.stream_ended:
|
||
break
|
||
|
||
return status_code, body, early_data_accepted, session_resumed
|
||
|
||
|
||
async def get_session_ticket(host: str, port: int, sni: Optional[str], store: InMemoryTicketStore):
|
||
"""
|
||
Erste 1-RTT-Verbindung aufbauen, um ein Session Ticket zu erhalten (ohne 0-RTT).
|
||
"""
|
||
configuration = QuicConfiguration(is_client=True, alpn_protocols=H3_ALPN)
|
||
configuration.server_name = sni or host
|
||
|
||
def save_ticket_cb(ticket: SessionTicket):
|
||
store.save_ticket(ticket)
|
||
|
||
configuration.session_ticket_handler = save_ticket_cb
|
||
|
||
# Voll verbinden -> Ticket erhalten
|
||
status, body, early, resumed = await fetch_http3(
|
||
host, port, "/", "GET", None, sni, configuration, wait_connected=True
|
||
)
|
||
return status, body, early, resumed
|
||
|
||
|
||
async def test_0rtt(
|
||
host: str,
|
||
port: int = 443,
|
||
sni: Optional[str] = None,
|
||
idempotent_path: str = "/",
|
||
non_idempotent_path: str = "/unsafe-test",
|
||
non_idempotent_payload: Optional[bytes] = None,
|
||
):
|
||
"""
|
||
Ablauf:
|
||
1) Initiale Verbindung (Ticket holen)
|
||
2) Resumption mit 0-RTT: GET
|
||
3) Resumption mit 0-RTT: POST (nicht-idempotent)
|
||
"""
|
||
ticket_store = InMemoryTicketStore()
|
||
|
||
print(f"[1/3] Baue Initialverbindung auf, um Session Ticket zu erhalten ...")
|
||
s0, b0, early0, resumed0 = await get_session_ticket(host, port, sni, ticket_store)
|
||
print(f" Initial: status={s0}, early_data_accepted={early0}, session_resumed={resumed0}")
|
||
|
||
ticket = ticket_store.get_ticket()
|
||
if not ticket:
|
||
print(" Kein Session Ticket erhalten – 0-RTT Resumption nicht möglich.")
|
||
return
|
||
|
||
configuration = QuicConfiguration(is_client=True, alpn_protocols=H3_ALPN)
|
||
configuration.server_name = sni or host
|
||
configuration.session_ticket = ticket # Ticket für Resumption/0‑RTT
|
||
# 0‑RTT Early-Data wird bei wait_connected=False versucht (wenn Server 0‑RTT erlaubt)
|
||
|
||
print(f"[2/3] Teste 0-RTT: idempotenter GET {idempotent_path}")
|
||
s1, b1, early1, resumed1 = await fetch_http3(
|
||
host,
|
||
port,
|
||
idempotent_path,
|
||
"GET",
|
||
None,
|
||
sni,
|
||
configuration,
|
||
wait_connected=False,
|
||
)
|
||
print(f" 0-RTT GET: status={s1}, early_data_accepted={early1}, session_resumed={resumed1}")
|
||
|
||
if non_idempotent_payload is None:
|
||
non_idempotent_payload = json.dumps(
|
||
{"action": "create", "ts": asyncio.get_event_loop().time()}
|
||
).encode()
|
||
|
||
print(f"[3/3] Teste 0-RTT: nicht-idempotenter POST {non_idempotent_path}")
|
||
s2, b2, early2, resumed2 = await fetch_http3(
|
||
host,
|
||
port,
|
||
non_idempotent_path,
|
||
"POST",
|
||
non_idempotent_payload,
|
||
sni,
|
||
configuration,
|
||
wait_connected=False,
|
||
)
|
||
print(f" 0-RTT POST: status={s2}, early_data_accepted={early2}, session_resumed={resumed2}")
|
||
|
||
print("\nErgebnisse und Bewertung:")
|
||
if not early1 and not early2:
|
||
print("- 0‑RTT wurde abgelehnt oder nicht akzeptiert: konservativ, geringeres Replay-Risiko.[1]")
|
||
else:
|
||
if early1:
|
||
print("- 0‑RTT Early Data für GET akzeptiert: ok, sofern idempotent.[1][3]")
|
||
if early2:
|
||
print("- WARNUNG: 0‑RTT Early Data für POST akzeptiert – serverseitige Anti‑Replay‑Mitigationen prüfen (z.B. nur sichere Methoden, Anwendungs‑Anti‑Replay).[1][3]")
|
||
else:
|
||
print("- POST nicht als Early Data akzeptiert oder auf 1‑RTT verzögert – sicherer bzgl. Replay.[1][3]")
|
||
|
||
print("\nHinweis:")
|
||
print("- QUIC/TLS 1.3: 0‑RTT ist replay‑anfällig; Anwendungen müssen mitigieren.[1][3]")
|
||
print("- HTTP/3 verlangt Anti‑Replay‑Mitigationen für 0‑RTT; nur idempotente Aktionen in Early Data.[1][3]")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
import argparse
|
||
|
||
parser = argparse.ArgumentParser(description="QUIC/HTTP-3 0-RTT Tester (fixed SNI handling)")
|
||
parser.add_argument("host", help="Server hostname")
|
||
parser.add_argument("--port", type=int, default=443, help="Server port (default: 443)")
|
||
parser.add_argument("--sni", help="SNI override", default=None)
|
||
parser.add_argument("--get-path", default="/", help="Idempotenter GET-Pfad für 0-RTT")
|
||
parser.add_argument("--post-path", default="/unsafe-test", help="Nicht-idempotenter POST-Pfad für 0-RTT")
|
||
args = parser.parse_args()
|
||
|
||
asyncio.run(
|
||
test_0rtt(
|
||
host=args.host,
|
||
port=args.port,
|
||
sni=args.sni,
|
||
idempotent_path=args.get_path,
|
||
non_idempotent_path=args.post_path,
|
||
)
|
||
)
|