WIP - timestamping enricher

This commit is contained in:
Patrick Robertson
2025-02-11 18:18:19 +00:00
parent 3163cb793a
commit d0c379a3ba
2 changed files with 107 additions and 26 deletions

View File

@@ -1,16 +1,21 @@
import os
from loguru import logger
from tsp_client import TSPSigner, SigningSettings, TSPVerifier
from tsp_client.algorithms import DigestAlgorithm
from importlib.metadata import version
from asn1crypto.cms import ContentInfo
from certvalidator import CertificateValidator, ValidationContext
from asn1crypto import pem
from asn1crypto.core import Asn1Value
import certifi
from importlib.metadata import version
import requests
from rfc3161_client import (
TimestampRequestBuilder,
TimeStampResponse,
decode_timestamp_response,
VerifierBuilder
)
from rfc3161_client import VerificationError as Rfc3161VerificationError
from rfc3161_client.base import HashAlgorithm
import certifi
from auto_archiver.core import Enricher
from auto_archiver.core import Metadata, Media
from auto_archiver.version import __version__
class TimestampingEnricher(Enricher):
"""
@@ -21,6 +26,22 @@ class TimestampingEnricher(Enricher):
See https://gist.github.com/Manouchehri/fd754e402d98430243455713efada710 for list of timestamp authorities.
"""
def setup(self):
self.session = requests.Session()
self.session.headers.update(
{
"Content-Type": "application/timestamp-query",
"User-Agent": f"Auto-Archiver {__version__}",
"Accept": "application/timestamp-reply",
}
)
def __del__(self) -> None:
"""
Terminates the underlying network session.
"""
self.session.close()
def enrich(self, to_enrich: Metadata) -> None:
url = to_enrich.get_url()
logger.debug(f"RFC3161 timestamping existing files for {url=}")
@@ -32,8 +53,7 @@ class TimestampingEnricher(Enricher):
logger.warning(f"No hashes found in {url=}")
return
tmp_dir = self.tmp_dir
hashes_fn = os.path.join(tmp_dir, "hashes.txt")
hashes_fn = os.path.join(self.tmp_dir, "hashes.txt")
data_to_sign = "\n".join(hashes)
with open(hashes_fn, "w") as f:
@@ -45,14 +65,15 @@ class TimestampingEnricher(Enricher):
for tsa_url in self.tsa_urls:
try:
message = bytes(data_to_sign, encoding='utf8')
signed = self.sign_data(tsa_url, message)
signed: TimeStampResponse = self.sign_data(tsa_url, message)
# fail if there's any issue with the certificates, uses certifi list of trusted CAs
self.verify_signed(signed, message)
# download and verify timestamping certificate
cert_chain = self.download_and_verify_certificate(signed)
# continue with saving the timestamp token
tst_fn = os.path.join(tmp_dir, f"timestamp_token_{slugify(tsa_url)}")
with open(tst_fn, "wb") as f: f.write(signed)
tst_fn = os.path.join(self.tmp_dir, f"timestamp_token_{slugify(tsa_url)}")
with open(tst_fn, "wb") as f:
f.write(signed)
timestamp_tokens.append(Media(filename=tst_fn).set("tsa", tsa_url).set("cert_chain", cert_chain))
except Exception as e:
logger.warning(f"Error while timestamping {url=} with {tsa_url=}: {e}")
@@ -68,17 +89,73 @@ class TimestampingEnricher(Enricher):
else:
logger.warning(f"No successful timestamps for {url=}")
def verify_signed(self, signed: bytes, message: bytes) -> None:
verifier = TSPVerifier(certifi.where())
verifier.verify(signed, message=message)
def verify_signed(self, timestamp_response: TimeStampResponse, signature: bytes) -> None:
"""
Verify a Signed Timestamp using the TSA provided by the Trusted Root.
"""
cert_authorities = self._trusted_root.get_timestamp_authorities()
valid = False
for certificate_authority in cert_authorities:
certificates = certificate_authority.certificates(allow_expired=True)
def sign_data(self, tsa_url: str, bytes_data: bytes) -> bytes:
signing_settings = SigningSettings(tsp_server=tsa_url, digest_algorithm=DigestAlgorithm.SHA256)
signer = TSPSigner()
# send TSQ and get TSR from the TSA server
return signer.sign(message=bytes_data, signing_settings=signing_settings)
builder = VerifierBuilder()
for certificate in certificates:
builder.add_root_certificate(certificate)
verifier = builder.build()
try:
verifier.verify(timestamp_response, signature)
except Rfc3161VerificationError as e:
logger.debug("Unable to verify Timestamp with CA.")
logger.exception(e)
continue
if (
certificate_authority.validity_period_start
and certificate_authority.validity_period_end
):
if (
certificate_authority.validity_period_start
<= timestamp_response.tst_info.gen_time
< certificate_authority.validity_period_end
):
return TimestampVerificationResult(
source=TimestampSource.TIMESTAMP_AUTHORITY,
time=timestamp_response.tst_info.gen_time,
)
logger.debug(
"Unable to verify Timestamp because not in CA time range."
)
else:
logger.debug(
"Unable to verify Timestamp because no validity provided."
)
return None
def sign_data(self, tsa_url: str, bytes_data: bytes) -> TimeStampResponse:
# see https://github.com/sigstore/sigstore-python/blob/99948d5b80525a5a104e904ffea58169dc6e0629/sigstore/_internal/timestamp.py#L84-L121
timestamp_request = (
TimestampRequestBuilder().data(bytes_data).nonce(nonce=True).build()
)
try:
response = self.session.post(tsa_url, data=timestamp_request.as_bytes(), timeout=10)
response.raise_for_status()
except requests.RequestException as e:
logger.error(f"Error while sending request to {tsa_url=}: {e}")
raise
# Check that we can parse the response but do not *verify* it
try:
timestamp_response = decode_timestamp_response(response.content)
except ValueError as e:
logger.error(f"Invalid timestamp response from server {tsa_url}: {e}")
raise
return timestamp_response
def load_tst_certs(self, signed: bytes) -> list[Asn1Value]:
def load_tst_certs(self, signed: bytes):
return ContentInfo.load(signed)["content"]["certificates"]
def download_and_verify_certificate(self, signed: bytes) -> list[Media]:

View File

@@ -1,6 +1,10 @@
import pytest
from auto_archiver.modules.timestamping_enricher.timestamping_enricher import TimestampingEnricher
from rfc3161_client import (
TimestampRequestBuilder,
TimeStampResponse,
decode_timestamp_response,
)
@pytest.fixture
def digicert():
@@ -9,11 +13,11 @@ def digicert():
@pytest.mark.download
def test_sign_data(setup_module):
tsa_url = "http://timestamp.digicert.com"
tsa_url = "http://timestamp.identrust.com"
tsp: TimestampingEnricher = setup_module("timestamping_enricher")
data = b"4b7b4e39f12b8c725e6e603e6d4422500316df94211070682ef10260ff5759ef"
result: bytes = tsp.sign_data(tsa_url, data)
assert isinstance(result, bytes)
result: TimeStampResponse = tsp.sign_data(tsa_url, data)
assert isinstance(result, TimeStampResponse)
try:
tsp.verify_signed(result, data)