diff --git a/src/auto_archiver/modules/timestamping_enricher/__manifest__.py b/src/auto_archiver/modules/timestamping_enricher/__manifest__.py index 6ad9c57..e945350 100644 --- a/src/auto_archiver/modules/timestamping_enricher/__manifest__.py +++ b/src/auto_archiver/modules/timestamping_enricher/__manifest__.py @@ -36,6 +36,11 @@ "http://tss.accv.es:8318/tsa", ], "help": "List of RFC3161 Time Stamp Authorities to use, separate with commas if passed via the command line.", + }, + "cert_authorities": { + "default": None, + "help": "Path to a file containing trusted Certificate Authorities (CAs) in PEM format. If empty, the default system authorities are used.", + "type": "str", } }, "description": """ diff --git a/src/auto_archiver/modules/timestamping_enricher/timestamping_enricher.py b/src/auto_archiver/modules/timestamping_enricher/timestamping_enricher.py index 90860dc..0031210 100644 --- a/src/auto_archiver/modules/timestamping_enricher/timestamping_enricher.py +++ b/src/auto_archiver/modules/timestamping_enricher/timestamping_enricher.py @@ -12,6 +12,8 @@ from rfc3161_client import ( ) from rfc3161_client import VerificationError as Rfc3161VerificationError from rfc3161_client.base import HashAlgorithm +from rfc3161_client.tsp import SignedData +from cryptography import x509 import certifi from auto_archiver.core import Enricher from auto_archiver.core import Metadata, Media @@ -69,7 +71,7 @@ class TimestampingEnricher(Enricher): # fail if there's any issue with the certificates, uses certifi list of trusted CAs self.verify_signed(signed, message) # download and verify timestamping certificate - cert_chain = self.download_and_verify_certificate(signed) + cert_chain = self.download_certificate(signed) # continue with saving the timestamp token tst_fn = os.path.join(self.tmp_dir, f"timestamp_token_{slugify(tsa_url)}") with open(tst_fn, "wb") as f: @@ -93,46 +95,31 @@ class TimestampingEnricher(Enricher): """ Verify a Signed Timestamp using the TSA provided by the Trusted Root. """ - cert_authorities = self._trusted_root.get_timestamp_authorities() - valid = False - for certificate_authority in cert_authorities: - certificates = certificate_authority.certificates(allow_expired=True) + trusted_root_path = self.cert_authorities or certifi.where() + cert_authorities = [] + + with open(trusted_root_path, 'rb') as f: + cert_authorities = x509.load_pem_x509_certificates(f.read()) + + if not cert_authorities: + raise ValueError(f"No trusted roots found in {trusted_root_path}.") + + + valid = False + for certificate in cert_authorities: builder = VerifierBuilder() - for certificate in certificates: - builder.add_root_certificate(certificate) + builder.add_root_certificate(certificate) verifier = builder.build() try: verifier.verify(timestamp_response, signature) + return certificate except Rfc3161VerificationError as e: - logger.debug("Unable to verify Timestamp with CA.") - logger.exception(e) + logger.debug(f"Unable to verify Timestamp with CA {certificate.subject}: {e}") continue - - if ( - certificate_authority.validity_period_start - and certificate_authority.validity_period_end - ): - if ( - certificate_authority.validity_period_start - <= timestamp_response.tst_info.gen_time - < certificate_authority.validity_period_end - ): - return TimestampVerificationResult( - source=TimestampSource.TIMESTAMP_AUTHORITY, - time=timestamp_response.tst_info.gen_time, - ) - - logger.debug( - "Unable to verify Timestamp because not in CA time range." - ) - else: - logger.debug( - "Unable to verify Timestamp because no validity provided." - ) - - return None + + return False def sign_data(self, tsa_url: str, bytes_data: bytes) -> TimeStampResponse: # see https://github.com/sigstore/sigstore-python/blob/99948d5b80525a5a104e904ffea58169dc6e0629/sigstore/_internal/timestamp.py#L84-L121 @@ -155,26 +142,16 @@ class TimestampingEnricher(Enricher): raise return timestamp_response - def load_tst_certs(self, signed: bytes): - return ContentInfo.load(signed)["content"]["certificates"] + def load_tst_certs(self, tsp_response: TimeStampResponse): + signed_data: SignedData = tsp_response.signed_data + certs = signed_data.certificates + - def download_and_verify_certificate(self, signed: bytes) -> list[Media]: + def download_certificate(self, tsp_response: TimeStampResponse) -> list[Media]: # returns the leaf certificate URL, fails if not set - certificates = self.load_tst_certs(signed) - trust_roots = [] - with open(certifi.where(), 'rb') as f: - for _, _, der_bytes in pem.unarmor(f.read(), multiple=True): - trust_roots.append(der_bytes) - context = ValidationContext(trust_roots=trust_roots) + certificates = self.load_tst_certs(tsp_response) - first_cert = certificates[0].dump() - intermediate_certs = [] - for i in range(1, len(certificates)): # cannot use list comprehension [1:] - intermediate_certs.append(certificates[i].dump()) - - validator = CertificateValidator(first_cert, intermediate_certs=intermediate_certs, validation_context=context) - path = validator.validate_usage({'digital_signature'}, extended_key_usage={'time_stamping'}) cert_chain = [] for cert in path: diff --git a/tests/enrichers/test_timestamping_enricher.py b/tests/enrichers/test_timestamping_enricher.py index a90d4bd..7d30e2f 100644 --- a/tests/enrichers/test_timestamping_enricher.py +++ b/tests/enrichers/test_timestamping_enricher.py @@ -15,15 +15,21 @@ def digicert(): def test_sign_data(setup_module): tsa_url = "http://timestamp.identrust.com" tsp: TimestampingEnricher = setup_module("timestamping_enricher") + data = b"4b7b4e39f12b8c725e6e603e6d4422500316df94211070682ef10260ff5759ef" result: TimeStampResponse = tsp.sign_data(tsa_url, data) assert isinstance(result, TimeStampResponse) + try: - tsp.verify_signed(result, data) + valid_root = tsp.verify_signed(result, data) + assert valid_root.subject == "CN=Entrust Root Certification Authority - G2, OU=(c) 2009 Entrust, Inc. - for authorized use only, OU=See www.entrust.net/legal-terms, O=Entrust, Inc., C=" except Exception as e: pytest.fail(f"Verification failed: {e}") + # test downloading the cert + cert_chain = tsp.download_and_verify_certificate(result) + def test_tsp_enricher_download_syndication(setup_module, digicert): tsp: TimestampingEnricher = setup_module("timestamping_enricher")