Improved unit tests for timestamping

This commit is contained in:
Patrick Robertson
2025-03-11 11:08:52 +00:00
parent 3f6acc0917
commit 1db8be91db
8 changed files with 184 additions and 35 deletions

View File

@@ -81,7 +81,7 @@ class TimestampingEnricher(Enricher):
raise ValueError(f"No valid root certificate found for {tsa_url=}. Are you sure it's a trusted TSA? Or define an alternative trusted root with `cert_authorities`.")
# save the timestamping certificate
cert_chain = self.save_certificate(signed)
cert_chain = self.save_certificate(signed, root_cert)
# continue with saving the timestamp token
tst_fn = os.path.join(self.tmp_dir, f"timestamp_token_{slugify(tsa_url)}")
@@ -102,9 +102,19 @@ class TimestampingEnricher(Enricher):
else:
logger.warning(f"No successful timestamps for {url=}")
def verify_signed(self, timestamp_response: TimeStampResponse, message: bytes) -> None:
def verify_signed(self, timestamp_response: TimeStampResponse, message: bytes) -> x509.Certificate:
"""
Verify a Signed Timestamp using the TSA provided by the Trusted Root.
Verify a Signed Timestamp Response is trusted by a known Certificate Authority.
Args:
timestamp_response (TimeStampResponse): The signed timestamp response.
message (bytes): The message that was timestamped.
Returns:
x509.Certificate: A valid root certificate that was used to sign the timestamp response, or None
Raises:
ValueError: If no valid root certificate was found in the trusted root store.
"""
trusted_root_path = self.cert_authorities or certifi.where()
@@ -148,7 +158,7 @@ class TimestampingEnricher(Enricher):
logger.debug(f"Unable to verify Timestamp with CA {certificate.subject}: {e}")
continue
return None
raise ValueError(f"No valid root certificate found in {trusted_root_path}.")
def sign_data(self, tsa_url: str, bytes_data: bytes) -> TimeStampResponse:
# see https://github.com/sigstore/sigstore-python/blob/99948d5b80525a5a104e904ffea58169dc6e0629/sigstore/_internal/timestamp.py#L84-L121
@@ -173,19 +183,31 @@ class TimestampingEnricher(Enricher):
def tst_certs(self, tsp_response: TimeStampResponse):
signed_data: SignedData = tsp_response.signed_data
return [x509.load_der_x509_certificate(c) for c in signed_data.certificates]
certs = [x509.load_der_x509_certificate(c) for c in signed_data.certificates]
# reorder the certs to be in the correct order
ordered_certs = []
while(len(ordered_certs) < len(certs)):
if len(ordered_certs) == 0:
for cert in certs:
if not [c for c in certs if c.subject == cert.issuer]:
ordered_certs.append(cert)
break
else:
for cert in certs:
if cert.issuer == ordered_certs[-1].subject:
ordered_certs.append(cert)
break
return ordered_certs
def save_certificate(self, tsp_response: TimeStampResponse) -> list[Media]:
def save_certificate(self, tsp_response: TimeStampResponse, verified_root_cert: x509.Certificate) -> list[Media]:
# returns the leaf certificate URL, fails if not set
certificates = self.tst_certs(tsp_response)
certificates = self.tst_certs(tsp_response) + [verified_root_cert]
cert_chain = []
for cert in certificates:
cert_fn = os.path.join(self.tmp_dir, f"{str(cert.serial_number)[:20]}.crt")
print(cert_fn)
for i, cert in enumerate(certificates):
cert_fn = os.path.join(self.tmp_dir, f"{i+1} {str(cert.serial_number)[:20]}.crt")
with open(cert_fn, "wb") as f:
f.write(cert.public_bytes(encoding=serialization.Encoding.PEM))
cert_chain.append(Media(filename=cert_fn).set("subject", cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)[0].value))