diff --git a/src/auto_archiver/modules/timestamping_enricher/timestamping_enricher.py b/src/auto_archiver/modules/timestamping_enricher/timestamping_enricher.py index 1a0f932..90860dc 100644 --- a/src/auto_archiver/modules/timestamping_enricher/timestamping_enricher.py +++ b/src/auto_archiver/modules/timestamping_enricher/timestamping_enricher.py @@ -1,16 +1,21 @@ import os from loguru import logger -from tsp_client import TSPSigner, SigningSettings, TSPVerifier -from tsp_client.algorithms import DigestAlgorithm -from importlib.metadata import version -from asn1crypto.cms import ContentInfo -from certvalidator import CertificateValidator, ValidationContext -from asn1crypto import pem -from asn1crypto.core import Asn1Value -import certifi +from importlib.metadata import version + +import requests +from rfc3161_client import ( + TimestampRequestBuilder, + TimeStampResponse, + decode_timestamp_response, + VerifierBuilder +) +from rfc3161_client import VerificationError as Rfc3161VerificationError +from rfc3161_client.base import HashAlgorithm +import certifi from auto_archiver.core import Enricher from auto_archiver.core import Metadata, Media +from auto_archiver.version import __version__ class TimestampingEnricher(Enricher): """ @@ -21,6 +26,22 @@ class TimestampingEnricher(Enricher): See https://gist.github.com/Manouchehri/fd754e402d98430243455713efada710 for list of timestamp authorities. """ + def setup(self): + self.session = requests.Session() + self.session.headers.update( + { + "Content-Type": "application/timestamp-query", + "User-Agent": f"Auto-Archiver {__version__}", + "Accept": "application/timestamp-reply", + } + ) + + def __del__(self) -> None: + """ + Terminates the underlying network session. + """ + self.session.close() + def enrich(self, to_enrich: Metadata) -> None: url = to_enrich.get_url() logger.debug(f"RFC3161 timestamping existing files for {url=}") @@ -32,8 +53,7 @@ class TimestampingEnricher(Enricher): logger.warning(f"No hashes found in {url=}") return - tmp_dir = self.tmp_dir - hashes_fn = os.path.join(tmp_dir, "hashes.txt") + hashes_fn = os.path.join(self.tmp_dir, "hashes.txt") data_to_sign = "\n".join(hashes) with open(hashes_fn, "w") as f: @@ -45,14 +65,15 @@ class TimestampingEnricher(Enricher): for tsa_url in self.tsa_urls: try: message = bytes(data_to_sign, encoding='utf8') - signed = self.sign_data(tsa_url, message) + signed: TimeStampResponse = self.sign_data(tsa_url, message) # fail if there's any issue with the certificates, uses certifi list of trusted CAs self.verify_signed(signed, message) # download and verify timestamping certificate cert_chain = self.download_and_verify_certificate(signed) # continue with saving the timestamp token - tst_fn = os.path.join(tmp_dir, f"timestamp_token_{slugify(tsa_url)}") - with open(tst_fn, "wb") as f: f.write(signed) + tst_fn = os.path.join(self.tmp_dir, f"timestamp_token_{slugify(tsa_url)}") + with open(tst_fn, "wb") as f: + f.write(signed) timestamp_tokens.append(Media(filename=tst_fn).set("tsa", tsa_url).set("cert_chain", cert_chain)) except Exception as e: logger.warning(f"Error while timestamping {url=} with {tsa_url=}: {e}") @@ -68,17 +89,73 @@ class TimestampingEnricher(Enricher): else: logger.warning(f"No successful timestamps for {url=}") - def verify_signed(self, signed: bytes, message: bytes) -> None: - verifier = TSPVerifier(certifi.where()) - verifier.verify(signed, message=message) + def verify_signed(self, timestamp_response: TimeStampResponse, signature: bytes) -> None: + """ + Verify a Signed Timestamp using the TSA provided by the Trusted Root. + """ + cert_authorities = self._trusted_root.get_timestamp_authorities() + valid = False + for certificate_authority in cert_authorities: + certificates = certificate_authority.certificates(allow_expired=True) - def sign_data(self, tsa_url: str, bytes_data: bytes) -> bytes: - signing_settings = SigningSettings(tsp_server=tsa_url, digest_algorithm=DigestAlgorithm.SHA256) - signer = TSPSigner() - # send TSQ and get TSR from the TSA server - return signer.sign(message=bytes_data, signing_settings=signing_settings) + builder = VerifierBuilder() + for certificate in certificates: + builder.add_root_certificate(certificate) + + verifier = builder.build() + try: + verifier.verify(timestamp_response, signature) + except Rfc3161VerificationError as e: + logger.debug("Unable to verify Timestamp with CA.") + logger.exception(e) + continue + + if ( + certificate_authority.validity_period_start + and certificate_authority.validity_period_end + ): + if ( + certificate_authority.validity_period_start + <= timestamp_response.tst_info.gen_time + < certificate_authority.validity_period_end + ): + return TimestampVerificationResult( + source=TimestampSource.TIMESTAMP_AUTHORITY, + time=timestamp_response.tst_info.gen_time, + ) + + logger.debug( + "Unable to verify Timestamp because not in CA time range." + ) + else: + logger.debug( + "Unable to verify Timestamp because no validity provided." + ) + + return None + + def sign_data(self, tsa_url: str, bytes_data: bytes) -> TimeStampResponse: + # see https://github.com/sigstore/sigstore-python/blob/99948d5b80525a5a104e904ffea58169dc6e0629/sigstore/_internal/timestamp.py#L84-L121 + + timestamp_request = ( + TimestampRequestBuilder().data(bytes_data).nonce(nonce=True).build() + ) + try: + response = self.session.post(tsa_url, data=timestamp_request.as_bytes(), timeout=10) + response.raise_for_status() + except requests.RequestException as e: + logger.error(f"Error while sending request to {tsa_url=}: {e}") + raise + + # Check that we can parse the response but do not *verify* it + try: + timestamp_response = decode_timestamp_response(response.content) + except ValueError as e: + logger.error(f"Invalid timestamp response from server {tsa_url}: {e}") + raise + return timestamp_response - def load_tst_certs(self, signed: bytes) -> list[Asn1Value]: + def load_tst_certs(self, signed: bytes): return ContentInfo.load(signed)["content"]["certificates"] def download_and_verify_certificate(self, signed: bytes) -> list[Media]: diff --git a/tests/enrichers/test_timestamping_enricher.py b/tests/enrichers/test_timestamping_enricher.py index b8351d3..a90d4bd 100644 --- a/tests/enrichers/test_timestamping_enricher.py +++ b/tests/enrichers/test_timestamping_enricher.py @@ -1,6 +1,10 @@ import pytest from auto_archiver.modules.timestamping_enricher.timestamping_enricher import TimestampingEnricher - +from rfc3161_client import ( + TimestampRequestBuilder, + TimeStampResponse, + decode_timestamp_response, +) @pytest.fixture def digicert(): @@ -9,11 +13,11 @@ def digicert(): @pytest.mark.download def test_sign_data(setup_module): - tsa_url = "http://timestamp.digicert.com" + tsa_url = "http://timestamp.identrust.com" tsp: TimestampingEnricher = setup_module("timestamping_enricher") data = b"4b7b4e39f12b8c725e6e603e6d4422500316df94211070682ef10260ff5759ef" - result: bytes = tsp.sign_data(tsa_url, data) - assert isinstance(result, bytes) + result: TimeStampResponse = tsp.sign_data(tsa_url, data) + assert isinstance(result, TimeStampResponse) try: tsp.verify_signed(result, data)