embed retry into timestamping

This commit is contained in:
msramalho
2025-07-10 14:49:53 +01:00
parent d3efd7121c
commit 2081c16555

View File

@@ -4,12 +4,12 @@ from importlib.metadata import version
import hashlib
from slugify import slugify
from retrying import retry
import requests
from auto_archiver.utils.custom_logger import logger
from rfc3161_client import (decode_timestamp_response,TimestampRequestBuilder,TimeStampResponse, VerifierBuilder)
from rfc3161_client import (decode_timestamp_response, TimestampRequestBuilder, TimeStampResponse, VerifierBuilder)
from rfc3161_client import VerificationError as Rfc3161VerificationError
from rfc3161_client.base import HashAlgorithm
from rfc3161_client.tsp import SignedData
from cryptography import x509
from cryptography.hazmat.primitives import serialization
@@ -60,7 +60,6 @@ class TimestampingEnricher(Enricher):
logger.debug(f"No hashes found")
return
hashes_fn = os.path.join(self.tmp_dir, "hashes.txt")
data_to_sign = "\n".join(hashes)
@@ -75,7 +74,7 @@ class TimestampingEnricher(Enricher):
logger.debug(f"Timestamping with {tsa_url=}")
signed: TimeStampResponse = self.sign_data(tsa_url, message)
# fail if there's any issue with the certificates, uses certifi list of trusted CAs or the user-defined `cert_authorities`
root_cert = self.verify_signed(signed, message)
@@ -113,7 +112,7 @@ class TimestampingEnricher(Enricher):
f.write(timestamp_token)
return tst_path
def verify_signed(self, timestamp_response: TimeStampResponse, message: bytes) -> x509.Certificate:
def verify_signed(self, timestamp_response: TimeStampResponse, message: bytes) -> x509.Certificate:
"""
Verify a Signed Timestamp Response is trusted by a known Certificate Authority.
@@ -136,7 +135,7 @@ class TimestampingEnricher(Enricher):
if not cert_authorities:
raise ValueError(f"No trusted roots found in {trusted_root_path}.")
timestamp_certs = self.tst_certs(timestamp_response)
intermediate_certs = timestamp_certs[1:-1]
@@ -148,7 +147,7 @@ class TimestampingEnricher(Enricher):
message_hash = hashlib.sha256(message).digest()
else:
raise ValueError(f"Unsupported hash algorithm: {hash_algorithm}")
for certificate in cert_authorities:
builder = VerifierBuilder()
builder.add_root_certificate(certificate)
@@ -158,7 +157,6 @@ class TimestampingEnricher(Enricher):
verifier = builder.build()
try:
verifier.verify(timestamp_response, message_hash)
return certificate
@@ -171,23 +169,38 @@ class TimestampingEnricher(Enricher):
# see https://github.com/sigstore/sigstore-python/blob/99948d5b80525a5a104e904ffea58169dc6e0629/sigstore/_internal/timestamp.py#L84-L121
timestamp_request = (
TimestampRequestBuilder().data(bytes_data).nonce(nonce=True).build()
)
try:
TimestampRequestBuilder().data(bytes_data).nonce(nonce=True).build()
)
@retry(
wait_exponential_multiplier=1,
stop_max_attempt_number=2,
)
def sign_with_retry():
response = self.session.post(tsa_url, data=timestamp_request.as_bytes(), timeout=10)
response.raise_for_status()
return response
try:
response = sign_with_retry()
except requests.RequestException as e:
logger.error(f"Error while sending request to {tsa_url=}: {e}")
raise
@retry(
wait_exponential_multiplier=1,
stop_max_attempt_number=2,
)
def decode_with_retry(response):
return decode_timestamp_response(response.content)
# Check that we can parse the response but do not *verify* it
try:
timestamp_response = decode_timestamp_response(response.content)
timestamp_response = decode_with_retry(response)
except ValueError as e:
logger.error(f"Invalid timestamp response from server {tsa_url}: {e}")
raise
return timestamp_response
def tst_certs(self, tsp_response: TimeStampResponse):
signed_data: SignedData = tsp_response.signed_data
certs = [x509.load_der_x509_certificate(c) for c in signed_data.certificates]
@@ -196,7 +209,7 @@ class TimestampingEnricher(Enricher):
if len(certs) == 1:
return certs
while(len(ordered_certs) < len(certs)):
while (len(ordered_certs) < len(certs)):
if len(ordered_certs) == 0:
for cert in certs:
if not [c for c in certs if cert.subject == c.issuer]:
@@ -220,7 +233,7 @@ class TimestampingEnricher(Enricher):
cert_chain = []
for i, cert in enumerate(certificates):
cert_fn = os.path.join(self.tmp_dir, f"{i+1} {str(cert.serial_number)[:20]}.crt")
cert_fn = os.path.join(self.tmp_dir, f"{i + 1} {str(cert.serial_number)[:20]}.crt")
with open(cert_fn, "wb") as f:
f.write(cert.public_bytes(encoding=serialization.Encoding.PEM))
cert_chain.append(Media(filename=cert_fn).set("subject", cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)[0].value))