"""HTTP/3 certificate information tool.

Connects to a host over QUIC (ALPN ``h3``), and once the TLS handshake has
completed logs a human-readable dump of the peer certificate(s) plus any
issuer certificates that can be fetched through the Authority Information
Access (CA Issuers) extension.
"""
import argparse
import asyncio
import logging
from datetime import datetime

import OpenSSL
import requests
from aioquic.asyncio import QuicConnectionProtocol, connect
from aioquic.quic.configuration import QuicConfiguration
from aioquic.quic.events import HandshakeCompleted
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization

# OID of the "CA Issuers" access method inside Authority Information Access.
_CA_ISSUERS_OID = "1.3.6.1.5.5.7.48.2"
# OID under which the embedded SCT list (Certificate Transparency) is looked up.
_SCT_LIST_OID = "1.3.6.1.4.1.11129.2.4.2"
# Timestamp layout expected from X509.get_notBefore() / get_notAfter().
_ASN1_TIME_FORMAT = "%Y%m%d%H%M%SZ"
# Placeholder that cryptography reports for OIDs it has no name for.
_UNKNOWN_OID_NAME = "Unknown OID"

# (attribute on x509.KeyUsage, label to print) in output order.
_KEY_USAGE_FLAGS = (
    ("digital_signature", "digitalSignature"),
    ("content_commitment", "nonRepudiation"),
    ("key_encipherment", "keyEncipherment"),
    ("data_encipherment", "dataEncipherment"),
    ("key_agreement", "keyAgreement"),
    ("key_cert_sign", "keyCertSign"),
    ("crl_sign", "cRLSign"),
)

# (attribute on OpenSSL.crypto, label to print) for public key types.
_PUBLIC_KEY_TYPES = (
    ("TYPE_RSA", "RSA"),
    ("TYPE_DSA", "DSA"),
    ("TYPE_EC", "EC"),
    ("TYPE_DH", "DH"),
)


# ---------------------------------------------------------------------------
# Helper functions for formatting extensions
# ---------------------------------------------------------------------------

def _oid_label(oid):
    """Return a readable name for *oid*, or its dotted string if unnamed."""
    name = getattr(oid, "_name", None)
    if not name or name == _UNKNOWN_OID_NAME:
        return oid.dotted_string
    return name


def format_san(san):
    """Return the DNS names of a SubjectAlternativeName value, comma-separated.

    Falls back to ``str(san)`` if the value cannot be queried.
    """
    try:
        return ", ".join(san.get_values_for_type(x509.DNSName))
    except Exception:
        return str(san)


def format_key_usage(key_usage):
    """Return the set KeyUsage bits as a comma-separated list of names."""
    fields = [
        label for attr, label in _KEY_USAGE_FLAGS if getattr(key_usage, attr)
    ]
    # encipher_only / decipher_only are only defined when key_agreement is
    # set; reading them otherwise raises ValueError in cryptography.
    if key_usage.key_agreement:
        if key_usage.encipher_only:
            fields.append("encipherOnly")
        if key_usage.decipher_only:
            fields.append("decipherOnly")
    return ", ".join(fields)


def format_extended_key_usage(ext_key_usage):
    """Return the ExtendedKeyUsage purposes as a comma-separated list."""
    return ", ".join(_oid_label(oid) for oid in ext_key_usage)


def format_crl_distribution_points(crl_dp):
    """Return the full-name entries of all CRL distribution points.

    Distribution points without a ``full_name`` are skipped.
    """
    urls = []
    for dp in crl_dp:
        for general_name in dp.full_name or ():
            value = getattr(general_name, "value", None)
            if value is not None:
                # str() so that non-string general names cannot break join().
                urls.append(str(value))
    return ", ".join(urls)


def format_authority_information_access(aia):
    """Return AIA entries as ``method: location`` pairs joined by ``; ``."""
    return "; ".join(
        f"{_oid_label(ad.access_method)}: {ad.access_location.value}"
        for ad in aia
    )


def format_certificate_policies(cp):
    """Return certificate policies, each with its qualifiers in parentheses."""
    policies = []
    for policy_info in cp:
        policy_name = _oid_label(policy_info.policy_identifier)
        if policy_info.policy_qualifiers:
            qualifiers = ", ".join(str(q) for q in policy_info.policy_qualifiers)
            policies.append(f"{policy_name} ({qualifiers})")
        else:
            policies.append(policy_name)
    return "; ".join(policies)


def format_sct(sct_value):
    """Return a short summary ("<n> SCTs") of an SCT extension value.

    Falls back to ``str(sct_value)`` when the number of SCTs cannot be
    determined.
    """
    try:
        if hasattr(sct_value, "scts"):
            return f"{len(sct_value.scts)} SCTs"
        return f"{len(sct_value)} SCTs"
    except Exception:
        return str(sct_value)


def _authority_key_id(aki):
    """Return the hex key identifier of an AuthorityKeyIdentifier, or N/A."""
    key_id = aki.key_identifier
    return key_id.hex() if key_id else "N/A"


# (extension class, success label, message prefix on failure, formatter),
# listed in output order.
_EXTENSION_TABLE = (
    (x509.SubjectAlternativeName, "Subject Alternative Names",
     "Keine SAN-Erweiterung gefunden", format_san),
    (x509.KeyUsage, "Key Usage",
     "Keine Key Usage gefunden", format_key_usage),
    (x509.ExtendedKeyUsage, "Extended Key Usage",
     "Keine Extended Key Usage gefunden", format_extended_key_usage),
    (x509.CRLDistributionPoints, "CRL Distribution Points",
     "Keine CRL Distribution Points gefunden", format_crl_distribution_points),
    (x509.AuthorityInformationAccess, "Authority Information Access",
     "Keine Authority Information Access gefunden",
     format_authority_information_access),
    (x509.CertificatePolicies, "Certificate Policies",
     "Keine Certificate Policies gefunden", format_certificate_policies),
    (x509.SubjectKeyIdentifier, "Subject Key Identifier",
     "Kein Subject Key Identifier gefunden", lambda ski: ski.digest.hex()),
    (x509.AuthorityKeyIdentifier, "Authority Key Identifier",
     "Keine Authority Key Identifier gefunden", _authority_key_id),
)


# ---------------------------------------------------------------------------
# Certificate helpers
# ---------------------------------------------------------------------------

def _to_cryptography(cert):
    """Convert a pyOpenSSL X509 object into a cryptography certificate."""
    der = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_ASN1, cert)
    return x509.load_der_x509_certificate(der, default_backend())


def _is_self_issued(cert):
    """Return True if the pyOpenSSL certificate's subject equals its issuer."""
    return cert.get_subject().der() == cert.get_issuer().der()


def _name_lines(name):
    """Return one ``key: value`` line per component of an X509Name."""
    return [
        # errors="replace": legacy certificates may carry non-UTF-8 strings.
        f" {key.decode('utf-8', errors='replace')}: "
        f"{value.decode('utf-8', errors='replace')}"
        for key, value in name.get_components()
    ]


def _public_key_algorithm(pubkey):
    """Return a readable algorithm name for a pyOpenSSL PKey."""
    key_type = pubkey.type()
    for attr, label in _PUBLIC_KEY_TYPES:
        # getattr guard: not every pyOpenSSL version defines every constant.
        if key_type == getattr(OpenSSL.crypto, attr, None):
            return label
    return f"Unbekannt (Type: {key_type})"


def _extension_lines(cert_crypto):
    """Return the output lines for all extensions in ``_EXTENSION_TABLE``."""
    lines = []
    for ext_class, label, missing, formatter in _EXTENSION_TABLE:
        try:
            ext = cert_crypto.extensions.get_extension_for_class(ext_class)
            lines.append(f"{label}: {formatter(ext.value)}")
        except Exception as e:
            lines.append(f"{missing}: {e}")
    return lines


def format_certificate_details(cert, label="Zertifikat"):
    """Return all detail information of a certificate as formatted text.

    Args:
        cert: A pyOpenSSL ``X509`` object.
        label: Heading printed above the certificate block.

    Returns:
        A newline-joined string. Failures while reading individual fields are
        reported inline in the output instead of being raised.
    """
    details = [f"========== {label} =========="]
    details.append("Subject:")
    details.extend(_name_lines(cert.get_subject()))
    details.append("Issuer:")
    details.extend(_name_lines(cert.get_issuer()))
    details.append(f"Serial Number: {cert.get_serial_number()}")
    # The value returned by OpenSSL is 0-indexed; add 1 so that v1, v2 and v3
    # are shown as 1, 2 and 3.
    details.append(f"Version: {cert.get_version() + 1}")

    try:
        sig_algo = cert.get_signature_algorithm().decode("utf-8")
        details.append(f"Signaturalgorithmus: {sig_algo}")
    except Exception as e:
        details.append(f"Fehler beim Auslesen des Signaturalgorithmus: {e}")

    try:
        key_algo = _public_key_algorithm(cert.get_pubkey())
        details.append(f"Public Key Algorithmus: {key_algo}")
    except Exception as e:
        details.append(f"Fehler beim Auslesen des Public Key Algorithmus: {e}")

    try:
        not_before = datetime.strptime(
            cert.get_notBefore().decode("utf-8"), _ASN1_TIME_FORMAT
        )
        not_after = datetime.strptime(
            cert.get_notAfter().decode("utf-8"), _ASN1_TIME_FORMAT
        )
        details.append(f"Gültig von: {not_before.strftime('%Y-%m-%d %H:%M:%S')}")
        details.append(f"Gültig bis: {not_after.strftime('%Y-%m-%d %H:%M:%S')}")
    except Exception as e:
        details.append(f"Fehler beim Formatieren der Gültigkeitsdaten: {e}")

    # Convert once; every extension lookup below works on this object.
    cert_crypto = None
    try:
        cert_crypto = _to_cryptography(cert)
    except Exception as e:
        details.append(f"Fehler beim Auslesen weiterer Zertifikatsinformationen: {e}")

    if cert_crypto is not None:
        details.extend(_extension_lines(cert_crypto))

    # The fingerprint does not depend on the cryptography conversion, so it is
    # reported even if extension parsing was not possible.
    try:
        fingerprint = cert.digest("sha256")
        details.append(f"SHA256-Fingerprint: {fingerprint.decode()}")
    except Exception as e:
        details.append(f"Fehler beim Auslesen des Fingerprints: {e}")

    if cert_crypto is not None:
        try:
            sct_ext = cert_crypto.extensions.get_extension_for_oid(
                x509.ObjectIdentifier(_SCT_LIST_OID)
            )
            details.append(
                f"SCT (Certificate Transparency): {format_sct(sct_ext.value)}"
            )
        except Exception as e:
            details.append(f"Keine SCT-Erweiterung gefunden: {e}")

    details.append(" ")
    details.append("=================================================================== ENDE ===================================================================")
    details.append(" ")
    return "\n".join(details)


def _load_downloaded_certificate(data):
    """Load a downloaded certificate that is either PEM or DER encoded."""
    if data.lstrip().startswith(b"-----BEGIN"):
        filetype = OpenSSL.crypto.FILETYPE_PEM
    else:
        filetype = OpenSSL.crypto.FILETYPE_ASN1
    return OpenSSL.crypto.load_certificate(filetype, data)


def _ca_issuer_urls(cert_crypto):
    """Return the http(s) "CA Issuers" URLs from a certificate's AIA extension."""
    aia = cert_crypto.extensions.get_extension_for_class(
        x509.AuthorityInformationAccess
    ).value
    urls = []
    for ad in aia:
        if ad.access_method.dotted_string != _CA_ISSUERS_OID:
            continue
        location = ad.access_location.value
        # Only http(s) can be fetched with requests; skip e.g. ldap:// entries.
        if isinstance(location, str) and location.lower().startswith(
            ("http://", "https://")
        ):
            urls.append(location)
    return urls


def download_issuer_chain(cert_crypto, max_depth=10):
    """Download the issuer chain (intermediates + root CA) of a certificate.

    Follows the "CA Issuers" URL of the Authority Information Access
    extension from certificate to certificate until a self-issued certificate
    is reached, no URL is available, a download or parse step fails, a
    certificate repeats, or ``max_depth`` certificates have been fetched.

    NOTE: performs blocking HTTP requests (5 s timeout each).

    Args:
        cert_crypto: cryptography certificate to start from.
        max_depth: Upper bound on the number of certificates to download.

    Returns:
        List of pyOpenSSL ``X509`` objects, nearest issuer first. May be
        empty; errors end the walk and are not raised.
    """
    chain = []
    seen_fingerprints = set()
    current_cert = cert_crypto
    for _ in range(max_depth):
        try:
            issuer_urls = _ca_issuer_urls(current_cert)
            if not issuer_urls:
                break
            response = requests.get(issuer_urls[0], timeout=5)
            response.raise_for_status()
            issuer_cert = _load_downloaded_certificate(response.content)

            # Guard against AIA loops (e.g. cross-signed certificates).
            fingerprint = issuer_cert.digest("sha256")
            if fingerprint in seen_fingerprints:
                break
            seen_fingerprints.add(fingerprint)
            chain.append(issuer_cert)

            issuer_cert_crypto = _to_cryptography(issuer_cert)
            # If self-signed, we have reached the root.
            if issuer_cert_crypto.subject == issuer_cert_crypto.issuer:
                break
            current_cert = issuer_cert_crypto
        except Exception as exc:
            # Best effort: return what was collected so far.
            logging.debug("Abbruch beim Herunterladen der Aussteller-Kette: %s", exc)
            break
    return chain


class HTTP3Client(QuicConnectionProtocol):
    """QUIC protocol that logs the peer's certificate details after handshake."""

    def _peer_certificates(self, event):
        """Return the peer certificates available for *event* (may be empty)."""
        # Try to get the certificates from the event first.
        certs = getattr(event, "peer_certificates", None)
        if certs is not None:
            return list(certs)

        tls = self._quic.tls
        leaf = getattr(tls, "peer_certificate", None)
        if leaf is None:
            leaf = getattr(tls, "_peer_certificate", None)
        if leaf is None:
            return []
        # NOTE(review): presumably aioquic keeps the server-sent intermediates
        # in a private `_peer_certificate_chain` attribute - confirm against
        # the installed version. Absent attribute => leaf only.
        extra = getattr(tls, "_peer_certificate_chain", None) or []
        return [leaf, *extra]

    def quic_event_received(self, event):
        """Log certificate information once the TLS handshake has completed.

        All other QUIC events are ignored.
        """
        if not isinstance(event, HandshakeCompleted):
            return

        output_lines = [
            "TLS-Handshake abgeschlossen!",
            "Zertifikatsinformationen:",
        ]

        provided_chain = []
        for idx, cert_obj in enumerate(self._peer_certificates(event), start=1):
            try:
                if isinstance(cert_obj, bytes):
                    cert_bytes = cert_obj
                else:
                    cert_bytes = cert_obj.public_bytes(
                        encoding=serialization.Encoding.DER
                    )
                cert = OpenSSL.crypto.load_certificate(
                    OpenSSL.crypto.FILETYPE_ASN1, cert_bytes
                )
                provided_chain.append(cert)
                output_lines.append(
                    format_certificate_details(cert, label=f"Zertifikat {idx}")
                )
            except Exception as e:
                output_lines.append(
                    f"Zertifikat {idx}: Fehler beim Laden des Zertifikats: {e}"
                )

        # Download the issuer chain if the last transmitted certificate is
        # not self-signed.
        if provided_chain:
            try:
                last_cert_crypto = _to_cryptography(provided_chain[-1])
                if last_cert_crypto.subject != last_cert_crypto.issuer:
                    for issuer_cert in download_issuer_chain(last_cert_crypto):
                        # Only a self-issued certificate is a root; a chain
                        # that ended early ends on an intermediate.
                        if _is_self_issued(issuer_cert):
                            label = "Root CA Zertifikat"
                        else:
                            label = "Intermediate Zertifikat"
                        output_lines.append(
                            format_certificate_details(issuer_cert, label=label)
                        )
            except Exception as e:
                # Never let a formatting problem escape into the QUIC stack.
                output_lines.append(
                    f"Fehler beim Herunterladen der Aussteller-Kette: {e}"
                )

        logging.info("\n\n".join(output_lines))


async def run(host, port):
    """Open an HTTP/3 (QUIC) connection to *host*:*port* and keep it briefly.

    Connection errors are logged rather than raised.
    """
    configuration = QuicConfiguration(is_client=True, alpn_protocols=["h3"])
    try:
        async with connect(
            host, port, configuration=configuration, create_protocol=HTTP3Client
        ):
            # Keep the connection open for a moment before closing it.
            await asyncio.sleep(2)
    except Exception as e:
        logging.error("Fehler beim Verbindungsaufbau: %s", e)


def main():
    """Parse command-line arguments, configure logging and run the client."""
    parser = argparse.ArgumentParser(description="HTTP/3 Zertifikats-Info Tool")
    parser.add_argument("host", help="Hostname oder IP, zu dem verbunden werden soll")
    parser.add_argument(
        "--port",
        type=int,
        default=443,
        help="Port, der verwendet werden soll (default: 443)",
    )
    parser.add_argument(
        "-o",
        "--output",
        help="Dateipfad, in den der Output geschrieben wird",
        default=None,
    )
    args = parser.parse_args()

    if args.output:
        logging.basicConfig(
            filename=args.output, filemode="w", level=logging.INFO, format="%(message)s"
        )
    else:
        logging.basicConfig(level=logging.INFO, format="%(message)s")

    asyncio.run(run(args.host, args.port))


if __name__ == '__main__':
    main()