318 lines
14 KiB
Python
318 lines
14 KiB
Python
import argparse
|
|
import asyncio
|
|
import logging
|
|
from datetime import datetime
|
|
import requests
|
|
from aioquic.asyncio import connect, QuicConnectionProtocol
|
|
from aioquic.quic.configuration import QuicConfiguration
|
|
from aioquic.quic.events import HandshakeCompleted
|
|
import OpenSSL
|
|
from cryptography import x509
|
|
from cryptography.hazmat.backends import default_backend
|
|
|
|
# Helper functions for formatting certificate extensions
|
|
def format_san(san):
    """Return the entries of a Subject Alternative Name extension as text.

    Every general name in ``san`` is rendered via ``str()`` of its ``value``
    attribute, in iteration order, and joined with ``", "``. This includes
    non-DNS entries (for example IP addresses), which a DNS-only listing
    would silently omit.

    If ``san`` is not iterable, or an entry has no ``value`` attribute, the
    function falls back to ``str(san)`` instead of raising.
    """
    try:
        return ", ".join(str(general_name.value) for general_name in san)
    except (TypeError, AttributeError):
        # Best-effort formatter: unknown shapes are shown verbatim.
        return str(san)
|
|
|
|
def format_key_usage(key_usage):
    """Return the asserted Key Usage bits as a comma-separated string.

    ``key_usage`` is expected to expose the boolean attributes read below
    (``digital_signature``, ``content_commitment``, ...). The names in the
    result use the RFC 5280 spelling and appear in a fixed order.

    ``encipher_only`` and ``decipher_only`` are only inspected when
    ``key_agreement`` is set: they qualify key agreement, and the
    ``cryptography`` KeyUsage object raises ``ValueError`` when they are read
    while ``key_agreement`` is false.
    """
    simple_flags = (
        ("digital_signature", "digitalSignature"),
        ("content_commitment", "nonRepudiation"),
        ("key_encipherment", "keyEncipherment"),
        ("data_encipherment", "dataEncipherment"),
        ("key_agreement", "keyAgreement"),
        ("key_cert_sign", "keyCertSign"),
        ("crl_sign", "cRLSign"),
    )
    fields = [label for attr, label in simple_flags if getattr(key_usage, attr)]

    # Guarded access: see docstring.
    if key_usage.key_agreement:
        if key_usage.encipher_only:
            fields.append("encipherOnly")
        if key_usage.decipher_only:
            fields.append("decipherOnly")

    return ", ".join(fields)
|
|
|
|
def format_extended_key_usage(ext_key_usage):
    """Return the Extended Key Usage purposes as a comma-separated string.

    Each item of ``ext_key_usage`` is an OID-like object. Its friendly name
    (the ``_name`` attribute) is used when available; otherwise the
    ``dotted_string`` is shown so that the purpose stays identifiable.
    """
    usages = []
    for oid in ext_key_usage:
        name = getattr(oid, "_name", None)
        # NOTE(review): cryptography appears to report OIDs missing from its
        # name table as the placeholder "Unknown OID" instead of raising;
        # treat that like a missing name so the dotted string is shown.
        if not name or name == "Unknown OID":
            name = oid.dotted_string
        usages.append(name)
    return ", ".join(usages)
|
|
|
|
def format_crl_distribution_points(crl_dp):
    """Return the full names of all CRL distribution points, comma-separated.

    Distribution points without a ``full_name`` are skipped, as are general
    names that expose no ``value``. Values are converted with ``str()`` so
    that non-string general names cannot make the final join fail.
    """
    urls = []
    for dp in crl_dp:
        # full_name may be None/empty for a distribution point.
        for gn in dp.full_name or ():
            value = getattr(gn, "value", None)
            if value is None:
                continue
            urls.append(str(value))
    return ", ".join(urls)
|
|
|
|
def format_authority_information_access(aia):
    """Return the Authority Information Access entries as ``"method: location"``.

    Entries are joined with ``"; "``. The access method is shown by its
    friendly name (``_name``) when available, otherwise by its dotted OID
    string. The location is the ``value`` of the entry's ``access_location``.
    """
    lines = []
    for ad in aia:
        method = getattr(ad.access_method, "_name", None)
        # NOTE(review): cryptography appears to use the placeholder
        # "Unknown OID" for OIDs it has no name for; fall back to the dotted
        # string in that case as well.
        if not method or method == "Unknown OID":
            method = ad.access_method.dotted_string
        lines.append(f"{method}: {ad.access_location.value}")
    return "; ".join(lines)
|
|
|
|
def format_certificate_policies(cp):
    """Return the certificate policies as a ``"; "``-separated string.

    Each policy is shown by its friendly name (``_name``) when available,
    otherwise by its dotted OID string. Policy qualifiers, when present, are
    appended in parentheses, each rendered with ``str()``.
    """
    policies = []
    for policy_info in cp:
        identifier = policy_info.policy_identifier
        policy_name = getattr(identifier, "_name", None)
        # NOTE(review): cryptography appears to use the placeholder
        # "Unknown OID" for unnamed OIDs, which would hide the policy
        # identifier; show the dotted string instead.
        if not policy_name or policy_name == "Unknown OID":
            policy_name = identifier.dotted_string

        if policy_info.policy_qualifiers:
            qualifiers = ", ".join(str(q) for q in policy_info.policy_qualifiers)
            policies.append(f"{policy_name} ({qualifiers})")
        else:
            policies.append(policy_name)
    return "; ".join(policies)
|
|
|
|
def format_sct(sct_value):
    """Return a short summary of an SCT (Certificate Transparency) extension.

    The result is ``"<n> SCTs"`` when the number of entries can be determined,
    either from an ``scts`` attribute or because ``sct_value`` itself is a
    sized sequence. Anything else, including plain strings and bytes, is
    rendered with ``str()``. The function never raises.
    """
    # Raw text/bytes have a length too, but it is not an SCT count.
    if isinstance(sct_value, (str, bytes)):
        return str(sct_value)
    try:
        # NOTE(review): cryptography's SCT extension value appears to be a
        # sized sequence without an ``scts`` attribute -- confirm.
        entries = getattr(sct_value, "scts", sct_value)
        return f"{len(entries)} SCTs"
    except Exception:
        # Best-effort formatter: fall back to the object's own text form.
        return str(sct_value)
|
|
|
|
def _decode_name_component(raw):
    """Decode one X.509 name component, replacing undecodable bytes."""
    return raw.decode("utf-8", errors="replace")


def _public_key_algorithm_name(key_type):
    """Map a pyOpenSSL key type constant to a short algorithm name."""
    known_types = (
        ("TYPE_RSA", "RSA"),
        ("TYPE_DSA", "DSA"),
        ("TYPE_EC", "EC"),
        ("TYPE_DH", "DH"),
    )
    for attr, name in known_types:
        # Not every constant is guaranteed to exist in every pyOpenSSL build.
        if hasattr(OpenSSL.crypto, attr) and key_type == getattr(OpenSSL.crypto, attr):
            return name
    return f"Unbekannt (Type: {key_type})"


def _extension_line(cert_crypto, lookup, label, render, missing_prefix):
    """Render one extension as ``label: value`` or as ``missing_prefix: error``."""
    try:
        extension = lookup(cert_crypto.extensions)
        return f"{label}: {render(extension.value)}"
    except Exception as e:
        return f"{missing_prefix}: {e}"


def format_certificate_details(cert, label="Zertifikat"):
    """Return a multi-line, human-readable report for one certificate.

    Args:
        cert: A pyOpenSSL ``X509`` certificate.
        label: Heading used in the first line of the report.

    Returns:
        The report as a single string with lines joined by newlines. Problems
        while reading an individual field are reported inline in the text
        instead of being raised.

    The version is shown one higher than the value returned by
    ``cert.get_version()`` (which is zero-based), so v1, v2 and v3 appear as
    1, 2 and 3.
    """
    details = [f"========== {label} =========="]

    # Name components are decoded leniently: this part is not protected by a
    # try block, so a single non-UTF-8 byte must not abort the whole report.
    for heading, name in (("Subject:", cert.get_subject()), ("Issuer:", cert.get_issuer())):
        details.append(heading)
        for key, value in name.get_components():
            details.append(f" {_decode_name_component(key)}: {_decode_name_component(value)}")

    details.append(f"Serial Number: {cert.get_serial_number()}")
    # The value returned by OpenSSL is zero-based, hence the +1.
    details.append(f"Version: {cert.get_version() + 1}")

    try:
        sig_algo = cert.get_signature_algorithm().decode("utf-8")
        details.append(f"Signaturalgorithmus: {sig_algo}")
    except Exception as e:
        details.append(f"Fehler beim Auslesen des Signaturalgorithmus: {e}")

    try:
        key_algo = _public_key_algorithm_name(cert.get_pubkey().type())
        details.append(f"Public Key Algorithmus: {key_algo}")
    except Exception as e:
        details.append(f"Fehler beim Auslesen des Public Key Algorithmus: {e}")

    try:
        date_format = "%Y%m%d%H%M%SZ"
        not_before_dt = datetime.strptime(cert.get_notBefore().decode("utf-8"), date_format)
        not_after_dt = datetime.strptime(cert.get_notAfter().decode("utf-8"), date_format)
        details.append(f"Gültig von: {not_before_dt.strftime('%Y-%m-%d %H:%M:%S')}")
        details.append(f"Gültig bis: {not_after_dt.strftime('%Y-%m-%d %H:%M:%S')}")
    except Exception as e:
        details.append(f"Fehler beim Formatieren der Gültigkeitsdaten: {e}")

    # Convert to a ``cryptography`` certificate once; all extension lookups
    # below share the result.
    try:
        der_bytes = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_ASN1, cert)
        cert_crypto = x509.load_der_x509_certificate(der_bytes, default_backend())
        conversion_error = None
    except Exception as e:
        cert_crypto = None
        conversion_error = e

    if cert_crypto is None:
        # Without a parsed certificate neither the SAN nor any other
        # extension can be read; report the failure for both sections.
        details.append(f"Keine SAN-Erweiterung gefunden: {conversion_error}")
        details.append(f"Fehler beim Auslesen weiterer Zertifikatsinformationen: {conversion_error}")
    else:
        def by_class(extension_class):
            return lambda extensions: extensions.get_extension_for_class(extension_class)

        def render_aki(value):
            key_id = value.key_identifier
            return key_id.hex() if key_id else "N/A"

        # (lookup, label, renderer, prefix used when the lookup fails)
        before_fingerprint = (
            (by_class(x509.SubjectAlternativeName), "Subject Alternative Names",
             format_san, "Keine SAN-Erweiterung gefunden"),
            (by_class(x509.KeyUsage), "Key Usage",
             format_key_usage, "Keine Key Usage gefunden"),
            (by_class(x509.ExtendedKeyUsage), "Extended Key Usage",
             format_extended_key_usage, "Keine Extended Key Usage gefunden"),
            (by_class(x509.CRLDistributionPoints), "CRL Distribution Points",
             format_crl_distribution_points, "Keine CRL Distribution Points gefunden"),
            (by_class(x509.AuthorityInformationAccess), "Authority Information Access",
             format_authority_information_access, "Keine Authority Information Access gefunden"),
            (by_class(x509.CertificatePolicies), "Certificate Policies",
             format_certificate_policies, "Keine Certificate Policies gefunden"),
            (by_class(x509.SubjectKeyIdentifier), "Subject Key Identifier",
             lambda value: value.digest.hex(), "Kein Subject Key Identifier gefunden"),
            (by_class(x509.AuthorityKeyIdentifier), "Authority Key Identifier",
             render_aki, "Keine Authority Key Identifier gefunden"),
        )
        for lookup, ext_label, render, missing_prefix in before_fingerprint:
            details.append(_extension_line(cert_crypto, lookup, ext_label, render, missing_prefix))

        try:
            fingerprint = cert.digest("sha256")
            details.append(f"SHA256-Fingerprint: {fingerprint.decode()}")
        except Exception as e:
            details.append(f"Fehler beim Auslesen des Fingerprints: {e}")

        # The SCT list is looked up by OID rather than by extension class.
        details.append(_extension_line(
            cert_crypto,
            lambda extensions: extensions.get_extension_for_oid(
                x509.ObjectIdentifier("1.3.6.1.4.1.11129.2.4.2")),
            "SCT (Certificate Transparency)",
            format_sct,
            "Keine SCT-Erweiterung gefunden",
        ))

    details.append(" ")
    details.append("=================================================================== ENDE ===================================================================")
    details.append(" ")

    return "\n".join(details)
|
|
|
|
def download_issuer_chain(cert_crypto, max_depth=10):
    """Download the issuer chain (intermediates and root) of a certificate.

    Starting from ``cert_crypto`` (a ``cryptography`` certificate), the
    "CA Issuers" URL from the Authority Information Access extension is
    followed repeatedly until a certificate whose subject equals its issuer
    is reached, no usable URL is left, or an error occurs.

    Args:
        cert_crypto: Certificate whose issuers should be fetched.
        max_depth: Upper bound on the number of certificates to download.
            The URLs come from the (untrusted) certificates themselves, so
            the walk must not be unbounded.

    Returns:
        A list of pyOpenSSL ``X509`` objects in download order. The list is
        empty when nothing could be downloaded. This function is best-effort
        and does not raise on download or parse errors.
    """
    chain = []
    visited_urls = set()
    current_cert = cert_crypto

    for _ in range(max_depth):
        try:
            aia_ext = current_cert.extensions.get_extension_for_class(x509.AuthorityInformationAccess)
            # 1.3.6.1.5.5.7.48.2 is the "CA Issuers" access method. Only
            # HTTP(S) locations are considered.
            issuer_url = next(
                (
                    str(ad.access_location.value)
                    for ad in aia_ext.value
                    if ad.access_method.dotted_string == "1.3.6.1.5.5.7.48.2"
                    and str(ad.access_location.value).lower().startswith(("http://", "https://"))
                ),
                None,
            )
            # Stop when there is nothing to fetch or the chain loops back.
            if issuer_url is None or issuer_url in visited_urls:
                break
            visited_urls.add(issuer_url)

            response = requests.get(issuer_url, timeout=5)
            response.raise_for_status()
            issuer_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1, response.content)
            chain.append(issuer_cert)

            issuer_cert_bytes = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_ASN1, issuer_cert)
            issuer_cert_crypto = x509.load_der_x509_certificate(issuer_cert_bytes, default_backend())
            # A certificate with identical subject and issuer is treated as
            # the root; the walk ends here.
            if issuer_cert_crypto.subject == issuer_cert_crypto.issuer:
                break
            current_cert = issuer_cert_crypto
        except Exception as exc:
            # Best effort: keep what was downloaded so far.
            logging.debug("Issuer chain download stopped: %s", exc)
            break

    return chain
|
|
|
|
class HTTP3Client(QuicConnectionProtocol):
    """QUIC protocol that logs certificate details once the handshake completes."""

    def quic_event_received(self, event):
        """Log a certificate report when a ``HandshakeCompleted`` event arrives.

        The report covers the certificates obtained from the event or the TLS
        context and, if the last of them is not self-issued, the issuer
        certificates downloaded via ``download_issuer_chain``. The whole
        report is emitted as one ``logging.info`` message. All other events
        are ignored.
        """
        if not isinstance(event, HandshakeCompleted):
            return

        output_lines = ["TLS-Handshake abgeschlossen!", "Zertifikatsinformationen:"]

        provided_chain = []
        for idx, cert_obj in enumerate(self._peer_certificates(event), start=1):
            try:
                cert = OpenSSL.crypto.load_certificate(
                    OpenSSL.crypto.FILETYPE_ASN1, self._der_bytes(cert_obj)
                )
                provided_chain.append(cert)
                output_lines.append(format_certificate_details(cert, label=f"Zertifikat {idx}"))
            except Exception as e:
                output_lines.append(f"Zertifikat {idx}: Fehler beim Laden des Zertifikats: {e}")

        if provided_chain:
            # NOTE(review): download_issuer_chain performs blocking HTTP
            # requests; this handler runs synchronously, so the download
            # finishes before the report is logged.
            try:
                output_lines.extend(self._downloaded_issuer_details(provided_chain[-1]))
            except Exception as e:
                # Never let a reporting problem escape into the QUIC stack.
                output_lines.append(f"Fehler beim Herunterladen der Aussteller-Kette: {e}")

        logging.info("\n\n".join(output_lines))

    def _peer_certificates(self, event):
        """Return the peer certificates from the event or, failing that, the TLS context."""
        certs = getattr(event, "peer_certificates", None)
        if certs is None:
            tls = self._quic.tls
            if hasattr(tls, "peer_certificate"):
                cert = tls.peer_certificate
            else:
                cert = getattr(tls, "_peer_certificate", None)
            if cert:
                certs = [cert]
        return certs or []

    @staticmethod
    def _der_bytes(cert_obj):
        """Return the DER encoding of a certificate given as bytes or as an object."""
        if isinstance(cert_obj, bytes):
            return cert_obj
        from cryptography.hazmat.primitives import serialization
        return cert_obj.public_bytes(encoding=serialization.Encoding.DER)

    @staticmethod
    def _downloaded_issuer_details(last_cert):
        """Return formatted reports for the downloaded issuers of ``last_cert``."""
        last_cert_bytes = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_ASN1, last_cert)
        last_cert_crypto = x509.load_der_x509_certificate(last_cert_bytes, default_backend())
        # A self-issued certificate already terminates the chain.
        if last_cert_crypto.subject == last_cert_crypto.issuer:
            return []

        reports = []
        for issuer_cert in download_issuer_chain(last_cert_crypto):
            # Label by what the certificate is, not by its position: the
            # download may stop early, in which case the last certificate
            # is still an intermediate.
            is_self_issued = issuer_cert.get_subject().der() == issuer_cert.get_issuer().der()
            label = "Root CA Zertifikat" if is_self_issued else "Intermediate Zertifikat"
            reports.append(format_certificate_details(issuer_cert, label=label))
        return reports
|
|
|
|
async def run(host, port):
    """Open a QUIC connection to ``host:port`` using ``HTTP3Client``.

    The connection offers the ``h3`` ALPN protocol and is kept open for two
    seconds before it is closed again. Any exception raised while connecting
    or waiting is logged as an error instead of being propagated.
    """
    quic_config = QuicConfiguration(is_client=True, alpn_protocols=["h3"])
    try:
        async with connect(
            host,
            port,
            configuration=quic_config,
            create_protocol=HTTP3Client,
        ):
            # Keep the connection alive briefly before leaving the context.
            await asyncio.sleep(2)
    except Exception as exc:
        logging.error("Fehler beim Verbindungsaufbau: %s", exc)
|
|
|
|
if __name__ == '__main__':
    # Command-line entry point: parse arguments, configure logging, connect.
    parser = argparse.ArgumentParser(description="HTTP/3 Zertifikats-Info Tool")
    parser.add_argument(
        "host",
        help="Hostname oder IP, zu dem verbunden werden soll",
    )
    parser.add_argument(
        "--port",
        type=int,
        default=443,
        help="Port, der verwendet werden soll (default: 443)",
    )
    parser.add_argument(
        "-o",
        "--output",
        help="Dateipfad, in den der Output geschrieben wird",
        default=None,
    )
    args = parser.parse_args()

    # Messages are logged bare; with --output they go to that file
    # (overwritten on each run), otherwise to the default logging handler.
    log_settings = {"level": logging.INFO, "format": "%(message)s"}
    if args.output:
        log_settings["filename"] = args.output
        log_settings["filemode"] = "w"
    logging.basicConfig(**log_settings)

    asyncio.run(run(args.host, args.port))