mirror of
https://github.com/bellingcat/auto-archiver.git
synced 2026-06-07 19:08:30 +03:00
Remove unittest and switch to pytest fully
This commit is contained in:
@@ -75,3 +75,7 @@ repository = "https://github.com/bellingcat/auto-archiver"
|
|||||||
documentation = "https://github.com/bellingcat/auto-archiver"
|
documentation = "https://github.com/bellingcat/auto-archiver"
|
||||||
|
|
||||||
|
|
||||||
|
[tool.pytest.ini_options]
|
||||||
|
markers = [
|
||||||
|
"download: marks tests that download content from the network",
|
||||||
|
]
|
||||||
@@ -1,10 +1,6 @@
|
|||||||
import unittest
|
|
||||||
import tempfile
|
import tempfile
|
||||||
|
|
||||||
from auto_archiver.core.context import ArchivingContext
|
from auto_archiver.core.context import ArchivingContext
|
||||||
|
|
||||||
ArchivingContext.reset(full_reset=True)
|
ArchivingContext.reset(full_reset=True)
|
||||||
ArchivingContext.set_tmp_dir(tempfile.gettempdir())
|
ArchivingContext.set_tmp_dir(tempfile.gettempdir())
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
unittest.main()
|
|
||||||
@@ -1,4 +1,7 @@
|
|||||||
|
import pytest
|
||||||
|
|
||||||
from auto_archiver.core import Metadata
|
from auto_archiver.core import Metadata
|
||||||
|
from auto_archiver.core import Step
|
||||||
from auto_archiver.core.metadata import Metadata
|
from auto_archiver.core.metadata import Metadata
|
||||||
|
|
||||||
class TestArchiverBase(object):
|
class TestArchiverBase(object):
|
||||||
@@ -6,17 +9,12 @@ class TestArchiverBase(object):
|
|||||||
archiver_class = None
|
archiver_class = None
|
||||||
config = None
|
config = None
|
||||||
|
|
||||||
def setUp(self):
|
@pytest.fixture(autouse=True)
|
||||||
|
def setup_archiver(self):
|
||||||
assert self.archiver_class is not None, "self.archiver_class must be set on the subclass"
|
assert self.archiver_class is not None, "self.archiver_class must be set on the subclass"
|
||||||
assert self.config is not None, "self.config must be a dict set on the subclass"
|
assert self.config is not None, "self.config must be a dict set on the subclass"
|
||||||
self.archiver = self.archiver_class(self.config)
|
self.archiver = self.archiver_class(self.config)
|
||||||
|
|
||||||
def create_item(self, url, **kwargs):
|
|
||||||
item = Metadata().set_url(url)
|
|
||||||
for key, value in kwargs.items():
|
|
||||||
item.set(key, value)
|
|
||||||
return item
|
|
||||||
|
|
||||||
def assertValidResponseMetadata(self, test_response: Metadata, title: str, timestamp: str, status: str = ""):
|
def assertValidResponseMetadata(self, test_response: Metadata, title: str, timestamp: str, status: str = ""):
|
||||||
assert test_response is not False
|
assert test_response is not False
|
||||||
|
|
||||||
|
|||||||
@@ -1,10 +1,9 @@
|
|||||||
import pytest
|
import pytest
|
||||||
import unittest
|
|
||||||
|
|
||||||
from auto_archiver.archivers.bluesky_archiver import BlueskyArchiver
|
from auto_archiver.archivers.bluesky_archiver import BlueskyArchiver
|
||||||
from .test_archiver_base import TestArchiverBase
|
from .test_archiver_base import TestArchiverBase
|
||||||
|
|
||||||
class TestBlueskyArchiver(TestArchiverBase, unittest.TestCase):
|
class TestBlueskyArchiver(TestArchiverBase):
|
||||||
"""Tests Bluesky Archiver
|
"""Tests Bluesky Archiver
|
||||||
|
|
||||||
Note that these tests will download API responses from the bluesky API, so they may be slow.
|
Note that these tests will download API responses from the bluesky API, so they may be slow.
|
||||||
|
|||||||
@@ -1,4 +1,3 @@
|
|||||||
import unittest
|
|
||||||
import datetime
|
import datetime
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
@@ -6,59 +5,33 @@ from auto_archiver.archivers.twitter_archiver import TwitterArchiver
|
|||||||
|
|
||||||
from .test_archiver_base import TestArchiverBase
|
from .test_archiver_base import TestArchiverBase
|
||||||
|
|
||||||
class TestTwitterArchiver(TestArchiverBase, unittest.TestCase):
|
class TestTwitterArchiver(TestArchiverBase):
|
||||||
|
|
||||||
archiver_class = TwitterArchiver
|
archiver_class = TwitterArchiver
|
||||||
config = {}
|
config = {}
|
||||||
|
@pytest.mark.parametrize("url, expected", [
|
||||||
|
("https://t.co/yl3oOJatFp", "https://www.bellingcat.com/category/resources/"), # t.co URL
|
||||||
|
("https://x.com/bellingcat/status/1874097816571961839", "https://x.com/bellingcat/status/1874097816571961839"), # x.com urls unchanged
|
||||||
|
("https://twitter.com/bellingcat/status/1874097816571961839", "https://twitter.com/bellingcat/status/1874097816571961839"), # twitter urls unchanged
|
||||||
|
("https://twitter.com/bellingcat/status/1874097816571961839?s=20&t=3d0g4ZQis7dCbSDg-mE7-w", "https://twitter.com/bellingcat/status/1874097816571961839"), # strip tracking params
|
||||||
|
("https://www.bellingcat.com/category/resources/", "https://www.bellingcat.com/category/resources/"), # non-twitter/x urls unchanged
|
||||||
|
("https://www.bellingcat.com/category/resources/?s=20&t=3d0g4ZQis7dCbSDg-mE7-w", "https://www.bellingcat.com/category/resources/?s=20&t=3d0g4ZQis7dCbSDg-mE7-w"), # shouldn't strip params from non-twitter/x URLs
|
||||||
|
])
|
||||||
|
def test_sanitize_url(self, url, expected):
|
||||||
|
assert expected == self.archiver.sanitize_url(url)
|
||||||
|
|
||||||
def test_sanitize_url(self):
|
@pytest.mark.parametrize("url, exptected_username, exptected_tweetid", [
|
||||||
|
("https://twitter.com/bellingcat/status/1874097816571961839", "bellingcat", "1874097816571961839"),
|
||||||
|
("https://x.com/bellingcat/status/1874097816571961839", "bellingcat", "1874097816571961839"),
|
||||||
|
("https://www.bellingcat.com/category/resources/", False, False)
|
||||||
|
])
|
||||||
|
|
||||||
# should expand t.co URLs
|
def test_get_username_tweet_id_from_url(self, url, exptected_username, exptected_tweetid):
|
||||||
t_co_url = "https://t.co/yl3oOJatFp"
|
|
||||||
t_co_resolved_url = "https://www.bellingcat.com/category/resources/"
|
|
||||||
assert t_co_resolved_url == self.archiver.sanitize_url(t_co_url)
|
|
||||||
|
|
||||||
# shouldn't alter valid x URLs
|
|
||||||
x_url = "https://x.com/bellingcat/status/1874097816571961839"
|
|
||||||
assert x_url == self.archiver.sanitize_url(x_url)
|
|
||||||
|
|
||||||
# shouldn't alter valid twitter.com URLs
|
|
||||||
twitter_url = "https://twitter.com/bellingcat/status/1874097816571961839"
|
|
||||||
assert twitter_url == self.archiver.sanitize_url(twitter_url)
|
|
||||||
|
|
||||||
# should strip tracking params
|
|
||||||
tracking_url = "https://twitter.com/bellingcat/status/1874097816571961839?s=20&t=3d0g4ZQis7dCbSDg-mE7-w"
|
|
||||||
assert "https://twitter.com/bellingcat/status/1874097816571961839" == self.archiver.sanitize_url(tracking_url)
|
|
||||||
|
|
||||||
# shouldn't alter non-twitter/x URLs
|
|
||||||
test_url = "https://www.bellingcat.com/category/resources/"
|
|
||||||
assert test_url == self.archiver.sanitize_url(test_url)
|
|
||||||
|
|
||||||
# shouldn't strip params from non-twitter/x URLs
|
|
||||||
test_url = "https://www.bellingcat.com/category/resources/?s=20&t=3d0g4ZQis7dCbSDg-mE7-w"
|
|
||||||
assert test_url == self.archiver.sanitize_url(test_url)
|
|
||||||
|
|
||||||
def test_get_username_tweet_id_from_url(self):
|
|
||||||
|
|
||||||
# test valid twitter URL
|
|
||||||
url = "https://twitter.com/bellingcat/status/1874097816571961839"
|
|
||||||
username, tweet_id = self.archiver.get_username_tweet_id(url)
|
username, tweet_id = self.archiver.get_username_tweet_id(url)
|
||||||
assert "bellingcat" == username
|
assert exptected_username == username
|
||||||
assert "1874097816571961839" == tweet_id
|
assert exptected_tweetid == tweet_id
|
||||||
|
|
||||||
# test valid x URL
|
|
||||||
url = "https://x.com/bellingcat/status/1874097816571961839"
|
|
||||||
username, tweet_id = self.archiver.get_username_tweet_id(url)
|
|
||||||
assert "bellingcat" == username
|
|
||||||
assert "1874097816571961839" == tweet_id
|
|
||||||
|
|
||||||
# test invalid URL
|
|
||||||
# TODO: should this return None, False or raise an exception? Right now it returns False
|
|
||||||
url = "https://www.bellingcat.com/category/resources/"
|
|
||||||
username, tweet_id = self.archiver.get_username_tweet_id(url)
|
|
||||||
assert not username
|
|
||||||
assert not tweet_id
|
|
||||||
|
|
||||||
def test_choose_variants(self):
|
def test_choose_variants(self):
|
||||||
# taken from the response for url https://x.com/bellingcat/status/1871552600346415571
|
# taken from the response for url https://x.com/bellingcat/status/1871552600346415571
|
||||||
variant_list = [{'content_type': 'application/x-mpegURL', 'url': 'https://video.twimg.com/ext_tw_video/1871551993677852672/pu/pl/ovWo7ux-bKROwYIC.m3u8?tag=12&v=e1b'},
|
variant_list = [{'content_type': 'application/x-mpegURL', 'url': 'https://video.twimg.com/ext_tw_video/1871551993677852672/pu/pl/ovWo7ux-bKROwYIC.m3u8?tag=12&v=e1b'},
|
||||||
@@ -68,25 +41,26 @@ class TestTwitterArchiver(TestArchiverBase, unittest.TestCase):
|
|||||||
]
|
]
|
||||||
chosen_variant = self.archiver.choose_variant(variant_list)
|
chosen_variant = self.archiver.choose_variant(variant_list)
|
||||||
assert chosen_variant == variant_list[3]
|
assert chosen_variant == variant_list[3]
|
||||||
|
|
||||||
def test_reverse_engineer_token(self):
|
@pytest.mark.parametrize("tweet_id, expected_token", [
|
||||||
|
("1874097816571961839", "4jjngwkifa"),
|
||||||
|
("1674700676612386816", "42586mwa3uv"),
|
||||||
|
("1877747914073620506", "4jv4aahw36n"),
|
||||||
|
("1876710769913450647", "4jruzjz5lux"),
|
||||||
|
("1346554693649113090", "39ibqxei7mo")
|
||||||
|
])
|
||||||
|
def test_reverse_engineer_token(self, tweet_id, expected_token):
|
||||||
# see Vercel's implementation here: https://github.com/vercel/react-tweet/blob/main/packages/react-tweet/src/api/fetch-tweet.ts#L27C1-L31C2
|
# see Vercel's implementation here: https://github.com/vercel/react-tweet/blob/main/packages/react-tweet/src/api/fetch-tweet.ts#L27C1-L31C2
|
||||||
# and the discussion here: https://github.com/JustAnotherArchivist/snscrape/issues/996#issuecomment-2211358215
|
# and the discussion here: https://github.com/JustAnotherArchivist/snscrape/issues/996#issuecomment-2211358215
|
||||||
|
|
||||||
for tweet_id, real_token in [
|
generated_token = self.archiver.generate_token(tweet_id)
|
||||||
("1874097816571961839", "4jjngwkifa"),
|
assert expected_token == generated_token
|
||||||
("1674700676612386816", "42586mwa3uv"),
|
|
||||||
("1877747914073620506", "4jv4aahw36n"),
|
|
||||||
("1876710769913450647", "4jruzjz5lux"),
|
|
||||||
("1346554693649113090", "39ibqxei7mo"),]:
|
|
||||||
generated_token = self.archiver.generate_token(tweet_id)
|
|
||||||
self.assertEqual(real_token, generated_token)
|
|
||||||
|
|
||||||
@pytest.mark.download
|
@pytest.mark.download
|
||||||
def test_youtube_dlp_archiver(self):
|
def test_youtube_dlp_archiver(self, make_item):
|
||||||
|
|
||||||
url = "https://x.com/bellingcat/status/1874097816571961839"
|
url = "https://x.com/bellingcat/status/1874097816571961839"
|
||||||
post = self.archiver.download_yt_dlp(self.create_item(url), url, "1874097816571961839")
|
post = self.archiver.download_yt_dlp(make_item(url), url, "1874097816571961839")
|
||||||
assert post
|
assert post
|
||||||
self.assertValidResponseMetadata(
|
self.assertValidResponseMetadata(
|
||||||
post,
|
post,
|
||||||
@@ -96,11 +70,11 @@ class TestTwitterArchiver(TestArchiverBase, unittest.TestCase):
|
|||||||
)
|
)
|
||||||
|
|
||||||
@pytest.mark.download
|
@pytest.mark.download
|
||||||
def test_syndication_archiver(self):
|
def test_syndication_archiver(self, make_item):
|
||||||
|
|
||||||
url = "https://x.com/bellingcat/status/1874097816571961839"
|
url = "https://x.com/bellingcat/status/1874097816571961839"
|
||||||
post = self.archiver.download_syndication(self.create_item(url), url, "1874097816571961839")
|
post = self.archiver.download_syndication(make_item(url), url, "1874097816571961839")
|
||||||
self.assertTrue(post)
|
assert post
|
||||||
self.assertValidResponseMetadata(
|
self.assertValidResponseMetadata(
|
||||||
post,
|
post,
|
||||||
"As 2024 comes to a close, here’s some examples of what Bellingcat investigated per month in our 10th year! 🧵",
|
"As 2024 comes to a close, here’s some examples of what Bellingcat investigated per month in our 10th year! 🧵",
|
||||||
@@ -108,23 +82,23 @@ class TestTwitterArchiver(TestArchiverBase, unittest.TestCase):
|
|||||||
)
|
)
|
||||||
|
|
||||||
@pytest.mark.download
|
@pytest.mark.download
|
||||||
def test_download_nonexistend_tweet(self):
|
def test_download_nonexistend_tweet(self, make_item):
|
||||||
# this tweet does not exist
|
# this tweet does not exist
|
||||||
url = "https://x.com/Bellingcat/status/17197025860711058"
|
url = "https://x.com/Bellingcat/status/17197025860711058"
|
||||||
response = self.archiver.download(self.create_item(url))
|
response = self.archiver.download(make_item(url))
|
||||||
self.assertFalse(response)
|
assert not response
|
||||||
|
|
||||||
@pytest.mark.download
|
@pytest.mark.download
|
||||||
def test_download_malformed_tweetid(self):
|
def test_download_malformed_tweetid(self, make_item):
|
||||||
# this tweet does not exist
|
# this tweet does not exist
|
||||||
url = "https://x.com/Bellingcat/status/1719702586071100058"
|
url = "https://x.com/Bellingcat/status/1719702586071100058"
|
||||||
response = self.archiver.download(self.create_item(url))
|
response = self.archiver.download(make_item(url))
|
||||||
self.assertFalse(response)
|
assert not response
|
||||||
|
|
||||||
@pytest.mark.download
|
@pytest.mark.download
|
||||||
def test_download_tweet_no_media(self):
|
def test_download_tweet_no_media(self, make_item):
|
||||||
|
|
||||||
item = self.create_item("https://twitter.com/MeCookieMonster/status/1617921633456640001?s=20&t=3d0g4ZQis7dCbSDg-mE7-w")
|
item = make_item("https://twitter.com/MeCookieMonster/status/1617921633456640001?s=20&t=3d0g4ZQis7dCbSDg-mE7-w")
|
||||||
post = self.archiver.download(item)
|
post = self.archiver.download(item)
|
||||||
|
|
||||||
self.assertValidResponseMetadata(
|
self.assertValidResponseMetadata(
|
||||||
@@ -135,36 +109,32 @@ class TestTwitterArchiver(TestArchiverBase, unittest.TestCase):
|
|||||||
)
|
)
|
||||||
|
|
||||||
@pytest.mark.download
|
@pytest.mark.download
|
||||||
def test_download_video(self):
|
def test_download_video(self, make_item):
|
||||||
url = "https://x.com/bellingcat/status/1871552600346415571"
|
url = "https://x.com/bellingcat/status/1871552600346415571"
|
||||||
|
post = self.archiver.download(make_item(url))
|
||||||
post = self.archiver.download(self.create_item(url))
|
|
||||||
self.assertValidResponseMetadata(
|
self.assertValidResponseMetadata(
|
||||||
post,
|
post,
|
||||||
"This month's Bellingchat Premium is with @KolinaKoltai. She reveals how she investigated a platform allowing users to create AI-generated child sexual abuse material and explains why it's crucial to investigate the people behind these services https://t.co/SfBUq0hSD0 https://t.co/rIHx0WlKp8",
|
"This month's Bellingchat Premium is with @KolinaKoltai. She reveals how she investigated a platform allowing users to create AI-generated child sexual abuse material and explains why it's crucial to investigate the people behind these services https://t.co/SfBUq0hSD0 https://t.co/rIHx0WlKp8",
|
||||||
datetime.datetime(2024, 12, 24, 13, 44, 46, tzinfo=datetime.timezone.utc)
|
datetime.datetime(2024, 12, 24, 13, 44, 46, tzinfo=datetime.timezone.utc)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@pytest.mark.xfail(reason="Currently failing, sensitive content requires logged in users/cookies - not yet implemented")
|
||||||
@pytest.mark.download
|
@pytest.mark.download
|
||||||
def test_download_sensitive_media(self):
|
@pytest.mark.parametrize("url, title, timestamp, image_hash", [
|
||||||
|
|
||||||
"""Download tweets with sensitive media
|
|
||||||
|
|
||||||
Note: currently failing, youtube-dlp requres logged in users + download_syndication requires logging in"""
|
|
||||||
|
|
||||||
test_data = [
|
|
||||||
("https://x.com/SozinhoRamalho/status/1876710769913450647", "ignore tweet, testing sensitivity warning nudity", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc), "image_hash"),
|
("https://x.com/SozinhoRamalho/status/1876710769913450647", "ignore tweet, testing sensitivity warning nudity", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc), "image_hash"),
|
||||||
("https://x.com/SozinhoRamalho/status/1876710875475681357", "ignore tweet, testing sensitivity warning violence", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc), "image_hash"),
|
("https://x.com/SozinhoRamalho/status/1876710875475681357", "ignore tweet, testing sensitivity warning violence", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc), "image_hash"),
|
||||||
("https://x.com/SozinhoRamalho/status/1876711053813227618", "ignore tweet, testing sensitivity warning sensitive", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc), "image_hash"),
|
("https://x.com/SozinhoRamalho/status/1876711053813227618", "ignore tweet, testing sensitivity warning sensitive", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc), "image_hash"),
|
||||||
("https://x.com/SozinhoRamalho/status/1876711141314801937", "ignore tweet, testing sensitivity warning nudity, violence, sensitivity", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc), "image_hash")
|
("https://x.com/SozinhoRamalho/status/1876711141314801937", "ignore tweet, testing sensitivity warning nudity, violence, sensitivity", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc), "image_hash"),
|
||||||
]
|
])
|
||||||
|
def test_download_sensitive_media(self, url, title, timestamp, image_hash, make_item):
|
||||||
|
|
||||||
for url, title, timestamp, image_hash in test_data:
|
"""Download tweets with sensitive media"""
|
||||||
post = self.archiver.download(self.create_item(url))
|
|
||||||
self.assertValidResponseMetadata(
|
post = self.archiver.download(make_item(url))
|
||||||
post,
|
self.assertValidResponseMetadata(
|
||||||
title,
|
post,
|
||||||
timestamp
|
title,
|
||||||
)
|
timestamp
|
||||||
assert len(post.media) == 1
|
)
|
||||||
assert post.media[0].hash == image_hash
|
assert len(post.media) == 1
|
||||||
|
assert post.media[0].hash == image_hash
|
||||||
12
tests/conftest.py
Normal file
12
tests/conftest.py
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
import pytest
|
||||||
|
from auto_archiver.core.metadata import Metadata
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def make_item():
|
||||||
|
def _make_item(url: str, **kwargs) -> Metadata:
|
||||||
|
item = Metadata().set_url(url)
|
||||||
|
for key, value in kwargs.items():
|
||||||
|
item.set(key, value)
|
||||||
|
return item
|
||||||
|
|
||||||
|
return _make_item
|
||||||
@@ -1,32 +1,22 @@
|
|||||||
import tempfile
|
|
||||||
import os
|
|
||||||
import unittest
|
|
||||||
|
|
||||||
from auto_archiver.databases.csv_db import CSVDb
|
from auto_archiver.databases.csv_db import CSVDb
|
||||||
from auto_archiver.core import Metadata
|
from auto_archiver.core import Metadata
|
||||||
|
|
||||||
|
|
||||||
|
def test_store_item(tmp_path):
|
||||||
|
"""Tests storing an item in the CSV database"""
|
||||||
|
|
||||||
class TestCSVdb(unittest.TestCase):
|
temp_db = tmp_path / "temp_db.csv"
|
||||||
|
db = CSVDb({
|
||||||
|
"csv_db": {"csv_file": temp_db.as_posix()}
|
||||||
|
})
|
||||||
|
|
||||||
def setUp(self):
|
item = Metadata().set_url("http://example.com").set_title("Example").set_content("Example content").success("my-archiver")
|
||||||
_, temp_db = tempfile.mkstemp(suffix="csv")
|
|
||||||
self.temp_db = temp_db
|
|
||||||
|
|
||||||
def tearDown(self):
|
db.done(item)
|
||||||
os.remove(self.temp_db)
|
|
||||||
|
|
||||||
def test_store_item(self):
|
with open(temp_db, "r", encoding="utf-8") as f:
|
||||||
db = CSVDb({
|
assert f.read().strip() == f"status,metadata,media\nmy-archiver: success,\"{{'_processed_at': {repr(item.get('_processed_at'))}, 'url': 'http://example.com', 'title': 'Example', 'content': 'Example content'}}\",[]"
|
||||||
"csv_db": {"csv_file": self.temp_db}
|
|
||||||
})
|
|
||||||
|
|
||||||
item = Metadata().set_url("http://example.com").set_title("Example").set_content("Example content").success("my-archiver")
|
# TODO: csv db doesn't have a fetch method - need to add it (?)
|
||||||
|
# assert db.fetch(item) == item
|
||||||
db.done(item)
|
|
||||||
|
|
||||||
with open(self.temp_db, "r") as f:
|
|
||||||
assert f.read().strip() == f"status,metadata,media\nmy-archiver: success,\"{{'_processed_at': {repr(item.get('_processed_at'))}, 'url': 'http://example.com', 'title': 'Example', 'content': 'Example content'}}\",[]"
|
|
||||||
|
|
||||||
# TODO: csv db doesn't have a fetch method - need to add it (?)
|
|
||||||
# assert db.fetch(item) == item
|
|
||||||
@@ -1,57 +1,55 @@
|
|||||||
from unittest import TestCase
|
import pytest
|
||||||
|
|
||||||
from auto_archiver.enrichers.hash_enricher import HashEnricher
|
from auto_archiver.enrichers.hash_enricher import HashEnricher
|
||||||
from auto_archiver.core import Metadata, Media
|
from auto_archiver.core import Metadata, Media
|
||||||
|
|
||||||
class TestHashEnricher(TestCase):
|
@pytest.mark.parametrize("algorithm, filename, expected_hash", [
|
||||||
def test_calculate_hash_sha256(self):
|
("SHA-256", "tests/data/testfile_1.txt", "1b4f0e9851971998e732078544c96b36c3d01cedf7caa332359d6f1d83567014"),
|
||||||
# test SHA-256
|
("SHA-256", "tests/data/testfile_2.txt", "60303ae22b998861bce3b28f33eec1be758a213c86c93c076dbe9f558c11c752"),
|
||||||
he = HashEnricher({"algorithm": "SHA-256", "chunksize": 1})
|
("SHA3-512", "tests/data/testfile_1.txt", "d2d8cc4f369b340130bd2b29b8b54e918b7c260c3279176da9ccaa37c96eb71735fc97568e892dc6220bf4ae0d748edb46bd75622751556393be3f482e6f794e"),
|
||||||
assert he.calculate_hash("tests/data/testfile_1.txt") == "1b4f0e9851971998e732078544c96b36c3d01cedf7caa332359d6f1d83567014"
|
("SHA3-512", "tests/data/testfile_2.txt", "e35970edaa1e0d8af7d948491b2da0450a49fd9cc1e83c5db4c6f175f9550cf341f642f6be8cfb0bfa476e4258e5088c5ad549087bf02811132ac2fa22b734c6")
|
||||||
assert he.calculate_hash("tests/data/testfile_2.txt") == "60303ae22b998861bce3b28f33eec1be758a213c86c93c076dbe9f558c11c752"
|
])
|
||||||
|
def test_calculate_hash(algorithm, filename, expected_hash):
|
||||||
|
# test SHA-256
|
||||||
|
he = HashEnricher({"algorithm": algorithm, "chunksize": 1})
|
||||||
|
assert he.calculate_hash(filename) == expected_hash
|
||||||
|
|
||||||
def test_calculate_hash_sha3_512(self):
|
def test_default_config_values():
|
||||||
# test SHA3-512
|
he = HashEnricher(config={})
|
||||||
he = HashEnricher({"algorithm": "SHA3-512", "chunksize": 1})
|
assert he.algorithm == "SHA-256"
|
||||||
assert he.calculate_hash("tests/data/testfile_1.txt") == "d2d8cc4f369b340130bd2b29b8b54e918b7c260c3279176da9ccaa37c96eb71735fc97568e892dc6220bf4ae0d748edb46bd75622751556393be3f482e6f794e"
|
assert he.chunksize == 16000000
|
||||||
assert he.calculate_hash("tests/data/testfile_2.txt") == "e35970edaa1e0d8af7d948491b2da0450a49fd9cc1e83c5db4c6f175f9550cf341f642f6be8cfb0bfa476e4258e5088c5ad549087bf02811132ac2fa22b734c6"
|
|
||||||
|
|
||||||
def test_default_config_values(self):
|
def test_invalid_chunksize():
|
||||||
he = HashEnricher(config={})
|
with pytest.raises(AssertionError):
|
||||||
assert he.algorithm == "SHA-256"
|
he = HashEnricher({"chunksize": "-100"})
|
||||||
assert he.chunksize == 16000000
|
|
||||||
|
|
||||||
def test_invalid_chunksize(self):
|
|
||||||
with self.assertRaises(AssertionError):
|
|
||||||
he = HashEnricher({"chunksize": "-100"})
|
|
||||||
|
|
||||||
def test_invalid_algorithm(self):
|
def test_invalid_algorithm():
|
||||||
with self.assertRaises(AssertionError):
|
with pytest.raises(AssertionError):
|
||||||
HashEnricher({"algorithm": "SHA-123"})
|
HashEnricher({"algorithm": "SHA-123"})
|
||||||
|
|
||||||
def test_config(self):
|
def test_config():
|
||||||
# test default config
|
# test default config
|
||||||
c = HashEnricher.configs()
|
c = HashEnricher.configs()
|
||||||
assert c["algorithm"]["default"] == "SHA-256"
|
assert c["algorithm"]["default"] == "SHA-256"
|
||||||
assert c["chunksize"]["default"] == 16000000
|
assert c["chunksize"]["default"] == 16000000
|
||||||
assert c["algorithm"]["choices"] == ["SHA-256", "SHA3-512"]
|
assert c["algorithm"]["choices"] == ["SHA-256", "SHA3-512"]
|
||||||
assert c["algorithm"]["help"] == "hash algorithm to use"
|
assert c["algorithm"]["help"] == "hash algorithm to use"
|
||||||
assert c["chunksize"]["help"] == "number of bytes to use when reading files in chunks (if this value is too large you will run out of RAM), default is 16MB"
|
assert c["chunksize"]["help"] == "number of bytes to use when reading files in chunks (if this value is too large you will run out of RAM), default is 16MB"
|
||||||
|
|
||||||
def test_hash_media(self):
|
def test_hash_media():
|
||||||
|
|
||||||
he = HashEnricher({"algorithm": "SHA-256", "chunksize": 1})
|
he = HashEnricher({"algorithm": "SHA-256", "chunksize": 1})
|
||||||
|
|
||||||
# generate metadata with two test files
|
# generate metadata with two test files
|
||||||
m = Metadata().set_url("https://example.com")
|
m = Metadata().set_url("https://example.com")
|
||||||
|
|
||||||
# noop - the metadata has no media. Shouldn't fail
|
# noop - the metadata has no media. Shouldn't fail
|
||||||
he.enrich(m)
|
he.enrich(m)
|
||||||
|
|
||||||
m.add_media(Media("tests/data/testfile_1.txt"))
|
m.add_media(Media("tests/data/testfile_1.txt"))
|
||||||
m.add_media(Media("tests/data/testfile_2.txt"))
|
m.add_media(Media("tests/data/testfile_2.txt"))
|
||||||
|
|
||||||
he.enrich(m)
|
he.enrich(m)
|
||||||
|
|
||||||
self.assertEqual(m.media[0].get("hash"), "SHA-256:1b4f0e9851971998e732078544c96b36c3d01cedf7caa332359d6f1d83567014")
|
assert m.media[0].get("hash") == "SHA-256:1b4f0e9851971998e732078544c96b36c3d01cedf7caa332359d6f1d83567014"
|
||||||
self.assertEqual(m.media[1].get("hash"), "SHA-256:60303ae22b998861bce3b28f33eec1be758a213c86c93c076dbe9f558c11c752")
|
assert m.media[1].get("hash") == "SHA-256:60303ae22b998861bce3b28f33eec1be758a213c86c93c076dbe9f558c11c752"
|
||||||
@@ -1,31 +1,17 @@
|
|||||||
import unittest
|
|
||||||
|
|
||||||
from auto_archiver.core.context import ArchivingContext
|
from auto_archiver.core.context import ArchivingContext
|
||||||
from auto_archiver.formatters.html_formatter import HtmlFormatter
|
from auto_archiver.formatters.html_formatter import HtmlFormatter
|
||||||
from auto_archiver.core import Metadata, Media
|
from auto_archiver.core import Metadata, Media
|
||||||
|
|
||||||
|
|
||||||
class TestHTMLFormatter(unittest.TestCase):
|
def test_format():
|
||||||
|
formatter = HtmlFormatter({})
|
||||||
|
metadata = Metadata().set("content", "Hello, world!").set_url('https://example.com')
|
||||||
|
|
||||||
def setUp(self):
|
final_media = formatter.format(metadata)
|
||||||
ArchivingContext.prev_algorithm = ArchivingContext.get("hash_enricher.algorithm", "")
|
assert isinstance(final_media, Media)
|
||||||
ArchivingContext.set("hash_enricher.algorithm", "SHA-256")
|
assert ".html" in final_media.filename
|
||||||
return super().setUp()
|
with open (final_media.filename, "r", encoding="utf-8") as f:
|
||||||
|
content = f.read()
|
||||||
def tearDown(self):
|
assert "Hello, world!" in content
|
||||||
ArchivingContext.set("hash_enricher.algorithm", ArchivingContext.prev_algorithm)
|
assert final_media.mimetype == "text/html"
|
||||||
del ArchivingContext.prev_algorithm
|
assert "SHA-256:" in final_media.get('hash')
|
||||||
return super().tearDown()
|
|
||||||
|
|
||||||
def test_format(self):
|
|
||||||
formatter = HtmlFormatter({})
|
|
||||||
metadata = Metadata().set("content", "Hello, world!").set_url('https://example.com')
|
|
||||||
|
|
||||||
final_media = formatter.format(metadata)
|
|
||||||
self.assertIsInstance(final_media, Media)
|
|
||||||
self.assertIn(".html", final_media.filename)
|
|
||||||
with open (final_media.filename, "r") as f:
|
|
||||||
content = f.read()
|
|
||||||
self.assertIn("Hello, world!", content)
|
|
||||||
self.assertEqual("text/html", final_media.mimetype)
|
|
||||||
self.assertIn("SHA-256:", final_media.get('hash'))
|
|
||||||
Reference in New Issue
Block a user