diff --git a/src/auto_archiver/modules/timestamping_enricher/timestamping_enricher.py b/src/auto_archiver/modules/timestamping_enricher/timestamping_enricher.py index c138cee..a779426 100644 --- a/src/auto_archiver/modules/timestamping_enricher/timestamping_enricher.py +++ b/src/auto_archiver/modules/timestamping_enricher/timestamping_enricher.py @@ -69,10 +69,16 @@ class TimestampingEnricher(Enricher): try: message = bytes(data_to_sign, encoding='utf8') signed: TimeStampResponse = self.sign_data(tsa_url, message) - # fail if there's any issue with the certificates, uses certifi list of trusted CAs - self.verify_signed(signed, message) - # download and verify timestamping certificate - cert_chain = self.download_certificate(signed) + + # fail if there's any issue with the certificates, uses certifi list of trusted CAs or the user-defined `cert_authorities` + root_cert = self.verify_signed(signed, message) + + if not root_cert: + raise ValueError(f"No valid root certificate found for {tsa_url=}. Are you sure it's a trusted TSA? Or define an alternative trusted root with `cert_authorities`.") + + # save the timestamping certificate + cert_chain = self.save_certificate(signed) + # continue with saving the timestamp token tst_fn = os.path.join(self.tmp_dir, f"timestamp_token_{slugify(tsa_url)}") with open(tst_fn, "wb") as f: @@ -106,11 +112,18 @@ class TimestampingEnricher(Enricher): if not cert_authorities: raise ValueError(f"No trusted roots found in {trusted_root_path}.") + timestamp_certs = self.tst_certs(timestamp_response) + intermediate_certs = [] + for i, cert in enumerate(timestamp_certs): # cannot use list comprehension, it's a set + intermediate_certs.append(cert) for certificate in cert_authorities: builder = VerifierBuilder() builder.add_root_certificate(certificate) + for intermediate_cert in intermediate_certs: + builder.add_intermediate_certificate(intermediate_cert) + verifier = builder.build() try: verifier.verify(timestamp_response, signature) @@ -118,8 +131,8 @@ class TimestampingEnricher(Enricher): except Rfc3161VerificationError as e: logger.debug(f"Unable to verify Timestamp with CA {certificate.subject}: {e}") continue - - return False + + return None def sign_data(self, tsa_url: str, bytes_data: bytes) -> TimeStampResponse: # see https://github.com/sigstore/sigstore-python/blob/99948d5b80525a5a104e904ffea58169dc6e0629/sigstore/_internal/timestamp.py#L84-L121 @@ -142,12 +155,12 @@ class TimestampingEnricher(Enricher): raise return timestamp_response - def load_tst_certs(self, tsp_response: TimeStampResponse): + def tst_certs(self, tsp_response: TimeStampResponse): signed_data: SignedData = tsp_response.signed_data return [x509.load_der_x509_certificate(c) for c in signed_data.certificates] - def download_certificate(self, tsp_response: TimeStampResponse) -> list[Media]: + def save_certificate(self, tsp_response: TimeStampResponse) -> list[Media]: # returns the leaf certificate URL, fails if not set certificates = self.load_tst_certs(tsp_response)