Code
This commit is contained in:
318
Code/venv/cert.py
Normal file
318
Code/venv/cert.py
Normal file
@ -0,0 +1,318 @@
|
||||
import argparse
|
||||
import asyncio
|
||||
import logging
|
||||
from datetime import datetime
|
||||
import requests
|
||||
from aioquic.asyncio import connect, QuicConnectionProtocol
|
||||
from aioquic.quic.configuration import QuicConfiguration
|
||||
from aioquic.quic.events import HandshakeCompleted
|
||||
import OpenSSL
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
|
||||
# Helper-Funktionen zur Formatierung von Erweiterungen
|
||||
def format_san(san):
|
||||
try:
|
||||
dns_names = san.get_values_for_type(x509.DNSName)
|
||||
return ", ".join(dns_names)
|
||||
except Exception:
|
||||
return str(san)
|
||||
|
||||
def format_key_usage(key_usage):
|
||||
fields = []
|
||||
if key_usage.digital_signature:
|
||||
fields.append("digitalSignature")
|
||||
if key_usage.content_commitment:
|
||||
fields.append("nonRepudiation")
|
||||
if key_usage.key_encipherment:
|
||||
fields.append("keyEncipherment")
|
||||
if key_usage.data_encipherment:
|
||||
fields.append("dataEncipherment")
|
||||
if key_usage.key_agreement:
|
||||
fields.append("keyAgreement")
|
||||
if key_usage.key_cert_sign:
|
||||
fields.append("keyCertSign")
|
||||
if key_usage.crl_sign:
|
||||
fields.append("cRLSign")
|
||||
if key_usage.encipher_only:
|
||||
fields.append("encipherOnly")
|
||||
if key_usage.decipher_only:
|
||||
fields.append("decipherOnly")
|
||||
return ", ".join(fields)
|
||||
|
||||
def format_extended_key_usage(ext_key_usage):
|
||||
usages = []
|
||||
for oid in ext_key_usage:
|
||||
try:
|
||||
name = oid._name # Falls vorhanden
|
||||
except Exception:
|
||||
name = oid.dotted_string
|
||||
usages.append(name)
|
||||
return ", ".join(usages)
|
||||
|
||||
def format_crl_distribution_points(crl_dp):
|
||||
urls = []
|
||||
for dp in crl_dp:
|
||||
if dp.full_name:
|
||||
for gn in dp.full_name:
|
||||
try:
|
||||
urls.append(gn.value)
|
||||
except Exception:
|
||||
pass
|
||||
return ", ".join(urls)
|
||||
|
||||
def format_authority_information_access(aia):
|
||||
lines = []
|
||||
for ad in aia:
|
||||
try:
|
||||
method = ad.access_method._name
|
||||
except Exception:
|
||||
method = ad.access_method.dotted_string
|
||||
location = ad.access_location.value
|
||||
lines.append(f"{method}: {location}")
|
||||
return "; ".join(lines)
|
||||
|
||||
def format_certificate_policies(cp):
|
||||
policies = []
|
||||
for policy_info in cp:
|
||||
try:
|
||||
policy_name = policy_info.policy_identifier._name
|
||||
except Exception:
|
||||
policy_name = policy_info.policy_identifier.dotted_string
|
||||
if policy_info.policy_qualifiers:
|
||||
qualifiers = ", ".join(str(q) for q in policy_info.policy_qualifiers)
|
||||
policies.append(f"{policy_name} ({qualifiers})")
|
||||
else:
|
||||
policies.append(policy_name)
|
||||
return "; ".join(policies)
|
||||
|
||||
def format_sct(sct_value):
|
||||
try:
|
||||
if hasattr(sct_value, "scts"):
|
||||
return f"{len(sct_value.scts)} SCTs"
|
||||
else:
|
||||
return str(sct_value)
|
||||
except Exception:
|
||||
return str(sct_value)
|
||||
|
||||
# Helper-Funktion, die alle Detailinformationen eines Zertifikats formatiert zurückgibt.
|
||||
# Hier wird die Version um 1 erhöht, sodass v1, v2 und v3 als 1, 2 bzw. 3 angezeigt werden.
|
||||
def format_certificate_details(cert, label="Zertifikat"):
|
||||
details = []
|
||||
details.append(f"========== {label} ==========")
|
||||
|
||||
details.append("Subject:")
|
||||
for key, value in cert.get_subject().get_components():
|
||||
details.append(f" {key.decode('utf-8')}: {value.decode('utf-8')}")
|
||||
details.append("Issuer:")
|
||||
for key, value in cert.get_issuer().get_components():
|
||||
details.append(f" {key.decode('utf-8')}: {value.decode('utf-8')}")
|
||||
|
||||
details.append(f"Serial Number: {cert.get_serial_number()}")
|
||||
# Hier wird der von OpenSSL zurückgegebene Wert (0-indexiert) um 1 erhöht.
|
||||
details.append(f"Version: {cert.get_version() + 1}")
|
||||
|
||||
try:
|
||||
sig_algo = cert.get_signature_algorithm().decode("utf-8")
|
||||
details.append(f"Signaturalgorithmus: {sig_algo}")
|
||||
except Exception as e:
|
||||
details.append(f"Fehler beim Auslesen des Signaturalgorithmus: {e}")
|
||||
|
||||
try:
|
||||
pubkey = cert.get_pubkey()
|
||||
key_type = pubkey.type()
|
||||
if key_type == OpenSSL.crypto.TYPE_RSA:
|
||||
key_algo = "RSA"
|
||||
elif key_type == OpenSSL.crypto.TYPE_DSA:
|
||||
key_algo = "DSA"
|
||||
elif hasattr(OpenSSL.crypto, "TYPE_EC") and key_type == OpenSSL.crypto.TYPE_EC:
|
||||
key_algo = "EC"
|
||||
elif key_type == OpenSSL.crypto.TYPE_DH:
|
||||
key_algo = "DH"
|
||||
else:
|
||||
key_algo = f"Unbekannt (Type: {key_type})"
|
||||
details.append(f"Public Key Algorithmus: {key_algo}")
|
||||
except Exception as e:
|
||||
details.append(f"Fehler beim Auslesen des Public Key Algorithmus: {e}")
|
||||
|
||||
try:
|
||||
date_format = "%Y%m%d%H%M%SZ"
|
||||
not_before_str = cert.get_notBefore().decode('utf-8')
|
||||
not_after_str = cert.get_notAfter().decode('utf-8')
|
||||
not_before_dt = datetime.strptime(not_before_str, date_format)
|
||||
not_after_dt = datetime.strptime(not_after_str, date_format)
|
||||
details.append(f"Gültig von: {not_before_dt.strftime('%Y-%m-%d %H:%M:%S')}")
|
||||
details.append(f"Gültig bis: {not_after_dt.strftime('%Y-%m-%d %H:%M:%S')}")
|
||||
except Exception as e:
|
||||
details.append(f"Fehler beim Formatieren der Gültigkeitsdaten: {e}")
|
||||
|
||||
try:
|
||||
cert_bytes_for_san = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_ASN1, cert)
|
||||
cert_crypto = x509.load_der_x509_certificate(cert_bytes_for_san, default_backend())
|
||||
san_extension = cert_crypto.extensions.get_extension_for_class(x509.SubjectAlternativeName)
|
||||
details.append(f"Subject Alternative Names: {format_san(san_extension.value)}")
|
||||
except Exception as e:
|
||||
details.append(f"Keine SAN-Erweiterung gefunden: {e}")
|
||||
|
||||
try:
|
||||
cert_bytes_for_ext = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_ASN1, cert)
|
||||
cert_crypto = x509.load_der_x509_certificate(cert_bytes_for_ext, default_backend())
|
||||
|
||||
try:
|
||||
key_usage_ext = cert_crypto.extensions.get_extension_for_class(x509.KeyUsage)
|
||||
details.append(f"Key Usage: {format_key_usage(key_usage_ext.value)}")
|
||||
except Exception as e:
|
||||
details.append(f"Keine Key Usage gefunden: {e}")
|
||||
|
||||
try:
|
||||
ext_key_usage_ext = cert_crypto.extensions.get_extension_for_class(x509.ExtendedKeyUsage)
|
||||
details.append(f"Extended Key Usage: {format_extended_key_usage(ext_key_usage_ext.value)}")
|
||||
except Exception as e:
|
||||
details.append(f"Keine Extended Key Usage gefunden: {e}")
|
||||
|
||||
try:
|
||||
crl_dp_ext = cert_crypto.extensions.get_extension_for_class(x509.CRLDistributionPoints)
|
||||
details.append(f"CRL Distribution Points: {format_crl_distribution_points(crl_dp_ext.value)}")
|
||||
except Exception as e:
|
||||
details.append(f"Keine CRL Distribution Points gefunden: {e}")
|
||||
|
||||
try:
|
||||
aia_ext = cert_crypto.extensions.get_extension_for_class(x509.AuthorityInformationAccess)
|
||||
details.append(f"Authority Information Access: {format_authority_information_access(aia_ext.value)}")
|
||||
except Exception as e:
|
||||
details.append(f"Keine Authority Information Access gefunden: {e}")
|
||||
|
||||
try:
|
||||
cp_ext = cert_crypto.extensions.get_extension_for_class(x509.CertificatePolicies)
|
||||
details.append(f"Certificate Policies: {format_certificate_policies(cp_ext.value)}")
|
||||
except Exception as e:
|
||||
details.append(f"Keine Certificate Policies gefunden: {e}")
|
||||
|
||||
try:
|
||||
ski_ext = cert_crypto.extensions.get_extension_for_class(x509.SubjectKeyIdentifier)
|
||||
details.append(f"Subject Key Identifier: {ski_ext.value.digest.hex()}")
|
||||
except Exception as e:
|
||||
details.append(f"Kein Subject Key Identifier gefunden: {e}")
|
||||
|
||||
try:
|
||||
aki_ext = cert_crypto.extensions.get_extension_for_class(x509.AuthorityKeyIdentifier)
|
||||
key_id = aki_ext.value.key_identifier
|
||||
if key_id:
|
||||
details.append(f"Authority Key Identifier: {key_id.hex()}")
|
||||
else:
|
||||
details.append("Authority Key Identifier: N/A")
|
||||
except Exception as e:
|
||||
details.append(f"Keine Authority Key Identifier gefunden: {e}")
|
||||
|
||||
try:
|
||||
fingerprint = cert.digest("sha256")
|
||||
details.append(f"SHA256-Fingerprint: {fingerprint.decode()}")
|
||||
except Exception as e:
|
||||
details.append(f"Fehler beim Auslesen des Fingerprints: {e}")
|
||||
|
||||
try:
|
||||
sct_ext = cert_crypto.extensions.get_extension_for_oid(x509.ObjectIdentifier("1.3.6.1.4.1.11129.2.4.2"))
|
||||
details.append(f"SCT (Certificate Transparency): {format_sct(sct_ext.value)}")
|
||||
except Exception as e:
|
||||
details.append(f"Keine SCT-Erweiterung gefunden: {e}")
|
||||
except Exception as e:
|
||||
details.append(f"Fehler beim Auslesen weiterer Zertifikatsinformationen: {e}")
|
||||
details.append(" ")
|
||||
details.append("=================================================================== ENDE ===================================================================")
|
||||
details.append(" ")
|
||||
|
||||
return "\n".join(details)
|
||||
|
||||
# Hilfsfunktion zum rekursiven Herunterladen der Aussteller-Kette (Intermediate + Root CA)
|
||||
def download_issuer_chain(cert_crypto):
|
||||
chain = []
|
||||
current_cert = cert_crypto
|
||||
while True:
|
||||
try:
|
||||
aia_ext = current_cert.extensions.get_extension_for_class(x509.AuthorityInformationAccess)
|
||||
ca_issuers = [ad.access_location.value for ad in aia_ext.value if ad.access_method.dotted_string == "1.3.6.1.5.5.7.48.2"]
|
||||
if not ca_issuers:
|
||||
break
|
||||
issuer_url = ca_issuers[0]
|
||||
issuer_cert_data = requests.get(issuer_url, timeout=5).content
|
||||
issuer_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1, issuer_cert_data)
|
||||
chain.append(issuer_cert)
|
||||
issuer_cert_bytes = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_ASN1, issuer_cert)
|
||||
issuer_cert_crypto = x509.load_der_x509_certificate(issuer_cert_bytes, default_backend())
|
||||
# Wenn self-signed, haben wir den Root erreicht
|
||||
if issuer_cert_crypto.subject == issuer_cert_crypto.issuer:
|
||||
break
|
||||
current_cert = issuer_cert_crypto
|
||||
except Exception:
|
||||
break
|
||||
return chain
|
||||
|
||||
class HTTP3Client(QuicConnectionProtocol):
|
||||
def quic_event_received(self, event):
|
||||
if isinstance(event, HandshakeCompleted):
|
||||
output_lines = []
|
||||
output_lines.append("TLS-Handshake abgeschlossen!")
|
||||
output_lines.append("Zertifikatsinformationen:")
|
||||
|
||||
# Versuche, Zertifikate aus dem Event zu holen
|
||||
certs = getattr(event, "peer_certificates", None)
|
||||
if certs is None:
|
||||
if hasattr(self._quic.tls, "peer_certificate"):
|
||||
cert = self._quic.tls.peer_certificate
|
||||
else:
|
||||
cert = getattr(self._quic.tls, "_peer_certificate", None)
|
||||
if cert:
|
||||
certs = [cert]
|
||||
|
||||
provided_chain = []
|
||||
if certs:
|
||||
for idx, cert_obj in enumerate(certs, start=1):
|
||||
try:
|
||||
if not isinstance(cert_obj, bytes):
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
cert_bytes = cert_obj.public_bytes(encoding=serialization.Encoding.DER)
|
||||
else:
|
||||
cert_bytes = cert_obj
|
||||
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1, cert_bytes)
|
||||
provided_chain.append(cert)
|
||||
output_lines.append(format_certificate_details(cert, label=f"Zertifikat {idx}"))
|
||||
except Exception as e:
|
||||
output_lines.append(f"Zertifikat {idx}: Fehler beim Laden des Zertifikats: {e}")
|
||||
|
||||
# Rekursives Herunterladen der Aussteller-Kette, falls das letzte übermittelte Zertifikat nicht self-signed ist
|
||||
if provided_chain:
|
||||
last_cert_bytes = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_ASN1, provided_chain[-1])
|
||||
last_cert_crypto = x509.load_der_x509_certificate(last_cert_bytes, default_backend())
|
||||
if last_cert_crypto.subject != last_cert_crypto.issuer:
|
||||
chain_downloaded = download_issuer_chain(last_cert_crypto)
|
||||
for i, issuer_cert in enumerate(chain_downloaded):
|
||||
if i == len(chain_downloaded) - 1:
|
||||
label = "Root CA Zertifikat"
|
||||
else:
|
||||
label = "Intermediate Zertifikat"
|
||||
output_lines.append(format_certificate_details(issuer_cert, label=label))
|
||||
|
||||
final_output = "\n\n".join(output_lines)
|
||||
logging.info(final_output)
|
||||
|
||||
async def run(host, port):
|
||||
configuration = QuicConfiguration(is_client=True, alpn_protocols=["h3"])
|
||||
try:
|
||||
async with connect(host, port, configuration=configuration, create_protocol=HTTP3Client) as client:
|
||||
await asyncio.sleep(2)
|
||||
except Exception as e:
|
||||
logging.error("Fehler beim Verbindungsaufbau: %s", e)
|
||||
|
||||
if __name__ == '__main__':
|
||||
parser = argparse.ArgumentParser(description="HTTP/3 Zertifikats-Info Tool")
|
||||
parser.add_argument("host", help="Hostname oder IP, zu dem verbunden werden soll")
|
||||
parser.add_argument("--port", type=int, default=443, help="Port, der verwendet werden soll (default: 443)")
|
||||
parser.add_argument("-o", "--output", help="Dateipfad, in den der Output geschrieben wird", default=None)
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.output:
|
||||
logging.basicConfig(filename=args.output, filemode="w", level=logging.INFO, format="%(message)s")
|
||||
else:
|
||||
logging.basicConfig(level=logging.INFO, format="%(message)s")
|
||||
|
||||
asyncio.run(run(args.host, args.port))
|
||||
Reference in New Issue
Block a user