diff --git a/src/auto_archiver/modules/timestamping_enricher/timestamping_enricher.py b/src/auto_archiver/modules/timestamping_enricher/timestamping_enricher.py index a7a0aee..4d5dd20 100644 --- a/src/auto_archiver/modules/timestamping_enricher/timestamping_enricher.py +++ b/src/auto_archiver/modules/timestamping_enricher/timestamping_enricher.py @@ -6,11 +6,11 @@ from importlib.metadata import version from asn1crypto.cms import ContentInfo from certvalidator import CertificateValidator, ValidationContext from asn1crypto import pem +from asn1crypto.core import Asn1Value import certifi from auto_archiver.core import Enricher from auto_archiver.core import Metadata, ArchivingContext, Media -from auto_archiver.core import Extractor class TimestampingEnricher(Enricher): @@ -45,13 +45,10 @@ class TimestampingEnricher(Enricher): from slugify import slugify for tsa_url in self.tsa_urls: try: - signing_settings = SigningSettings(tsp_server=tsa_url, digest_algorithm=DigestAlgorithm.SHA256) - signer = TSPSigner() message = bytes(data_to_sign, encoding='utf8') - # send TSQ and get TSR from the TSA server - signed = signer.sign(message=message, signing_settings=signing_settings) + signed = self.sign_data(tsa_url, message) # fail if there's any issue with the certificates, uses certifi list of trusted CAs - TSPVerifier(certifi.where()).verify(signed, message=message) + self.verify_signed(signed, message) # download and verify timestamping certificate cert_chain = self.download_and_verify_certificate(signed) # continue with saving the timestamp token @@ -72,9 +69,22 @@ class TimestampingEnricher(Enricher): else: logger.warning(f"No successful timestamps for {url=}") + def verify_signed(self, signed: bytes, message: bytes) -> None: + verifier = TSPVerifier(certifi.where()) + verifier.verify(signed, message=message) + + def sign_data(self, tsa_url: str, bytes_data: bytes) -> bytes: + signing_settings = SigningSettings(tsp_server=tsa_url, digest_algorithm=DigestAlgorithm.SHA256) + signer = TSPSigner() + # send TSQ and get TSR from the TSA server + return signer.sign(message=bytes_data, signing_settings=signing_settings) + + def load_tst_certs(self, signed: bytes) -> list[Asn1Value]: + return ContentInfo.load(signed)["content"]["certificates"] + def download_and_verify_certificate(self, signed: bytes) -> list[Media]: # returns the leaf certificate URL, fails if not set - tst = ContentInfo.load(signed) + certificates = self.load_tst_certs(signed) trust_roots = [] with open(certifi.where(), 'rb') as f: @@ -82,7 +92,6 @@ class TimestampingEnricher(Enricher): trust_roots.append(der_bytes) context = ValidationContext(trust_roots=trust_roots) - certificates = tst["content"]["certificates"] first_cert = certificates[0].dump() intermediate_certs = [] for i in range(1, len(certificates)): # cannot use list comprehension [1:] diff --git a/tests/data/timestamp_token_digicert_com.crt b/tests/data/timestamp_token_digicert_com.crt new file mode 100644 index 0000000..592edf4 Binary files /dev/null and b/tests/data/timestamp_token_digicert_com.crt differ diff --git a/tests/enrichers/test_timestamping_enricher.py b/tests/enrichers/test_timestamping_enricher.py new file mode 100644 index 0000000..3c978f0 --- /dev/null +++ b/tests/enrichers/test_timestamping_enricher.py @@ -0,0 +1,41 @@ +import pytest +from auto_archiver.modules.timestamping_enricher.timestamping_enricher import TimestampingEnricher + + +@pytest.fixture +def digicert(): + with open("tests/data/timestamp_token_digicert_com.crt", "rb") as f: + return f.read() + +@pytest.mark.download +def test_sign_data(setup_module): + tsa_url = "http://timestamp.digicert.com" + tsp: TimestampingEnricher = setup_module("timestamping_enricher") + data = b"4b7b4e39f12b8c725e6e603e6d4422500316df94211070682ef10260ff5759ef" + result: bytes = tsp.sign_data(tsa_url, data) + assert isinstance(result, bytes) + + try: + tsp.verify_signed(result, data) + except Exception as e: + pytest.fail(f"Verification failed: {e}") + +def test_tsp_enricher_download_syndication(setup_module, digicert): + tsp: TimestampingEnricher = setup_module("timestamping_enricher") + try: + cert_chain = tsp.download_and_verify_certificate(digicert) + assert len(cert_chain) == 3 + assert cert_chain[0].filename == "/var/folders/h7/g67pz_kx67q7qxzzrrhvry5r0000gn/T/74515005589773707779.crt" + assert cert_chain[1].filename == "/var/folders/h7/g67pz_kx67q7qxzzrrhvry5r0000gn/T/95861100433808324400.crt" + assert cert_chain[2].filename == "/var/folders/h7/g67pz_kx67q7qxzzrrhvry5r0000gn/T/15527051335772373346.crt" + except Exception as e: + pytest.fail(f"Verification failed: {e}") + + +def test_tst_cert_valid(setup_module, digicert): + tsp: TimestampingEnricher = setup_module("timestamping_enricher") + + try: + tsp.verify_signed(digicert, b"4b7b4e39f12b8c725e6e603e6d4422500316df94211070682ef10260ff5759ef") + except Exception as e: + pytest.fail(f"Verification failed: {e}") \ No newline at end of file