Fix bug ordering tsr that only have one cert + more unit tests

This commit is contained in:
Patrick Robertson
2025-03-11 15:44:04 +00:00
parent 2ffe124d95
commit 294033f156
11 changed files with 158 additions and 173 deletions

View File

@@ -15,25 +15,21 @@
"configs": {
"tsa_urls": {
"default": [
# [Adobe Approved Trust List] and [Windows Cert Store]
"http://timestamp.digicert.com",
# See https://github.com/trailofbits/rfc3161-client/issues/46 for a list of valid TSAs
# Full list of TSAs: https://gist.github.com/Manouchehri/fd754e402d98430243455713efada710
"http://timestamp.identrust.com",
# "https://timestamp.entrust.net/TSS/RFC3161sha2TS", # not valid for timestamping
# "https://timestamp.sectigo.com", # wait 15 seconds between each request.
# [Adobe: European Union Trusted Lists].
# "https://timestamp.sectigo.com/qualified", # wait 15 seconds between each request.
# [Windows Cert Store]
"http://timestamp.globalsign.com/tsa/r6advanced1",
# [Adobe: European Union Trusted Lists] and [Windows Cert Store]
# "http://ts.quovadisglobal.com/eu", # not valid for timestamping
# "http://tsa.belgium.be/connect", # self-signed certificate in certificate chain
# "https://timestamp.aped.gov.gr/qtss", # self-signed certificate in certificate chain
# "http://tsa.sep.bg", # self-signed certificate in certificate chain
# "http://tsa.izenpe.com", #unable to get local issuer certificate
# "http://kstamp.keynectis.com/KSign", # unable to get local issuer certificate
"http://tss.accv.es:8318/tsa",
"http://timestamp.ssl.trustwave.com", #timeouts
"http://zeitstempel.dfn.de",
"http://ts.ssl.com",
"http://tsa.izenpe.com",
"http://tsa.lex-persona.com/tsa",
"http://ca.signfiles.com/TSAServer.aspx",
"http://aloahacoin.chain-provider.com/tsa.aspx",
"http://tsa.sinpe.fi.cr/tsaHttp/",
"http://tsa.cra.ge/signserver/tsa?workerName=qtsa",
"http://tss.cnbs.gob.hn/TSS/HttpTspServer",
"http://dss.nowina.lu/pki-factory/tsa/good-tsa",
"https://freetsa.org/tsr",
],
"help": "List of RFC3161 Time Stamp Authorities to use, separate with commas if passed via the command line.",
},

View File

@@ -1,10 +1,11 @@
import os
from loguru import logger
from importlib.metadata import version
import hashlib
from slugify import slugify
import requests
from loguru import logger
from rfc3161_client import (
TimestampRequestBuilder,
@@ -23,7 +24,6 @@ from auto_archiver.core import Metadata, Media
from auto_archiver.version import __version__
class TimestampingEnricher(Enricher):
"""
Uses several RFC3161 Time Stamp Authorities to generate a timestamp token that will be preserved. This can be used to prove that a certain file existed at a certain time, useful for legal purposes, for example, to prove that a certain file was not tampered with after a certain date.
@@ -33,6 +33,8 @@ class TimestampingEnricher(Enricher):
See https://gist.github.com/Manouchehri/fd754e402d98430243455713efada710 for list of timestamp authorities.
"""
session = None
def setup(self):
self.session = requests.Session()
self.session.headers.update(
@@ -43,11 +45,12 @@ class TimestampingEnricher(Enricher):
}
)
def __del__(self) -> None:
def cleaup(self) -> None:
"""
Terminates the underlying network session.
"""
self.session.close()
if self.session:
self.session.close()
def enrich(self, to_enrich: Metadata) -> None:
url = to_enrich.get_url()
@@ -68,40 +71,47 @@ class TimestampingEnricher(Enricher):
hashes_media = Media(filename=hashes_fn)
timestamp_tokens = []
from slugify import slugify
for tsa_url in self.tsa_urls:
try:
message = bytes(data_to_sign, encoding='utf8')
print(tsa_url)
logger.debug(f"Timestamping {url=} with {tsa_url=}")
signed: TimeStampResponse = self.sign_data(tsa_url, message)
# fail if there's any issue with the certificates, uses certifi list of trusted CAs or the user-defined `cert_authorities`
root_cert = self.verify_signed(signed, message)
if not root_cert:
raise ValueError(f"No valid root certificate found for {tsa_url=}. Are you sure it's a trusted TSA? Or define an alternative trusted root with `cert_authorities`.")
raise ValueError(f"No valid root certificate found for {tsa_url=}. Are you sure it's a trusted TSA? Or define an alternative trusted root with `cert_authorities`. (tried: {self.cert_authorities or certifi.where()})")
# save the timestamping certificate
cert_chain = self.save_certificate(signed, root_cert)
# continue with saving the timestamp token
tst_fn = os.path.join(self.tmp_dir, f"timestamp_token_{slugify(tsa_url)}")
with open(tst_fn, "wb") as f:
f.write(signed)
timestamp_tokens.append(Media(filename=tst_fn).set("tsa", tsa_url).set("cert_chain", cert_chain))
timestamp_token_path = self.save_timestamp_token(signed.time_stamp_token(), tsa_url)
timestamp_tokens.append(Media(filename=timestamp_token_path).set("tsa", tsa_url).set("cert_chain", cert_chain))
except Exception as e:
logger.warning(f"Error while timestamping {url=} with {tsa_url=}: {e}")
if len(timestamp_tokens):
hashes_media.set("timestamp_authority_files", timestamp_tokens)
hashes_media.set("certifi v", version("certifi"))
hashes_media.set("tsp_client v", version("tsp_client"))
hashes_media.set("certvalidator v", version("certvalidator"))
hashes_media.set("rfc3161-client v", version("rfc3161_client"))
hashes_media.set("cryptography v", version("cryptography"))
to_enrich.add_media(hashes_media, id="timestamped_hashes")
to_enrich.set("timestamped", True)
logger.success(f"{len(timestamp_tokens)} timestamp tokens created for {url=}")
else:
logger.warning(f"No successful timestamps for {url=}")
def save_timestamp_token(self, timestamp_token: bytes, tsa_url: str) -> str:
"""
Takes a timestamp token, and saves it to a file with the TSA URL as part of the filename.
"""
tst_path = os.path.join(self.tmp_dir, f"timestamp_token_{slugify(tsa_url)}")
with open(tst_path, "wb") as f:
f.write(timestamp_token)
return tst_path
def verify_signed(self, timestamp_response: TimeStampResponse, message: bytes) -> x509.Certificate:
"""
Verify a Signed Timestamp Response is trusted by a known Certificate Authority.
@@ -127,10 +137,7 @@ class TimestampingEnricher(Enricher):
raise ValueError(f"No trusted roots found in {trusted_root_path}.")
timestamp_certs = self.tst_certs(timestamp_response)
intermediate_certs = []
for i, cert in enumerate(timestamp_certs): # cannot use list comprehension, it's a set
intermediate_certs.append(cert)
intermediate_certs = timestamp_certs[1:-1]
message_hash = None
hash_algorithm = timestamp_response.tst_info.message_imprint.hash_algorithm
@@ -155,10 +162,9 @@ class TimestampingEnricher(Enricher):
verifier.verify(timestamp_response, message_hash)
return certificate
except Rfc3161VerificationError as e:
logger.debug(f"Unable to verify Timestamp with CA {certificate.subject}: {e}")
continue
raise ValueError(f"No valid root certificate found in {trusted_root_path}.")
return None
def sign_data(self, tsa_url: str, bytes_data: bytes) -> TimeStampResponse:
# see https://github.com/sigstore/sigstore-python/blob/99948d5b80525a5a104e904ffea58169dc6e0629/sigstore/_internal/timestamp.py#L84-L121
@@ -186,15 +192,18 @@ class TimestampingEnricher(Enricher):
certs = [x509.load_der_x509_certificate(c) for c in signed_data.certificates]
# reorder the certs to be in the correct order
ordered_certs = []
if len(certs) == 1:
return certs
while(len(ordered_certs) < len(certs)):
if len(ordered_certs) == 0:
for cert in certs:
if not [c for c in certs if c.subject == cert.issuer]:
if not [c for c in certs if cert.subject == c.issuer]:
ordered_certs.append(cert)
break
else:
for cert in certs:
if cert.issuer == ordered_certs[-1].subject:
if cert.subject == ordered_certs[-1].issuer:
ordered_certs.append(cert)
break
return ordered_certs

View File

@@ -9,7 +9,7 @@ from typing import Dict, Tuple
import hashlib
import pytest
from auto_archiver.core.metadata import Metadata
from auto_archiver.core.metadata import Metadata, Media
from auto_archiver.core.module import ModuleFactory
# Test names inserted into this list will be run last. This is useful for expensive/costly tests
@@ -139,6 +139,12 @@ def mock_binary_dependencies(mocker):
mock_shutil_which.return_value = "/usr/bin/fake_binary"
return mock_shutil_which
@pytest.fixture
def sample_media(tmp_path) -> Media:
"""Fixture creating a Media object with temporary source file"""
src_file = tmp_path / "source.txt"
src_file.write_text("test content")
return Media(key="subdir/test.txt", filename=str(src_file))
@pytest.fixture
def sample_datetime():

View File

@@ -1,44 +0,0 @@
-----BEGIN CERTIFICATE-----
MIIHuDCCBaCgAwIBAgIQQAF/lJAVu6kSuFeWPUTs7jANBgkqhkiG9w0BAQsFADBK
MQswCQYDVQQGEwJVUzESMBAGA1UEChMJSWRlblRydXN0MScwJQYDVQQDEx5JZGVu
VHJ1c3QgQ29tbWVyY2lhbCBSb290IENBIDEwHhcNMjIwMzE2MjEwOTA1WhcNMzMw
NjEyMjEwOTA1WjBFMQswCQYDVQQGEwJVUzESMBAGA1UEChMJSWRlblRydXN0MSIw
IAYDVQQDExlUcnVzdElEIFRpbWVzdGFtcGluZyBDQSAzMIICIjANBgkqhkiG9w0B
AQEFAAOCAg8AMIICCgKCAgEAqGL0RYFG7mL0RgSXLynLNWhEVrhsKhrVL4rSG+NA
p4v8TbAP2YXsqWB8yZgj9DQ55AECnmQ2Uo/BqQSsI/AOr9ctqZykItmca/nGjKez
l1kZS2YoNc4Zjj+7QO9iNunclA06fBhI+iQHAam7isQLK3CwXRDLzKkMs7TisMoG
QOSd0M8P6YY/QOGYv/+tCxmfvUz2GjWzQQemgiuLjvGhPwo+hrcNzays9j0G7QtA
LkJ0KfvJS+guvCvSuEfDzt3BaPIpD2q6GYK+MUiNis3uwwngauyL4r048wdvUSsf
92Kyr6T1pAfjjPyVDNazf/w/BjzA6ewNevFVLNfE0DhQkXMmsNVGBzY5Phhlp5fb
TwsrD19K0FPgbGO/l/Zp2dheeiCbe09bxbhdeahSBtTVPca4Vu3Ljz+PRZjFodq7
+lziqqpqqCP/ikEnmK/QkxjCG7AkX384dxg7yb5jjtXOnP5Yv4SXuV4SNNVVUBJf
bLXyYAf3Q0Dal85ZxNQd0QNPQIsYWv9ttTMVc6sVErdfTBPw355St6bHz91LUoDD
0S/GjUif8LYhVlZGXlwjYmVZOb2Z7+DAamzjwVrSsrxGCJ66Coy1rKapJuHVsfGA
W44p2ioIEZT3s6nQJkCt7te4ab1iWzaydZGAYyBao0K7kfK3vSp4AXsE5t5y6dpH
Yo0CAwEAAaOCAp0wggKZMBIGA1UdEwEB/wQIMAYBAf8CAQAwDgYDVR0PAQH/BAQD
AgGGMIGJBggrBgEFBQcBAQR9MHswMAYIKwYBBQUHMAGGJGh0dHA6Ly9jb21tZXJj
aWFsLm9jc3AuaWRlbnRydXN0LmNvbTBHBggrBgEFBQcwAoY7aHR0cDovL3ZhbGlk
YXRpb24uaWRlbnRydXN0LmNvbS9yb290cy9jb21tZXJjaWFscm9vdGNhMS5wN2Mw
HwYDVR0jBBgwFoAU7UQZwNPwBovupHu+QucmVMiONnYwggFEBgNVHSAEggE7MIIB
NzAIBgZngQwBBAIwDQYLYIZIAYb5LwAGDQMwggEaBgtghkgBhvkvAAYNATCCAQkw
SgYIKwYBBQUHAgEWPmh0dHBzOi8vc2VjdXJlLmlkZW50cnVzdC5jb20vY2VydGlm
aWNhdGVzL3BvbGljeS90cy9pbmRleC5odG1sMIG6BggrBgEFBQcCAjCBrQyBqlRo
aXMgVHJ1c3RJRCBDZXJ0aWZpY2F0ZSBoYXMgYmVlbiBpc3N1ZWQgaW4gYWNjb3Jk
YW5jZSB3aXRoIElkZW5UcnVzdCdzIFRydXN0SUQgQ2VydGlmaWNhdGUgUG9saWN5
IGZvdW5kIGF0IGh0dHBzOi8vc2VjdXJlLmlkZW50cnVzdC5jb20vY2VydGlmaWNh
dGVzL3BvbGljeS90cy9pbmRleC5odG1sMEoGA1UdHwRDMEEwP6A9oDuGOWh0dHA6
Ly92YWxpZGF0aW9uLmlkZW50cnVzdC5jb20vY3JsL2NvbW1lcmNpYWxyb290Y2Ex
LmNybDAdBgNVHQ4EFgQUyjLwNnzHKtqRtXyHihG9uCJsvwkwEwYDVR0lBAwwCgYI
KwYBBQUHAwgwDQYJKoZIhvcNAQELBQADggIBACtnO4f6QB6v2yDFeld3Pa1H7Bmb
y2tSwzQ/5dB5WXmTZHlV433s7BDkpwGzK4fGBuTcx814uPUWWSwcL+f8bpfozBv6
p855j/AlKul1EiPMKkMlLtwdiK6sWT2x8qaTm4fMGbcpgUbQnxOo4BnzVUmgs3ep
m9/qXf9GaWXRz4maSWl4z3apD3X/5oMrriGWIiW6ivCq8bmOBUdm0I9quhW9Snk2
JAaqkVCjs06rqE3rRblyNdrSypGzo5eBT498aCfcvDPJX2/q2PMkLvkKoXtVJ1g4
sEwDQxm2sg8QMEd2GKo+X7TqOeF8An7KOPDq9v0xtSsF4+ufFrl43vl6v4uMey68
wOHv6VmaGpCtWk1e6lSq9jLQqRBBg8CMwpw6niVyvkrdh4Tvu+5HLrareZBp98PJ
2cQzrHk1SiPcyDxSlhbXRks/TgKXTicUm3pZsKZcTvCXHcqulN1eoQ3z9azMIuR7
RtdECz2RJUI6Xg//6ZdMMgaDnktMALgAyo9GgGGVimx4E2/aM5r0cm+RCrk956BT
qahEdiGLWjgjq8dAJe4XfLtp6EmGvsw650uBbDA0Mg9nlSCFdGTMFBKlw65o2oJa
enMysAvE/VC39ZIEOBfk2IOj2GTYlOgHi0iIBIZf7SqdjhTAtoW7U9TTZj8SHRen
/EEe1WEcfTawcOhJ
-----END CERTIFICATE-----

View File

@@ -1,39 +0,0 @@
-----BEGIN CERTIFICATE-----
MIIG4zCCBMugAwIBAgIQQAGSoSzKR+4rZmhPudMJdjANBgkqhkiG9w0BAQsFADBF
MQswCQYDVQQGEwJVUzESMBAGA1UEChMJSWRlblRydXN0MSIwIAYDVQQDExlUcnVz
dElEIFRpbWVzdGFtcGluZyBDQSAzMB4XDTI0MTAxODE5NDg0MFoXDTI2MDExNzE5
NDgzOVowRzELMAkGA1UEBhMCVVMxEjAQBgNVBAoTCUlkZW5UcnVzdDEkMCIGA1UE
AxMbVHJ1c3RJRCBUaW1lc3RhbXAgQXV0aG9yaXR5MIICIjANBgkqhkiG9w0BAQEF
AAOCAg8AMIICCgKCAgEA27BXl6MHeJwySCub5fd8rD4bWC2yOQE4hWDkw6YjMHMl
mKQVPIqD2+ezxhIK5GmwwNxDopo1ovsTMrF/KFsTExw0l4ZguJ+xs5W6qLrRVm7Z
TnkB6wMKaxR8+fdxtq9lnqRDnHtrGiGEQGOqVvUpAhnzioYa/5qWmqhRhdf7bRpI
1D71LC4UtWm1YG0PmJVSsE2ZqjB4rP7baj6hAlQxe0bvNwL/Bqy6D0QHNk2Xf2Vk
w7BLN7knh/SngAlzfumBg0cQff0C5DQgeyCQoM5/XBAyYZUQc//t2XVZPYiGUvOj
QNjLCbqOpRQf9JBaU/gh9Mf6Mbs6xh5qn4gKHpxOUMy+QfYAU+RiLhV4X0/brdqc
UOqRHte9gcv8yU4mqyMzbmHn5y09SkuhPOIhYE5Fd72XHeR8RnC9CI9V5NjVW370
iNZuO2R2hw3dwtlWFxJPM+jortoxO3BPoxmppp/EdEH60p5Sk12eeyx86zieLb1E
JS5gEvu+jYWwKKjCf3visWC4OJNzbknGr3WRVOI6UvjyKrck3hpCuOjLXST+pEiB
XJzbiHCR5UJn8mXJnPAvKabgCGk7/trqrqB3Rkd2l5rce7maoyn265r/3IRLZisf
QdBsfzQv6RAP/zBFQkzcE2Yl12M25k2B6kFZMLsTT60jBTt1W6utXM2T7DbOwO8C
AwEAAaOCAcswggHHMAwGA1UdEwEB/wQCMAAwDgYDVR0PAQH/BAQDAgeAMBYGA1Ud
JQEB/wQMMAoGCCsGAQUFBwMIMIGFBggrBgEFBQcBAQR5MHcwMAYIKwYBBQUHMAGG
JGh0dHA6Ly9jb21tZXJjaWFsLm9jc3AuaWRlbnRydXN0LmNvbTBDBggrBgEFBQcw
AoY3aHR0cDovL3ZhbGlkYXRpb24uaWRlbnRydXN0LmNvbS9jZXJ0cy90aW1lc3Rh
bXBpbmczLnA3YzAfBgNVHSMEGDAWgBTKMvA2fMcq2pG1fIeKEb24Imy/CTB/BgNV
HSAEeDB2MAgGBmeBDAEEAjANBgtghkgBhvkvAAYNATBbBgtghkgBhvkvAAYNAzBM
MEoGCCsGAQUFBwIBFj5odHRwczovL3NlY3VyZS5pZGVudHJ1c3QuY29tL2NlcnRp
ZmljYXRlcy9wb2xpY3kvdHMvaW5kZXguaHRtbDBGBgNVHR8EPzA9MDugOaA3hjVo
dHRwOi8vdmFsaWRhdGlvbi5pZGVudHJ1c3QuY29tL2NybC90aW1lc3RhbXBpbmcz
LmNybDAdBgNVHQ4EFgQUIYBfL7FSq7kLBlpOTWDaRGHtFgEwDQYJKoZIhvcNAQEL
BQADggIBAFIczKS3kFMFeX2WIC1uUj4Nvt+W2/kPNhuKukF2pOC+VcxrbbsTugTx
oO+X8J5JVX5sNP74p7YrkSY+dVPFQ+8rfJJeshSvJEQnt9DgsXPTRcU982OPmJCQ
QZY2Ux7xYfoEuWytuigyhMMoh+g5IzipQ56UCDY/sHY40SdLpSXg69RMy6y/L2Zm
9f4YPFrXPy7q3hGTemQHo+jmshg/hU/zIEjjfWx7uG223r0M5Ez9ks4y/EmtdT5l
KkF9RGALpKEBWIQaL+yi8X8NHRM1Qfs91GvEZe5wPri/5R9YvhiHWjizqxvDlqVW
ka6Mu13zbovM2vMppHJWYlvBTZ4z8vZBdNzN7fiTAdcd3lAl0A3wmpBIqrvFChhY
DX0Su1/3kA3X2PAJrbaZ7RNQ+Zjuz31T8QK0d1PyzHOZ/jK76f7Nb5Ic8fbALKzs
S+Yg3O4fmLqC0kERtL96xA/y+Y9oAJYMOOvrpBvpe1TdGZ5s3nRpLhWT4NEddkGg
AWodbBA3W0x5iKRWXAo+sATHTNXB6RGTIuF1/PC3R4fjbTMvpHXyW+bP+k8m+uIu
XLDMJ+XW5TZ01g6UGfg62ti4ohS0PyEHbnuEQDQogcqtqF0d8oihfcbDXOm/YgId
vhdRXJC/UZ0Q6Q6jtuj5uV76fvNU4j4FYKnXZqutzlCxeKn1odHN
-----END CERTIFICATE-----

View File

@@ -1,31 +0,0 @@
-----BEGIN CERTIFICATE-----
MIIFYDCCA0igAwIBAgIQCgFCgAAAAUUjyES1AAAAAjANBgkqhkiG9w0BAQsFADBK
MQswCQYDVQQGEwJVUzESMBAGA1UEChMJSWRlblRydXN0MScwJQYDVQQDEx5JZGVu
VHJ1c3QgQ29tbWVyY2lhbCBSb290IENBIDEwHhcNMTQwMTE2MTgxMjIzWhcNMzQw
MTE2MTgxMjIzWjBKMQswCQYDVQQGEwJVUzESMBAGA1UEChMJSWRlblRydXN0MScw
JQYDVQQDEx5JZGVuVHJ1c3QgQ29tbWVyY2lhbCBSb290IENBIDEwggIiMA0GCSqG
SIb3DQEBAQUAA4ICDwAwggIKAoICAQCnUBneP5k91DNG8W9RYYKyqU+PZ4ldhNlT
3Qwo2dfw/66VQ3KZ+bVdfIrBQuExUHTRgQ18zZshq0PirK1ehm7zCYofWjK9ouuU
+ehcCuz/mNKvcbO0U59Oh++SvL3sTzIwiEsXXlfEU8L2ApeN2WIrvyQfYo3fw7gp
S0l4PJNgiCL8mdo2yMKi1CxUAGc1bnO/AljwpN3lsKImesrgNqUZFvX9t++uP0D1
bVoE/c40yiTcdCMbXTMTEl3EASX2MN0CXZ/g1Ue9tOsbobtJSdifWwLziuQkkORi
T0/Br4sOdBeo0XKIanoBScy0RnnGF7HamB4HWfp1IYVl3ZBWzvurpWCdxJ35UrCL
vYf5jysjCiN2O/cz4ckA82n5S6LgTrx+kzmEB/dEcH7+B1rlsazRGMzyNeVJSQjK
Vsk9+w8YfYs7wRPCTY/JTw436R+hDmrfYi7LNQZReSzIJTj0+kuniVyc0uMNOYZK
dHzVWYfCP04MXFL0PfdSgvHqo6z9STQaKPNBiDoT7uje/5kdX7rL6B7yuVBgwDHT
c+XvvqDtMwt0viAgxGds8AgDelWAf0ZOlqf0Hj7h9tgJ4TNkK2PXMl6f+cB7D3hv
l7yTmvmcEpB4eoCHFddydJxVdHixuuFucAS6T6C6aMN7/zHwcz09lCqxC0EOoP5N
iGVreTO01wIDAQABo0IwQDAOBgNVHQ8BAf8EBAMCAQYwDwYDVR0TAQH/BAUwAwEB
/zAdBgNVHQ4EFgQU7UQZwNPwBovupHu+QucmVMiONnYwDQYJKoZIhvcNAQELBQAD
ggIBAA2ukDL2pkt8RHYZYR4nKM1eVO8lvOMIkPkp165oCOGUAFjvLi5+U1KMtlwH
6oi6mYtQlNeCgN9hCQCTrQ0U5s7B8jeUeLBfnLOic7iPBZM4zY0+sLj7wM+x8uwt
LRvM7Kqas6pgghstO8OEPVeKlh6cdbjTMM1gCIOQ045U8U1mwF10A0Cj7oV+wh93
nAbowacYXVKV7cndJZ5t+qntozo00Fl72u1Q8zW/7esUTTHHYPTa8Yec4kjixsU3
+wYQ+nVZZjFHKdp2mhzpgq7vmrlR94gjmmmVYjzlVYA211QC//G5Xc7UI2/YRYRK
W2XviQzdFKcgyxilJbQN+QHwotL0AMh0jqEqSI5l2xPE4iUXfeu+h1sXIFRRk0pT
AwvsXcoz7WL9RccvW9xYoIA55vrX/hMUpu09lEpCdNTDd1lzzY9GvlU47/rokTLq
l1gEIt44w8y8bckzOmoKaT+gyOpyj4xjhiO9bTyWnpXgSUyqorkqG5w2gXjtw+hG
4iZZRHUe2XWJUc0QhJ1hYMtd+ZciTY6Y5uN/9lu7rs3KSoFrXgvzUeF0K+l+J6fZ
mUlO+KWA2yUPHGNiiskzZ2s8EIPGrd6ozRaOjfAHN3Gf8qv8QfXBi+wAN10J5U6A
7/qxXDgGpRtK4dw4LTzcqx+QGtVKnO7RcGzM7vRX+Bi6hG6H
-----END CERTIFICATE-----

Binary file not shown.

View File

@@ -1,22 +1,103 @@
from pathlib import Path
import pytest
from auto_archiver.modules.timestamping_enricher.timestamping_enricher import TimestampingEnricher
from rfc3161_client import (
TimeStampResponse,
decode_timestamp_response,
)
import requests
from cryptography import x509
from auto_archiver.modules.timestamping_enricher.timestamping_enricher import TimestampingEnricher
from auto_archiver.core import Metadata, Media
@pytest.fixture
def timestamp_response() -> TimeStampResponse:
with open("tests/data/timestamping/timestamp_response.tsr", "rb") as f:
with open("tests/data/timestamping/valid_timestamp.tsr", "rb") as f:
return decode_timestamp_response(f.read())
@pytest.fixture
def wrong_order_timestamp_response() -> TimeStampResponse:
with open("tests/data/timestamping/rfc3161-client-issue-104.tsr", "rb") as f:
return decode_timestamp_response(f.read())
@pytest.fixture
def selfsigned_response() -> TimeStampResponse:
with open("tests/data/timestamping/self_signed.tsr", "rb") as f:
return decode_timestamp_response(f.read())
@pytest.fixture
def filehash():
return "4b7b4e39f12b8c725e6e603e6d4422500316df94211070682ef10260ff5759ef"
def test_full_enriching(setup_module, sample_media, mocker, timestamp_response, filehash):
mock_post = mocker.patch("requests.sessions.Session.post")
mock_post.return_value.status_code = 200
mock_decode_timestamp_response = mocker.patch("auto_archiver.modules.timestamping_enricher.timestamping_enricher.decode_timestamp_response")
mock_decode_timestamp_response.return_value = timestamp_response
tsp: TimestampingEnricher = setup_module("timestamping_enricher", {"tsa_urls": ["http://timestamp.identrust.com"]})
metadata = Metadata().set_url("https://example.com")
sample_media.set('hash', filehash)
metadata.add_media(sample_media)
tsp.enrich(metadata)
assert metadata.get('timestamped') == True
assert len(metadata.media) == 2 # the original 'sample_media' and the new 'timestamp_media'
timestamp_media = metadata.media[1]
assert timestamp_media.filename == f"{tsp.tmp_dir}/hashes.txt"
assert Path(timestamp_media.filename).read_text() == filehash
# we only have one authority file because we only used one TSA
assert len(timestamp_media.get('timestamp_authority_files')) == 1
timestamp_authority_file = timestamp_media.get('timestamp_authority_files')[0]
assert Path(timestamp_authority_file.filename).read_bytes() == timestamp_response.time_stamp_token()
cert_chain = timestamp_authority_file.get('cert_chain')
assert len(cert_chain) == 3
assert cert_chain[0].filename == f"{tsp.tmp_dir}/1 85078758028491331763.crt"
assert cert_chain[1].filename == f"{tsp.tmp_dir}/2 85078371663472981624.crt"
assert cert_chain[2].filename == f"{tsp.tmp_dir}/3 13298821034946342390.crt"
def test_full_enriching_multiple_tsa(setup_module, sample_media, mocker, timestamp_response, filehash):
mock_post = mocker.patch("requests.sessions.Session.post")
mock_post.return_value.status_code = 200
mock_decode_timestamp_response = mocker.patch("auto_archiver.modules.timestamping_enricher.timestamping_enricher.decode_timestamp_response")
mock_decode_timestamp_response.return_value = timestamp_response
tsp: TimestampingEnricher = setup_module("timestamping_enricher", {"tsa_urls": ["http://example.com/timestamp1", "http://example.com/timestamp2"]})
metadata = Metadata().set_url("https://example.com")
sample_media.set('hash', filehash)
metadata.add_media(sample_media)
tsp.enrich(metadata)
assert metadata.get('timestamped') == True
assert len(metadata.media) == 2 # the original 'sample_media' and the new 'timestamp_media'
timestamp_media = metadata.media[1]
assert len(timestamp_media.get('timestamp_authority_files')) == 2
for timestamp_token_media in timestamp_media.get('timestamp_authority_files'):
assert Path(timestamp_token_media.filename).read_bytes() == timestamp_response.time_stamp_token()
assert len(timestamp_token_media.get('cert_chain')) == 3
def test_enriching(setup_module, sample_media):
tsp: TimestampingEnricher = setup_module("timestamping_enricher")
# test the enrich method
metadata = Metadata().set_url("https://example.com")
sample_media.set('hash', "4b7b4e39f12b8c725e6e603e6d4422500316df94211070682ef10260ff5759ef")
metadata.add_media(sample_media)
tsp.enrich(metadata)
@pytest.mark.download
def test_fails_for_digicert(setup_module):
@@ -57,8 +138,8 @@ def test_verify_save(setup_module, timestamp_response):
cert_chain = tsp.save_certificate(timestamp_response, verified_root_cert)
assert len(cert_chain) == 3
assert cert_chain[0].filename == f"{tsp.tmp_dir}/1 85078371663472981624.crt"
assert cert_chain[1].filename == f"{tsp.tmp_dir}/2 85078758028491331763.crt"
assert cert_chain[0].filename == f"{tsp.tmp_dir}/1 85078758028491331763.crt"
assert cert_chain[1].filename == f"{tsp.tmp_dir}/2 85078371663472981624.crt"
assert cert_chain[2].filename == f"{tsp.tmp_dir}/3 13298821034946342390.crt"
@@ -70,7 +151,25 @@ def test_order_crt_correctly(setup_module, wrong_order_timestamp_response):
ordered_certs = tsp.tst_certs(wrong_order_timestamp_response)
assert len(ordered_certs) == 2
assert ordered_certs[0].subject.rfc4514_string() == "CN=TrustID Timestamping CA 3,O=IdenTrust,C=US"
assert ordered_certs[1].subject.rfc4514_string() == "CN=TrustID Timestamp Authority,O=IdenTrust,C=US"
assert ordered_certs[0].subject.rfc4514_string() == "CN=TrustID Timestamp Authority,O=IdenTrust,C=US"
assert ordered_certs[1].subject.rfc4514_string() == "CN=TrustID Timestamping CA 3,O=IdenTrust,C=US"
def test_invalid_tsa_404(setup_module, mocker):
tsp = setup_module("timestamping_enricher")
post_mock = mocker.patch("requests.sessions.Session.post")
post_mock.side_effect = Exception("error")
with pytest.raises(Exception, match="error"):
tsp.sign_data("http://bellingcat.com/", b"my-message")
@pytest.mark.download
def test_invalid_tsa_invalid_response(setup_module, mocker):
tsp = setup_module("timestamping_enricher")
with pytest.raises(requests.exceptions.HTTPError, match="404 Client Error"):
tsp.sign_data("http://bellingcat.com/page-not-found/", b"my-message")
def test_fail_on_selfsigned_cert(setup_module, selfsigned_response):
tsp = setup_module("timestamping_enricher")
root_cert = tsp.verify_signed(selfsigned_response, b"my-message")
assert root_cert is None

View File

@@ -18,22 +18,11 @@ def local_storage(setup_module) -> LocalStorage:
}
return setup_module("local_storage", configs)
@pytest.fixture
def sample_media(tmp_path) -> Media:
"""Fixture creating a Media object with temporary source file"""
src_file = tmp_path / "source.txt"
src_file.write_text("test content")
return Media(key="subdir/test.txt", filename=str(src_file))
def test_get_cdn_url_relative(local_storage):
media = Media(key="test.txt", filename="dummy.txt")
expected = os.path.join(local_storage.save_to, media.key)
assert local_storage.get_cdn_url(media) == expected
def test_get_cdn_url_absolute(local_storage):
media = Media(key="test.txt", filename="dummy.txt")
local_storage.save_absolute = True