mirror of
https://github.com/bellingcat/tiktok-hashtag-analysis.git
synced 2026-06-07 19:08:32 +03:00
11
README.md
11
README.md
@@ -7,15 +7,14 @@ The tool helps to download posts and videos from TikTok for a given set of hasht
|
||||
## Pre-requisites
|
||||
1. Make sure you have Python 3.9 or a later version installed
|
||||
2. Install the tool with pip: `pip install tiktok-hashtag-analysis`
|
||||
1. or directly from the repo version: `pip install git+https://github.com/bellingcat/tiktok-hashtag-analysis`
|
||||
- Alternatively you can install directly from the latest version on GitHub: `pip install git+https://github.com/bellingcat/tiktok-hashtag-analysis`
|
||||
|
||||
You should now be ready to start using it.
|
||||
|
||||
|
||||
## About the tool
|
||||
### Command-line arguments
|
||||
```
|
||||
usage: tiktok-hashtag-analysis [-h] [--file FILE] [-d] [--number NUMBER] [-p] [-t] [--output-dir OUTPUT_DIR] [--config CONFIG] [--log LOG] [hashtags ...]
|
||||
usage: tiktok-hashtag-analysis [-h] [--file FILE] [-d] [--number NUMBER] [-p] [-t] [--output-dir OUTPUT_DIR] [--config CONFIG] [--log LOG] [--limit LIMIT] [-v] [hashtags ...]
|
||||
|
||||
Analyze hashtags within posts scraped from TikTok.
|
||||
|
||||
@@ -33,6 +32,8 @@ optional arguments:
|
||||
Directory to save scraped data and visualizations to
|
||||
--config CONFIG File name of configuration file to store TikTok credentials to
|
||||
--log LOG File to write logs to
|
||||
--limit LIMIT Maximum number of videos to download for each hashtag
|
||||
-v, --verbose Increase output verbosity
|
||||
```
|
||||
|
||||
### Structure of output data
|
||||
@@ -137,7 +138,7 @@ Assume we want to analyze the 20 most frequently co-occurring hashtags in the do
|
||||
To run the built-in tests in the `tests/` directory, first install the test dependency packages:
|
||||
|
||||
```
|
||||
pip install .[test]
|
||||
pip install .[dev]
|
||||
```
|
||||
|
||||
and then run the tests using the following command:
|
||||
@@ -146,4 +147,4 @@ and then run the tests using the following command:
|
||||
pytest
|
||||
```
|
||||
|
||||
This repo uses [black](https://github.com/psf/black) to format source code, please run the `black` command before submitting a PR.
|
||||
This repo uses [black](https://github.com/psf/black) to format source code and [mypy](https://mypy.readthedocs.io/en/stable/) for static type checking. Before submitting a pull request, please run both tools on the source code.
|
||||
|
||||
@@ -1,5 +0,0 @@
|
||||
seaborn==0.12.2
|
||||
matplotlib==3.7.2
|
||||
yt-dlp==2023.7.6
|
||||
TikTokApi==6.1.1
|
||||
requests==2.31.0
|
||||
47
setup.py
47
setup.py
@@ -1,30 +1,6 @@
|
||||
from setuptools import setup
|
||||
|
||||
|
||||
def read_requirements(filename: str):
    """Return the requirement specifiers listed in a requirements file.

    Each line is stripped of surrounding whitespace; empty lines and lines
    starting with ``#`` are dropped. Requirements given as a GitHub
    repository URL are rewritten into the ``<name> @ <url>`` form.
    """
    import re

    github_url = re.compile(
        r"^(git\+)?(https|ssh)://(git@)?github\.com/([\w-]+)/(?P<name>[\w-]+)\.git"
    )

    def fix_url_dependencies(req: str) -> str:
        """Pip and setuptools disagree about how URL dependencies should be handled."""
        found = github_url.match(req)
        return req if found is None else f"{found.group('name')} @ {req}"

    with open(filename) as requirements_file:
        stripped_lines = (raw.strip() for raw in requirements_file)
        return [
            fix_url_dependencies(entry)
            for entry in stripped_lines
            if entry and not entry.startswith("#")
        ]
|
||||
|
||||
|
||||
with open("README.md", "r", encoding="utf-8") as file:
|
||||
long_description = file.read()
|
||||
|
||||
@@ -45,10 +21,25 @@ setup(
|
||||
long_description_content_type="text/markdown",
|
||||
url="https://github.com/bellingcat/tiktok-hashtag-analysis",
|
||||
license="MIT License",
|
||||
# install_requires=read_requirements("requirements.txt"),
|
||||
# extras_require={"dev": read_requirements("dev-requirements.txt")},
|
||||
install_requires=["seaborn", "matplotlib", "TikTokApi", "requests", "yt_dlp"],
|
||||
extras_require={"test": ["pytest", "pytest-cov", "pytest-html", "pytest-metadata"]},
|
||||
install_requires=[
|
||||
"seaborn",
|
||||
"matplotlib",
|
||||
"TikTokApi",
|
||||
"requests",
|
||||
"yt_dlp",
|
||||
"tenacity",
|
||||
"msvc-runtime; os_name=='nt'",
|
||||
],
|
||||
extras_require={
|
||||
"dev": [
|
||||
"pytest",
|
||||
"pytest-cov",
|
||||
"pytest-html",
|
||||
"pytest-metadata",
|
||||
"black",
|
||||
"mypy",
|
||||
]
|
||||
},
|
||||
classifiers=[
|
||||
"Development Status :: 5 - Production/Stable",
|
||||
"Intended Audience :: Information Technology",
|
||||
|
||||
@@ -1,24 +0,0 @@
|
||||
import pytest
|
||||
|
||||
from tiktok_hashtag_analysis.auth import Authorization
|
||||
|
||||
MS_TOKEN = "thisisafakemstokenfortiktok"
|
||||
|
||||
|
||||
def test_auth_input(tmp_path, monkeypatch):
    """Check that a token entered at the prompt ends up on the `Authorization` object."""
    # Replace the interactive prompt so the test never waits on stdin
    monkeypatch.setattr("builtins.input", lambda _: MS_TOKEN)

    authorization = Authorization(config_file=tmp_path / ".tiktok")
    authorization.get_token()

    assert authorization.ms_token == MS_TOKEN
|
||||
|
||||
|
||||
def test_auth(tmp_path):
    """Round-trip a token through `dump_token` and `get_token`."""
    authorization = Authorization(config_file=tmp_path / ".tiktok")

    authorization.dump_token(ms_token=MS_TOKEN)
    authorization.get_token()

    assert authorization.ms_token == MS_TOKEN
|
||||
@@ -3,7 +3,7 @@ from tiktok_hashtag_analysis.base import TikTokDownloader, load_hashtags_from_fi
|
||||
|
||||
def test_scrape(tmp_path, hashtags):
|
||||
downloader = TikTokDownloader(hashtags=hashtags[:1], data_dir=tmp_path)
|
||||
downloader.run(download=True, plot=True, table=True, number=20)
|
||||
downloader.run(limit=1000, download=True, plot=True, table=True, number=20)
|
||||
|
||||
|
||||
def test_load_hashtags_from_file(tmp_path, hashtags):
|
||||
|
||||
60
tests/cli.py
60
tests/cli.py
@@ -1,23 +1,33 @@
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from tiktok_hashtag_analysis.cli import create_parser
|
||||
from tiktok_hashtag_analysis.cli import (
|
||||
create_parser,
|
||||
process_output_dir,
|
||||
DEFAULT_OUTPUT_DIR,
|
||||
)
|
||||
|
||||
ARGUMENTS = [
|
||||
PARSER_ARGUMENTS = [
|
||||
("file", "hashtags.txt", "--file"),
|
||||
("download", True, "--download"),
|
||||
("download", True, "-d"),
|
||||
("limit", 1000, "--limit"),
|
||||
("number", 20, "--number"),
|
||||
("plot", True, "--plot"),
|
||||
("plot", True, "-p"),
|
||||
("table", True, "--table"),
|
||||
("table", True, "-t"),
|
||||
("verbose", True, "--verbose"),
|
||||
("verbose", True, "-v"),
|
||||
("output_dir", "/tmp/tiktok_download", "--output-dir"),
|
||||
("config", "~/.tiktok", "--config"),
|
||||
("log", "../logfile.log", "--log"),
|
||||
]
|
||||
|
||||
|
||||
@pytest.mark.parametrize("attribute,value,flag", ARGUMENTS)
|
||||
@pytest.mark.parametrize("attribute,value,flag", PARSER_ARGUMENTS)
|
||||
def test_parser(hashtags, attribute, value, flag):
|
||||
argument_list = [*hashtags, flag]
|
||||
|
||||
@@ -29,3 +39,47 @@ def test_parser(hashtags, attribute, value, flag):
|
||||
|
||||
assert args.get(attribute) == value
|
||||
assert args.get("hashtags") == hashtags
|
||||
|
||||
|
||||
def test_process_output_dir(monkeypatch, tmp_path):
    """Exercise `process_output_dir` with unwritable, writable and unspecified directories."""
    # The parent of the home directory is used as a location the test
    # expects to have no write access to
    unwritable_root = Path.home().resolve().parent
    original_cwd = os.getcwd()

    # Specified output directory without write permissions: first one that
    # does not exist yet, then one that already exists
    for candidate in (unwritable_root / "test", unwritable_root):
        parser = create_parser()
        with pytest.raises(SystemExit) as system_exit:
            process_output_dir(specified_output_dir=candidate, parser=parser)
        assert system_exit.type == SystemExit

    # Unspecified, in current directory without write permissions
    monkeypatch.chdir(unwritable_root)
    result = process_output_dir(specified_output_dir=None, parser=parser)
    monkeypatch.chdir(original_cwd)
    assert result == DEFAULT_OUTPUT_DIR

    # Specified nonexisting output directory with write permissions
    parser = create_parser()
    writable_dir = tmp_path / "test" / "tiktok"
    result = process_output_dir(specified_output_dir=writable_dir, parser=parser)
    assert result == writable_dir

    # Unspecified, in current directory with write permissions
    monkeypatch.chdir(writable_dir)
    result = process_output_dir(specified_output_dir=None, parser=parser)
    monkeypatch.chdir(original_cwd)
    assert result == DEFAULT_OUTPUT_DIR
|
||||
|
||||
@@ -1,72 +0,0 @@
|
||||
import os
|
||||
import configparser
|
||||
from pathlib import Path
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
|
||||
class Authorization:
    """Handle authorization for TikTok, using the `msToken`.

    The token is looked up, in order, in the ``MS_TOKEN`` environment
    variable, in the config file, and finally by prompting the user in the
    terminal.
    """

    def __init__(self, config_file: Optional[str] = None):
        """Remember where the token is stored.

        Args:
            config_file: Path of the config file holding the token. When
                omitted (or empty), ``~/.tiktok`` is used.
        """
        if config_file:
            self.config_file = Path(config_file)
        else:
            self.config_file = Path.home() / ".tiktok"

        # Name of the config-file section the token is stored under
        self.section = "TikTok"
        # Populated by `get_token`
        self.ms_token = None

    def get_token(self) -> str:
        """Load the "msToken" cookie taken from TikTok, which the scraper requires.

        Returns:
            The token, which is also stored in `self.ms_token`.
        """

        # Step 1: check if MS_TOKEN is defined as environment variable
        if ms_token := os.environ.get("MS_TOKEN"):
            self.ms_token = ms_token
            logging.info("Loaded token from environment variable")
            return self.ms_token

        # Step 2: check if MS_TOKEN is defined in config file
        if self.config_file.is_file() and (ms_token := self.load_token()):
            self.ms_token = ms_token
            logging.info(f"Loaded token from config file: {self.config_file}")
            return self.ms_token

        # Step 3: have user enter MS_TOKEN via terminal. This is also reached
        # when the config file exists but holds no token, so that the method
        # never returns without a token.
        ms_token = self.input_token()
        self.dump_token(ms_token=ms_token)
        self.ms_token = ms_token
        logging.info(
            f"Loaded token from user input and saved to config file: {self.config_file}"
        )

        return self.ms_token

    def load_token(self) -> Optional[str]:
        """Parse a config file and extract the token.

        Returns:
            The stored token, or None when the section or option is absent.
        """

        config = configparser.ConfigParser()
        # Read with the same encoding `dump_token` writes with
        config.read(self.config_file, encoding="utf-8")
        return config.get(section=self.section, option="MS_TOKEN", fallback=None)

    def dump_token(self, ms_token: str):
        """Write the token to a config file, keeping its other contents.

        Args:
            ms_token: Token to store under the `MS_TOKEN` option.
        """

        config = configparser.ConfigParser()
        config.read(self.config_file, encoding="utf-8")
        # The section already exists when a token was saved before; adding it
        # again would raise DuplicateSectionError
        if not config.has_section(self.section):
            config.add_section(self.section)
        config.set(section=self.section, option="MS_TOKEN", value=ms_token)

        with open(self.config_file, "w", encoding="utf-8") as f:
            config.write(f)

    def input_token(self) -> str:
        """Allow user to manually enter the token in the terminal."""

        print(
            "\nPlease copy and paste your `msToken` cookie taken from your web browser when visiting the TikTok website. See [THIS VIDEO] for more information.\n"
        )

        ms_token = input("msToken: ")

        return ms_token
|
||||
@@ -7,21 +7,31 @@ import warnings
|
||||
import asyncio
|
||||
import logging
|
||||
import re
|
||||
from typing import List, Dict
|
||||
from urllib.error import HTTPError
|
||||
from typing import List, Dict, Optional
|
||||
|
||||
import yt_dlp
|
||||
from yt_dlp.utils import ExtractorError, DownloadError
|
||||
import requests
|
||||
import matplotlib.pyplot as plt
|
||||
import matplotlib.ticker as mtick
|
||||
import seaborn as sns
|
||||
|
||||
from tenacity import (
|
||||
retry,
|
||||
retry_if_exception_type,
|
||||
stop_after_attempt,
|
||||
TryAgain,
|
||||
wait_exponential,
|
||||
)
|
||||
from playwright._impl._api_types import Error
|
||||
from TikTokApi import TikTokApi
|
||||
|
||||
from .auth import Authorization
|
||||
|
||||
warnings.filterwarnings("ignore", message="Glyph (.*) missing from current font")
|
||||
sns.set_theme(style="darkgrid")
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def process_hashtag_list(hashtags: List[str]) -> List[str]:
|
||||
"""Convert a list of hashtags to a standard form (remove whitespace, make
|
||||
@@ -40,12 +50,14 @@ def load_hashtags_from_file(file: str) -> List[str]:
|
||||
return process_hashtag_list(hashtags=hashtags)
|
||||
|
||||
|
||||
async def _fetch_hashtag_data(hashtag: str, ms_token: str) -> List[Dict]:
|
||||
# Retry upon encountering transient playwright errors
|
||||
@retry(retry=retry_if_exception_type(Error), stop=stop_after_attempt(3))
|
||||
async def _fetch_hashtag_data(hashtag: str, limit: int) -> List[Dict]:
|
||||
"""Fetch data for videos containing a specified hashtag, asynchronously."""
|
||||
data = []
|
||||
async with TikTokApi() as api:
|
||||
await api.create_sessions(ms_tokens=[ms_token], num_sessions=1, sleep_after=3)
|
||||
async for video in api.hashtag(name=hashtag).videos(count=1000):
|
||||
await api.create_sessions(ms_tokens=[], num_sessions=1, sleep_after=3)
|
||||
async for video in api.hashtag(name=hashtag).videos(count=limit):
|
||||
data.append(video.as_dict)
|
||||
return data
|
||||
|
||||
@@ -63,22 +75,44 @@ def json_dump(file_path: Path, data: List):
|
||||
json.dump(obj=data, fp=f)
|
||||
|
||||
|
||||
@retry(wait=wait_exponential(multiplier=1, max=10))
def _get(url: str) -> requests.Response:
    """Fetch `url` with a 30 second timeout, retrying with exponential backoff.

    A response with status 200 or 403 is handed back to the caller; any
    other status raises `TryAgain` so that the request is attempted again.
    """
    # NOTE(review): no `stop=` condition is passed to `retry`, so the number
    # of attempts looks unbounded - confirm this is intended.
    response = requests.get(url=url, timeout=30)
    if response.status_code in {200, 403}:
        return response
    raise TryAgain
|
||||
|
||||
|
||||
def download_file_and_save(url: str, filepath: Path):
    """Download a file from a specified URL and write its contents to a file.

    The file extension is derived from the media subtype of the response's
    ``Content-Type`` header and set as the suffix of `filepath`. Nothing is
    written when the server answers with HTTP 403.

    Args:
        url: Location of the file to download.
        filepath: Destination path, without file extension.
    """

    r = _get(url=url)
    if r.status_code == 403:
        return

    # Keep only the media subtype: parameters such as "; charset=binary"
    # must not leak into the file name
    content_type = r.headers.get("Content-Type", "")
    ext = content_type.split(";")[0].strip().split("/")[-1]
    # Without a usable Content-Type the file is saved without an extension
    path_with_ext = filepath.with_suffix(f".{ext}") if ext else filepath
    with open(path_with_ext, "wb") as f:
        f.write(r.content)
    logger.debug(f"Saved file to: {path_with_ext}")
|
||||
|
||||
|
||||
def download_gallery(video_data: Dict, video_dir: Path):
|
||||
"""yt-dlp doesn't seem to support downloading images from an image gallery,
|
||||
so this is a quick fix that likely will fail on edge cases."""
|
||||
"""yt-dlp doesn't support downloading images from an image gallery,
|
||||
so this downloads all images and audio files from image galleries."""
|
||||
|
||||
video_id = video_data["id"]
|
||||
# A small percentage of image galleries don't have an associated audio file
|
||||
if play_url := video_data["music"]["playUrl"]:
|
||||
r = requests.get(play_url)
|
||||
with open(video_dir / f"{video_id}.mp3", "wb") as f:
|
||||
f.write(r.content)
|
||||
filepath = video_dir / f"{video_id}"
|
||||
download_file_and_save(url=play_url, filepath=filepath)
|
||||
|
||||
for i, image in enumerate(video_data["imagePost"]["images"]):
|
||||
image_url = image["imageURL"]["urlList"][0]
|
||||
r = requests.get(image_url)
|
||||
ext = r.headers["Content-Type"].split("/")[-1]
|
||||
with open(video_dir / f"{video_id}_{i:02d}.{ext}", "wb") as f:
|
||||
f.write(r.content)
|
||||
filepath = video_dir / f"{video_id}_{i:02d}"
|
||||
download_file_and_save(url=image_url, filepath=filepath)
|
||||
|
||||
|
||||
def aggregate_cooccurring_hashtags(hashtag_file: Path) -> Counter:
|
||||
@@ -101,17 +135,29 @@ def aggregate_cooccurring_hashtags(hashtag_file: Path) -> Counter:
|
||||
class TikTokDownloader:
|
||||
"""Main class for scraping data from TikTok."""
|
||||
|
||||
def __init__(self, hashtags: List[str], data_dir: str, config_file: str = None):
|
||||
def __init__(
|
||||
self, hashtags: List[str], data_dir: Path, config_file: Optional[str] = None
|
||||
):
|
||||
self.hashtags = process_hashtag_list(hashtags)
|
||||
logging.info(f"Hashtags to scrape: {hashtags}")
|
||||
|
||||
self.data_dir = Path(data_dir)
|
||||
os.makedirs(self.data_dir, exist_ok=True)
|
||||
|
||||
self.auth = Authorization(config_file=config_file)
|
||||
self.ms_token = self.auth.get_token()
|
||||
self.prioritize_hashtags()
|
||||
logger.info(f"Hashtags to scrape: {self.hashtags}")
|
||||
logger.info(f"Writing data to directory: {self.data_dir}")
|
||||
|
||||
def get_hashtag_posts(self, hashtag: str):
|
||||
def prioritize_hashtags(self):
    """Sort `self.hashtags` in place by when each hashtag was last scraped.

    The scrape time of a hashtag is the modification time of its
    `<data_dir>/<hashtag>/posts.json` file. Hashtags without such a file
    count as time 0, so they sort ahead of hashtags scraped before.
    """
    scraped_at = {}
    for posts_file in self.data_dir.glob("*/posts.json"):
        # The directory holding posts.json is named after the hashtag
        scraped_at[posts_file.parent.name] = posts_file.lstat().st_mtime

    def last_scrape_time(hashtag):
        return scraped_at.get(hashtag, 0)

    self.hashtags.sort(key=last_scrape_time)
|
||||
|
||||
def get_hashtag_posts(self, hashtag: str, limit: int):
|
||||
"""Fetch data about posts that used a specified hashtag and merge with
|
||||
existing data, if it exists."""
|
||||
|
||||
@@ -122,31 +168,30 @@ class TikTokDownloader:
|
||||
# If there are previously scraped posts, load them
|
||||
if hashtag_file.is_file():
|
||||
already_fetched_data = json_load(file_path=hashtag_file)
|
||||
already_fetched_ids = set(video["id"] for video in already_fetched_data)
|
||||
else:
|
||||
already_fetched_ids = set()
|
||||
already_fetched_data = []
|
||||
already_fetched_ids = set(video["id"] for video in already_fetched_data)
|
||||
|
||||
# Scrape posts that use the specified hashtag
|
||||
fetched_data = asyncio.run(
|
||||
_fetch_hashtag_data(hashtag=hashtag, ms_token=self.ms_token)
|
||||
)
|
||||
fetched_data = asyncio.run(_fetch_hashtag_data(hashtag=hashtag, limit=limit))
|
||||
fetched_ids = set(video["id"] for video in fetched_data)
|
||||
|
||||
if len(fetched_data) == 0:
|
||||
logging.warning(f"No posts were found for the hashtag: {hashtag}")
|
||||
logger.warning(f"No posts were found for the hashtag: {hashtag}")
|
||||
|
||||
# Determine which newly scraped posts haven't been scraped before
|
||||
new_fetched_data = [
|
||||
video for video in fetched_data if video["id"] not in already_fetched_ids
|
||||
old_fetched_data = [
|
||||
video for video in already_fetched_data if video["id"] not in fetched_ids
|
||||
]
|
||||
if len(new_fetched_data) == 0:
|
||||
logging.warning(f"No new posts were found for the hashtag: {hashtag}")
|
||||
new_post_count = len(fetched_ids - already_fetched_ids)
|
||||
old_post_count = len(already_fetched_ids)
|
||||
|
||||
# Merge new and old data and write to file
|
||||
all_fetched_data = already_fetched_data + new_fetched_data
|
||||
all_fetched_data = old_fetched_data + fetched_data
|
||||
json_dump(file_path=hashtag_file, data=all_fetched_data)
|
||||
logging.info(
|
||||
f"Scraped {len(new_fetched_data)} new posts containing the hashtag "
|
||||
f"'{hashtag}', with {len(already_fetched_data)} posts previously scraped"
|
||||
logger.info(
|
||||
f"Scraped {new_post_count} new posts containing the hashtag "
|
||||
f"'{hashtag}', with {old_post_count} posts previously scraped"
|
||||
)
|
||||
|
||||
def get_hashtag_videos(self, hashtag: str):
|
||||
@@ -167,10 +212,6 @@ class TikTokDownloader:
|
||||
new_video_list = [
|
||||
video for video in video_list if video["id"] not in already_downloaded_ids
|
||||
]
|
||||
if len(new_video_list) == 0:
|
||||
logging.warning(
|
||||
f"No new videos to be downloaded for the hashtag: {hashtag}"
|
||||
)
|
||||
|
||||
# Populate list of URLs to download using yt-dlp, and list of image
|
||||
# galleries to download using the `download_gallery` function
|
||||
@@ -178,6 +219,8 @@ class TikTokDownloader:
|
||||
galleries_to_download = []
|
||||
for video in new_video_list:
|
||||
if video.get("imagePost") is None:
|
||||
if video.get("author") is None:
|
||||
continue
|
||||
url = f"https://www.tiktok.com/@{video['author']['uniqueId']}/video/{video['id']}"
|
||||
urls_to_download.append(url)
|
||||
else:
|
||||
@@ -185,19 +228,29 @@ class TikTokDownloader:
|
||||
|
||||
# Download audio and image files for all image gallery posts
|
||||
if len(galleries_to_download) > 0:
|
||||
logging.info(f"Downloading image galleries for hashtag {hashtag}")
|
||||
logger.info(f"Downloading image galleries for hashtag {hashtag}")
|
||||
for video in galleries_to_download:
|
||||
logger.debug(f"Downloading image gallery for video: {video['id']}")
|
||||
download_gallery(video_data=video, video_dir=video_dir)
|
||||
|
||||
# Download video files for all video posts
|
||||
if len(urls_to_download) > 0:
|
||||
logging.info(f"Downloading media for hashtag {hashtag}")
|
||||
logger.info(f"Downloading media for hashtag {hashtag}")
|
||||
|
||||
ydl_opts = {
|
||||
"outtmpl": os.path.join(video_dir, "%(id)s.%(ext)s"),
|
||||
"ignore_errors": True,
|
||||
"quiet": logger.getEffectiveLevel() > logging.DEBUG,
|
||||
}
|
||||
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
|
||||
ydl.download(urls_to_download)
|
||||
for url in urls_to_download:
|
||||
try:
|
||||
ydl.download([url])
|
||||
except (HTTPError, TypeError, ExtractorError, DownloadError) as e:
|
||||
# Catch urllib and yt-dlp errors when video not found
|
||||
logger.warning(
|
||||
f"Encountered error {e} when attempting to download url: {url}"
|
||||
)
|
||||
|
||||
def frequency_table(self, hashtag: str, number: int):
|
||||
"""Print `number`-most commonly co-occurring hashtags for a specified
|
||||
@@ -226,10 +279,11 @@ class TikTokDownloader:
|
||||
|
||||
# Define labels and other fields used in plot
|
||||
total_posts = max(frequencies.values())
|
||||
frequencies.pop(hashtag)
|
||||
sorted_frequencices = frequencies.most_common(number)
|
||||
labels = [label for label, _ in sorted_frequencices[1:]]
|
||||
ratios = [freq / total_posts * 100 for _, freq in sorted_frequencices[1:]]
|
||||
y_pos = list(reversed(range(len(sorted_frequencices) - 1)))
|
||||
labels = [label for label, _ in sorted_frequencices]
|
||||
ratios = [freq / total_posts * 100 for _, freq in sorted_frequencices]
|
||||
y_pos = list(reversed(range(len(sorted_frequencices))))
|
||||
|
||||
# Visualize data in bar chart
|
||||
fig, ax = plt.subplots(figsize=(5, 6.66))
|
||||
@@ -247,21 +301,18 @@ class TikTokDownloader:
|
||||
plot_file = self.data_dir / hashtag / "plots" / f"{hashtag}__{current_time}.png"
|
||||
plot_file.parent.mkdir(exist_ok=True, parents=True)
|
||||
plt.savefig(plot_file, bbox_inches="tight", facecolor="white", dpi=300)
|
||||
logging.info(f"Plot saved to file: {plot_file}")
|
||||
logger.info(f"Plot saved to file: {plot_file}")
|
||||
|
||||
def run(self, download: bool, plot: bool, table: bool, number: int):
|
||||
def run(self, limit: int, download: bool, plot: bool, table: bool, number: int):
|
||||
"""Execute the specified operations on all specified hashtags."""
|
||||
|
||||
# Scrape all specified hashtags and perform analyses, depending on if
|
||||
# `--table` and `--plot` flags are used in the command
|
||||
# `--table`, `--plot`, and `--download` flags are used in the command
|
||||
for hashtag in self.hashtags:
|
||||
self.get_hashtag_posts(hashtag=hashtag)
|
||||
self.get_hashtag_posts(hashtag=hashtag, limit=limit)
|
||||
if plot:
|
||||
self.plot(hashtag=hashtag, number=number)
|
||||
if table:
|
||||
self.frequency_table(hashtag=hashtag, number=number)
|
||||
|
||||
# Download media for all hashtags if `--download` flag is used in the command
|
||||
for hashtag in self.hashtags:
|
||||
if download:
|
||||
self.get_hashtag_videos(hashtag=hashtag)
|
||||
|
||||
@@ -1,9 +1,14 @@
|
||||
import os
|
||||
import logging
|
||||
import argparse
|
||||
from pathlib import Path
|
||||
|
||||
from typing import Optional
|
||||
from .base import TikTokDownloader, load_hashtags_from_file
|
||||
|
||||
DEFAULT_OUTPUT_DIR = Path.home() / "tiktok_hashtag_data"
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def create_parser():
|
||||
"""Create parser tp parse input command-line arguments."""
|
||||
@@ -51,7 +56,7 @@ def create_parser():
|
||||
"--output-dir",
|
||||
type=str,
|
||||
help="Directory to save scraped data and visualizations to",
|
||||
default=Path(".").resolve().parent / "data",
|
||||
default=None,
|
||||
)
|
||||
parser.add_argument(
|
||||
"--config",
|
||||
@@ -60,10 +65,45 @@ def create_parser():
|
||||
default=None,
|
||||
)
|
||||
parser.add_argument("--log", type=str, help="File to write logs to", default=None)
|
||||
parser.add_argument(
|
||||
"--limit",
|
||||
type=int,
|
||||
help="Maximum number of videos to download for each hashtag",
|
||||
default=1000,
|
||||
)
|
||||
parser.add_argument(
|
||||
"-v",
|
||||
"--verbose",
|
||||
help="Increase output verbosity",
|
||||
action="store_true",
|
||||
)
|
||||
|
||||
return parser
|
||||
|
||||
|
||||
def process_output_dir(
    specified_output_dir: Optional[str], parser: argparse.ArgumentParser
) -> Path:
    """Resolve the output directory and make sure it can be written to.

    Returns `DEFAULT_OUTPUT_DIR` when no directory was specified. Otherwise
    the directory is resolved to an absolute path, created if necessary and
    returned. If it cannot be created or is not writable, `parser.error` is
    called with an explanatory message.
    """
    if specified_output_dir is None:
        return DEFAULT_OUTPUT_DIR

    _output_dir = Path(specified_output_dir).resolve()
    error_message = f"You don't have write permissions for the specified output directory (`{_output_dir}`). Please specify an output directory that you have write access to."

    try:
        os.makedirs(_output_dir, exist_ok=True)
        writable = os.access(path=_output_dir, mode=os.W_OK)
    except PermissionError:
        # The directory (or one of its parents) could not be created
        writable = False

    if writable:
        return _output_dir
    parser.error(error_message)
|
||||
|
||||
|
||||
def main():
|
||||
"""Parse and process command-line arguments, scrape specified hashtags, and perform specified analyses."""
|
||||
|
||||
@@ -71,7 +111,7 @@ def main():
|
||||
args = parser.parse_args()
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
level=logging.DEBUG if args.verbose else logging.INFO,
|
||||
filename=args.log,
|
||||
format="%(asctime)s %(levelname)s | %(message)s",
|
||||
datefmt="%Y-%m-%d %H:%M:%S",
|
||||
@@ -89,12 +129,18 @@ def main():
|
||||
else:
|
||||
hashtags = args.hashtags
|
||||
|
||||
output_dir = process_output_dir(specified_output_dir=args.output_dir, parser=parser)
|
||||
|
||||
downloader = TikTokDownloader(
|
||||
hashtags=hashtags, data_dir=args.output_dir, config_file=args.config
|
||||
hashtags=hashtags, data_dir=output_dir, config_file=args.config
|
||||
)
|
||||
|
||||
downloader.run(
|
||||
download=args.download, plot=args.plot, table=args.table, number=args.number
|
||||
limit=args.limit,
|
||||
download=args.download,
|
||||
plot=args.plot,
|
||||
table=args.table,
|
||||
number=args.number,
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@ _MAJOR = "2"
|
||||
_MINOR = "0"
|
||||
# On main and in a nightly release the patch should be one ahead of the last
|
||||
# released build.
|
||||
_PATCH = "0"
|
||||
_PATCH = "1"
|
||||
# This is mainly for nightly builds which have the suffix ".dev$DATE". See
|
||||
# https://semver.org/#is-v123-a-semantic-version for the semantics.
|
||||
_SUFFIX = ""
|
||||
|
||||
Reference in New Issue
Block a user