Merge pull request #5 from bellingcat/even_more_tristan_edits

Finishing touches
This commit is contained in:
johannawild
2022-05-06 10:42:40 +02:00
committed by GitHub
7 changed files with 340 additions and 327 deletions

View File

@@ -22,17 +22,17 @@ You should now be ready to start using the tool.
## About the tool
### Command-line arguments
```
$ python run_downloader.py -h
python3 run_downloader.py --help
usage: run_downloader.py [-h] [-t [T [T ...]]] [-f F] [-p] [-v]
Download the tiktoks for the requested hashtags
optional arguments:
-h, --help show this help message and exit
-t [T [T ...]] List of hashtags
-f F File name with the list of hashtags
-p Download posts
-v Download videos
-h, --help show this help message and exit
-t [T [T ...]] List of hashtags to scrape
-f F File name containing list of hashtags to scrape
-p Download post data
-v Download video files
```
### Structure of output data
@@ -90,11 +90,11 @@ Note that video downloading is a time and data rate consuming task, as a result
The script `hashtag_frequencies.py` analyzes the frequencies of top occurring hashtags in a given set of posts.
```
python hashtag_frequencies.py --help
usage: hashtag_frequencies.py [-h] [-p] [-d] input_file n
$ python3 hashtag_frequencies.py --help
usage: hashtag_frequencies.py [-h] [-p] [-d] hashtag n
positional arguments:
input_file The json hashtag file name
hashtag The hashtag of scraped posts to analyze
n The number of top n occurrences
optional arguments:
@@ -107,7 +107,7 @@ Assume we want to analyze the 20 most frequently occurring hashtags in the downl
- The results can be plotted and saved as a PNG file by executing the following command:
`python3 hashtag_frequencies.py -p ../data/london/posts/data.json 20`
`python3 hashtag_frequencies.py london 20 -p`
which will produce a figure similar to that shown below:
<p align="center">
@@ -118,31 +118,31 @@ Assume we want to analyze the 20 most frequently occurring hashtags in the downl
- The results can be displayed in tabular form by executing the following command:
`python3 hashtag_frequencies.py -d ../data/london/posts/data.json 20`
`python3 hashtag_frequencies.py london 20 -d`
which will produce a terminal output similar to the following:
```
Rank Hashtag Occurrences Frequency
0 london 962 1.0
1 fyp 493 0.5124740124740125
2 uk 238 0.24740124740124741
3 foryou 223 0.23180873180873182
4 foryoupage 186 0.19334719334719336
5 viral 177 0.183991683991684
6 fypシ 85 0.08835758835758836
7 funny 55 0.057172557172557176
8 xyzbca 52 0.05405405405405406
9 england 45 0.04677754677754678
10 british 44 0.04573804573804574
11 trending 39 0.04054054054054054
12 fy 33 0.034303534303534305
13 comedy 32 0.033264033264033266
14 roadman 28 0.029106029106029108
15 4u 27 0.028066528066528068
16 usa 26 0.02702702702702703
17 tiktok 26 0.02702702702702703
18 travel 21 0.02182952182952183
19 america 20 0.02079002079002079
Rank Hashtag Occurrences Frequency
0 london 960 1.0000
1 fyp 494 0.5146
2 uk 238 0.2479
3 foryou 221 0.2302
4 foryoupage 184 0.1917
5 viral 179 0.1865
6 fypシ 84 0.0875
7 funny 56 0.0583
8 xyzbca 51 0.0531
9 british 45 0.0469
10 england 44 0.0458
11 trending 40 0.0417
12 fy 33 0.0344
13 comedy 32 0.0333
14 roadman 28 0.0292
15 4u 27 0.0281
16 usa 26 0.0271
17 tiktok 26 0.0271
18 travel 21 0.0219
19 america 20 0.0208
```
The `Frequency` column shows the ratio of the occurrence to the total number of downloaded posts.

View File

@@ -1,31 +1,34 @@
from typing import NamedTuple
import logging, logging.config
"""Utility functions that perform data processing related tasks.
"""
from typing import NamedTuple, List, Tuple, Set, Optional, Dict, Any
import logging
import file_methods
logging.config.fileConfig("../logging.config")
logger = logging.getLogger("Logger")
"""
The file contains several functions that perform data processing related tasks.
"""
logger = logging.getLogger()
class Diff(NamedTuple):
ids: list
"""Keep track of scraped post IDs and whether previously-scraped posts have been filtered."""
ids: Set[str]
filter_posts: bool
class Total(NamedTuple):
"""Keep track of number of total and number of unique scraped posts."""
total: int
unique: int
def get_difference(tag: str, file_name: str, ids: list) -> NamedTuple:
"""
Compares two sets of ids and returns the difference of the two sets.
Purpose - user to filter out the new ids by comparing the set of id list (ids/post_ids.json or videos_ids.json) and the list of newly downloaded ids.
def get_difference(tag: str, file_name: str, ids: List[str]) -> Optional[Diff]:
"""Find TikTok post IDs that haven't previously been scraped.
Filter out the new posts for the hashtag `tag` by comparing the list of
post IDs contained in `file_name` to the list of newly downloaded IDs
contained in `ids`.
"""
filter_posts = False
current_id_data = file_methods.get_data(file_name)
@@ -38,23 +41,25 @@ def get_difference(tag: str, file_name: str, ids: list) -> NamedTuple:
if not new_ids:
return None
else:
new_ids = list(new_ids)
total_new_ids = len(new_ids)
if total_new_ids == total_current_ids:
filter_posts = False
new_data = Diff(new_ids, filter_posts)
else:
new_data = Diff(new_ids, filter_posts)
return new_data
else:
filter_posts = True
new_data = Diff(ids, filter_posts)
new_data = Diff(set(ids), filter_posts)
return new_data
def extract_posts(settings: dict, file_name: str, tag: str) -> list:
"""
Takes the downloaded file by the tiktok-scraper that contains the posts, and returns the new posts after comparing it the list of posts (from the file ids/post_ids.json) already downloaded.
def extract_posts(
settings: Dict[Any, Any], file_name: str, tag: str
) -> Optional[Tuple[List[str], List[Dict]]]:
"""Find TikTok posts that haven't previously been scraped.
Compares the file downloaded by tiktok-scraper to the list of
previously-scraped posts (from the file ids/post_ids.json).
"""
ids = []
posts = []
@@ -65,6 +70,7 @@ def extract_posts(settings: dict, file_name: str, tag: str) -> list:
if not ids:
logger.warn(f"No posts were found for the hashtag: {tag}")
return None
status = file_methods.check_existence(settings["post_ids"], "file")
if not status:
@@ -74,18 +80,19 @@ def extract_posts(settings: dict, file_name: str, tag: str) -> list:
new_ids = get_difference(tag, settings["post_ids"], ids)
if not new_ids:
logger.warn(f"No new posts were found for the hashtag: {tag}")
return None
elif new_ids.filter_posts:
new_posts = [post for post in posts if post["id"] in new_ids.ids]
new_data = (new_ids.ids, new_posts)
return new_data
return (list(new_ids.ids), new_posts)
else:
new_data = (new_ids.ids, posts)
return new_data
return (list(new_ids.ids), posts)
def extract_videos(settings: dict, tag: str, download_list: list) -> list:
"""
Tiktok-scraper downloads the videos and puts them in a folder - the list of ids of the downloaded videos is fed to this function as download_list. The function returns the set of new videos after comparing it the list of videos (from the file ids/videos_ids.json) already downloaded.
def extract_videos(settings: dict, tag: str, download_list: List[str]) -> List[str]:
"""Find TikTok videos that haven't previously been scraped.
Compares the file downloaded by tiktok-scraper to the list of
previously-scraped videos (from the file ids/video_ids.json).
"""
status = file_methods.check_existence(settings["video_ids"], "file")
if not status:
@@ -97,43 +104,44 @@ def extract_videos(settings: dict, tag: str, download_list: list) -> list:
logger.warn(
f"No new videos were found for the {tag} in the downloaded folder."
)
return None
return []
else:
return new_videos.ids
return list(new_videos.ids)
def update_posts(
file_path: str, file_type: str, new_data: list, tag: str = None
) -> tuple:
"""
Updates the list of post ids (in the file ids/post_ids.json) with the ids of the new posts.
file_path: str, file_type: str, new_data: List[Any], tag: str = None
) -> Optional[Tuple[str, int]]:
"""Update the file containing scraped post IDs (`ids/post_ids.json`) with
the IDs of the recently scraped posts.
"""
status = file_methods.check_existence(file_path, file_type)
if not tag:
file_methods.post_writer(file_path, new_data, status)
return None
else:
scraped_data = file_methods.id_writer(file_path, new_data, tag, status)
return scraped_data
def update_videos(settings: str, new_data: list, tag: str) -> tuple:
"""
Updates the list of video ids (in the file ids/video_ids.json) with the ids of the new videos.
def update_videos(
settings: Dict[str, Any], new_data: List[str], tag: str
) -> Tuple[str, int]:
"""Update the file containing video IDs (`ids/video_ids.json`) with the IDs
of the recently scraped videos.
"""
file_path = settings["video_ids"]
file_methods.check_file(file_path, "file")
log = file_methods.id_writer(file_path, new_data, tag, True)
number_scraped = file_methods.id_writer(file_path, new_data, tag, True)
file_methods.clean_video_files(settings, tag, new_data)
return log
return number_scraped
def get_total_posts(file_path: str, tag: str) -> NamedTuple:
"""
Returns total count of ids in a id list along with the number of unique ids among them.
"""
def get_total_posts(file_path: str, tag: str) -> Total:
"""Count number of total scraped posts and number of unique scraped posts."""
status = file_methods.check_existence(file_path, "file")
if not status:
raise OSError("{file_path} not found!")
raise OSError(f"{file_path} not found!")
else:
data = file_methods.get_data(file_path)
total_posts = len(data[tag])
@@ -143,9 +151,7 @@ def get_total_posts(file_path: str, tag: str) -> NamedTuple:
def print_total(file_path: str, tag: str, data_type: str):
"""
Prints the total count for posts or videos for a hashtag. Calls the function get_total_posts for sanity check that there are no repeating ids in the id lists.
"""
"""Print number of total and unique scraped posts, warn if any non-unique posts."""
total = get_total_posts(file_path, tag)
if total.total == total.unique:
logger.info(f"Scraped {total.total} {data_type} containing the hashtag '{tag}'")

View File

@@ -1,23 +1,21 @@
"""Utility functions that operate on files, such as writing to or reading from a file.
"""
import os
import json
import subprocess
from datetime import datetime
import shutil
from typing import Tuple, List, Optional, Dict, Any
import logging, logging.config
logging.config.fileConfig("../logging.config")
logger = logging.getLogger("Logger")
"""
The file contains the functions that operate on files, such as writing or reading from files etc.
"""
logger = logging.getLogger()
def create_file(name: str, file_type: str):
"""
Creates a file or directory.
"""
"""Create a file or directory."""
if file_type == "dir":
os.makedirs(name, mode=0o777)
elif file_type == "file":
@@ -28,9 +26,7 @@ def create_file(name: str, file_type: str):
def check_existence(file_path: str, file_type: str):
"""
Checks the existence of a file or a directory. If not found, returns False, else returns True.
"""
"""Check if a file or a directory exists."""
if file_type == "file":
return os.path.isfile(file_path)
elif file_type == "dir":
@@ -40,85 +36,92 @@ def check_existence(file_path: str, file_type: str):
def check_file(file_path: str, file_type: str):
"""
Creates a file or directory, if not found. Else, returns nothing.
"""
"""If path does not exist, creates a file or directory."""
status = check_existence(file_path, file_type)
if not status:
create_file(file_path, file_type)
def download_posts(settings: dict, tag: str):
"""
Runs the tiktok-scraper command to download posts for a given hashtag.
Returns the path to the downloaded file of posts. If no file was downloaded, prints the error and returns nothing in order to move on.
os.chdir is used to execute shell commands in the right folders and then reused to come back to the original folder of execution of run_downloader script.
def download_posts(settings: Dict, tag: str, output_dir: Any):
"""Run the tiktok-scraper command to download posts for a given hashtag.
Returns the path to the downloaded file of posts. If no file was downloaded,
prints the error and returns nothing in order to move on.
tiktok-scraper is told where to write its output via `--filepath`, so the
working directory of the run_downloader script is not changed.
"""
path = os.path.join(settings["data"], tag, settings["posts"])
os.chdir(path)
tiktok_command = f"tiktok-scraper hashtag {tag} -t 'json'"
os.makedirs(path, exist_ok=True)
tiktok_command = f"tiktok-scraper hashtag {tag} -t 'json' --filepath {output_dir}"
output = subprocess.check_output(tiktok_command, shell=True, encoding="utf-8")
new_file = output.split()[-1]
if "json" in new_file:
os.chdir("../../../tiktok_downloader")
return new_file
else:
logger.warn(
f"Something's wrong with what is returned by tiktok-scraper for the hashtag {tag} - *{new_file}* is not a json file.\n\ntiktok-scraper returned {output}"
)
os.chdir("../../../tiktok_downloader")
def download_videos(settings: dict, tag: str):
"""
Runs the tiktok-scraper command to download videos for a given hashtag. Note that all the videos are downloaded that are returned by the tiktok api and as a result, its a time and data consuming process.
The list of downloaded video ids is constucted and returned if the downloaded folder contains at least 1 video.
os.chdir is used to execute shell commands in the right folders and then reused to come back to the original folder of execution of run_downloader script.
def download_videos(settings: Dict, tag: str):
"""Run the tiktok-scraper command to download videos for a given hashtag.
Note that all the videos are downloaded that are returned by the TikTok API,
making this a time- and data-intensive process.
The list of downloaded video IDs is constructed and returned if the
downloaded folder contains at least 1 video.
tiktok-scraper is told where to write its output via `--filepath`, so the
working directory of the run_downloader script is not changed.
"""
path = os.path.join(settings["data"], tag, settings["videos"])
os.chdir(path)
tiktok_command = f"tiktok-scraper hashtag {tag} -d"
os.makedirs(path, exist_ok=True)
tiktok_command = f"tiktok-scraper hashtag {tag} -d --filepath {path}"
result = subprocess.check_output(tiktok_command, shell=True)
downloaded_list_tmp = os.listdir(f"./#{tag}")
downloaded_list_tmp = os.listdir(os.path.join(path, f"#{tag}"))
if downloaded_list_tmp:
downloaded_list = []
for file in downloaded_list_tmp:
file = file.split(".")[0]
downloaded_list.append(file)
os.chdir("../../../tiktok_downloader")
return downloaded_list
else:
logger.warn(f"No video files were downloaded for the hashtag {tag}.")
os.chdir("../../../tiktok_downloader")
shutil.rmtree(settings["videos_delete"])
def get_data(file_path: str) -> list:
"""
Reads the json file and retuns the read data.
"""
def get_data(file_path: str) -> Any:
"""Read a JSON file and return the read data."""
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
return data
return data
def dump_data(file_path: str, data: list):
"""
Writes the data to the json file.
"""
def dump_data(file_path: str, data: Any):
"""Write data to a JSON file."""
with open(file_path, "w", encoding="utf-8") as f:
json.dump(data, f)
def log_writer(log_data: list):
"""
Creates the dictionary of total downloads (posts and videos) per hashtag.
Example : { timetamp : { hashtag : { videos : number_of_new_videos , posts : number_of_new_posts } } }
Writes the dictionary to the log file (logs/log.json).
def log_writer(log_data: List[Tuple[str, Tuple[str, int]]]):
"""Create the dictionary of total downloads (posts and videos) per hashtag.
Example : {
timestamp : {
hashtag : {
videos : number_of_new_videos ,
posts : number_of_new_posts
}
}
}
Writes the dictionary to the log file (`logs/log.json`).
"""
total = 0
scraped_summary_dict: dict = {}
scraped_summary_dict = {} # type: Dict[str, Dict[str, int]]
for hashtag, (data_type, count) in log_data:
if hashtag in scraped_summary_dict:
if data_type in scraped_summary_dict[hashtag]:
@@ -130,18 +133,18 @@ def log_writer(log_data: list):
scraped_summary_dict[hashtag] = {data_type: count}
total += count
now = datetime.now()
now_str = now.strftime("%d-%m-%Y %H:%M:%S")
now_str = datetime.now().strftime("%d-%m-%Y %H:%M:%S")
data = {now_str: scraped_summary_dict}
logger.warn(f"Logged post data: {data}")
logger.debug(f"Logged post data: {data}")
logger.info(f"Successfully scraped {total} total entries")
def id_writer(file_path: str, new_data: list, tag: str, status: bool) -> tuple:
"""
Writes the list of new ids to the post_ids or video_ids files.
"""
def id_writer(
file_path: str, new_data: List[str], tag: str, status: bool
) -> Tuple[str, int]:
"""Write the list of new ids to the post_ids or video_ids file."""
total = len(new_data)
if status:
try:
@@ -162,9 +165,9 @@ def id_writer(file_path: str, new_data: list, tag: str, status: bool) -> tuple:
return number_scraped
def post_writer(file_path: str, new_data: list, status: bool):
"""
Writes the new posts in the post file of the given hashtag (/data/{hashtag}/posts/data.json)
def post_writer(file_path: str, new_data: List[Dict], status: bool):
"""Write the new posts in the post file of the given hashtag
(`/data/{hashtag}/posts/data.json`).
"""
total = len(new_data)
if status:
@@ -182,9 +185,7 @@ def post_writer(file_path: str, new_data: list, status: bool):
def delete_file(file_path: str, file_type: str):
"""
Deletes the directory or the file.
"""
"""Delete a directory or file."""
if not check_existence(file_path, file_type):
raise OSError(f"Attempt to delete file failed: {file_path} does not exist")
elif file_type == "file":
@@ -197,9 +198,8 @@ def delete_file(file_path: str, file_type: str):
raise OSError("{file_type} needs to be either 'file' or 'dir'")
def clean_video_files(settings: dict, tag: str, new_data: list = None):
"""
Moves the new videos from the tiktok-scraper video folder to /data/{hashtag}/videos/
def clean_video_files(settings: dict, tag: str, new_data: Optional[List[str]] = None):
"""Move the new videos from the tiktok-scraper video folder to `/data/{hashtag}/videos/`.
Deletes the residual tiktok-scraper video folder.
"""
if new_data:

View File

@@ -1,5 +1,4 @@
"""
Contains global constants relating to paths and operational parameters such as sleep time between consecutive tiktok-scraper calls.
"""Specify global constants including file paths and scraping options.
"""
@@ -15,7 +14,6 @@ POST_IDS = "post_ids.json"
VIDEO_IDS = "video_ids.json"
DATA_FILE = "data.json"
FILES = {
"data": DATA,
"ids": IDS,
@@ -28,12 +26,7 @@ FILES = {
"downloads": [],
}
# Commands
tag = ""
PARAMETERS = {
"scraper_attempts": 3,
# "number_of_videos" : 3, # Number of videos to be downloaded by tiktok-scraper.
"sleep": 8,
}

View File

@@ -1,118 +1,40 @@
"""Analyze the frequency of hashtags appearing in the set of given posts.
- The "hashtag" positional argument specifies the hashtag of scraped posts to analyze
- The "n" positional argument specifies how many hashtags the user wants to analyze
- Specifying the "-d" flag prints the hashtag frequencies on the shell
- Specifying the "-p" flag plots the hashtag frequencies and saves as a png file
"""
import os
import json
import argparse
from datetime import datetime
import warnings
warnings.filterwarnings("ignore", message="Glyph (.*) missing from current font")
from typing import List, Tuple, Dict, Any
import logging
import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
import seaborn as sns
sns.set_theme(style="darkgrid")
from file_methods import check_file, check_existence
from global_data import IMAGES
from global_data import IMAGES, FILES
"""
Plots the frequency of hashtags appearing in the set of given posts.
"""
warnings.filterwarnings("ignore", message="Glyph (.*) missing from current font")
sns.set_theme(style="darkgrid")
logger = logging.getLogger()
def get_hashtags(obj):
if not obj:
raise ValueError(f"Empty item, no hashtags to be extracted.")
else:
hashtags = {}
tags = [[tag["name"] for tag in ele["hashtags"]] for ele in obj]
tags = [set(ele) for ele in tags]
{
tag: (
1
if tag not in hashtags and not hashtags.update({tag: 1})
else hashtags[tag] + 1 and not hashtags.update({tag: hashtags[tag] + 1})
)
for ele in tags
for tag in ele
}
hashtags = sorted(hashtags.items(), key=lambda e: e[1], reverse=True)
return hashtags
def get_occurrences(filename, n=1, sort=True):
"""
Takes the json file containing posts and returns a dictionary:
local variable occs = {
"total": total posts in the file,
top_n: [[top n hashtags ], [frequencies of corresponding hashtags]]
}
"""
with open(filename) as f:
obj = json.load(f)
l = len(obj)
tags = get_hashtags(obj)
occs = {"total": l, "top_n": []}
occs["top_n"] = [[ele[i] for ele in tags[0:n]] for i in range(2)]
return occs
def plot(n, occs, img_folder):
y_pos = list(reversed(range(n - 1)))
max_count = occs["top_n"][1][0]
freqs = [count / max_count * 100 for count in occs["top_n"][1][1:]]
labels = occs["top_n"][0][1:]
fig, ax = plt.subplots(figsize=(5, 6.66))
ax.barh(y_pos, freqs)
ax.set_yticks(y_pos)
ax.set_yticklabels(labels)
ax.grid(axis="y")
ax.set_xlabel("Percent of posts with common hashtag")
ax.set_ylim(min(y_pos) - 1, max(y_pos) + 1)
ax.set_title(f'Common hashtags for #{occs["top_n"][0][0]} posts')
ax.xaxis.set_major_formatter(mtick.PercentFormatter(decimals=0))
save_plot(img_folder)
def print_occurrences(occs):
"""
Prints the top n hashtags with their frequencies and the ratio of occurrences and total posts, all to the shell.
"""
row_number = 0
total_posts = occs["total"]
print(
"{:<8} {:<15} {:<15} {:<15}".format(
"Rank", "Hashtag", "Occurrences", "Frequency"
)
)
for key, value in zip(occs["top_n"][0], occs["top_n"][1]):
ratio = value / total_posts
print("{:<8} {:<15} {:<15} {:<15}".format(row_number, key, value, ratio))
row_number += 1
print(f"Total posts: {total_posts}")
def save_plot(img_folder):
"""
Saves the plot to a png file in the folder /data/imgs/
"""
now = datetime.now()
current_time = now.strftime("%Y_%m_%d_%H_%M_%S")
filename = f"{img_folder}/{current_time}.png"
logging.info(f"Plot saved to file: {filename}")
plt.savefig(filename, bbox_inches="tight", facecolor="white", dpi=300)
def create_parser():
"""
Creates the parser and the arguments for the user input.
"""
def create_parser() -> argparse.ArgumentParser:
"""Create the parser and the arguments for the user input."""
parser = argparse.ArgumentParser()
parser.add_argument("input_file", help="The json hashtag file name")
parser.add_argument("n", help="The number of top n occurrences", type=int)
parser.add_argument(
"hashtag",
type=str,
help="The hashtag of scraped posts to analyze",
)
parser.add_argument("n", type=int, help="The number of top n occurrences")
parser.add_argument(
"-p", "--plot", help="Plot the occurrences", action="store_true"
)
@@ -122,14 +44,89 @@ def create_parser():
return parser
if __name__ == "__main__":
"""
Option "n" specifies how many hashtags does the user wants to plot.
"-d" option prints the hashtag frequencies on the shell
"-p" option plots the hashtag frequencies and saves as a png file in the folder /data/imgs/
def get_hashtags(obj: List[Dict[str, Any]]) -> List[Tuple[str, int]]:
    """Count in how many posts each hashtag appears.

    A hashtag repeated within a single post is counted once for that post.

    Args:
        obj: Non-empty list of post records. Each record must provide a
            "hashtags" list whose items carry a "name" key.

    Returns:
        (hashtag, number of posts) pairs sorted by descending count. Hashtags
        with equal counts keep the order in which they were first counted.

    Raises:
        ValueError: If `obj` is empty.
    """
    if not obj:
        raise ValueError("Empty item, no hashtags could be extracted.")

    hashtags: Dict[str, int] = {}
    for post in obj:
        # De-duplicate inside a post so a repeated tag only counts once.
        for name in {tag["name"] for tag in post["hashtags"]}:
            hashtags[name] = hashtags.get(name, 0) + 1

    return sorted(hashtags.items(), key=lambda item: item[1], reverse=True)
def get_occurrences(filename: str, n: int = 1) -> Dict[str, Any]:
    """Aggregate hashtag frequency information for a JSON file of posts.

    Args:
        filename: Path to the JSON file holding the list of scraped posts.
        n: Maximum number of top hashtags to report.

    Returns:
        A dict of the form::

            {
                "total": number of posts in the file,
                "top_n": [[top n hashtags], [post counts of those hashtags]],
            }

    Raises:
        ValueError: Propagated from `get_hashtags` when the file holds no posts.
    """
    # Decode explicitly as UTF-8: hashtags regularly contain non-ASCII
    # characters and the platform default encoding may not handle them.
    with open(filename, encoding="utf-8") as f:
        posts = json.load(f)

    # Slicing already copes with fewer than n hashtags; the number of posts
    # says nothing about how many distinct hashtags exist, so it is not a cap.
    top_tags = get_hashtags(posts)[:n]
    return {
        "total": len(posts),
        "top_n": [
            [tag for tag, _ in top_tags],
            [count for _, count in top_tags],
        ],
    }
def plot(occs: dict, img_folder: str):
    """Save a horizontal bar chart of the hashtags that co-occur with the top one.

    The first entry of `occs["top_n"]` is the reference hashtag: it names the
    chart and its count is the 100% baseline. Every following entry becomes
    one bar, expressed as a percentage of that baseline.

    Args:
        occs: Mapping with a "top_n" entry of the form [[hashtags], [counts]],
            as built by `get_occurrences`.
        img_folder: Folder handed on to `save_plot`.
    """
    names = occs["top_n"][0]
    counts = occs["top_n"][1]
    if len(names) < 2:
        # With no co-occurring hashtags there are no bars to draw, and the
        # axis limits below (min/max of an empty list) would raise.
        logger.warning("Fewer than two hashtags available, nothing to plot.")
        return

    hashtag = names[0]
    labels = names[1:]
    max_count = counts[0]
    freqs = [count / max_count * 100 for count in counts[1:]]
    # Reverse the positions so the most frequent hashtag is drawn at the top.
    y_pos = list(reversed(range(len(labels))))

    _, ax = plt.subplots(figsize=(5, 6.66))
    ax.barh(y_pos, freqs)
    ax.set_yticks(y_pos)
    ax.set_yticklabels(labels)
    ax.grid(axis="y")
    ax.set_xlabel("Percent of posts with common hashtag")
    ax.set_ylim(min(y_pos) - 1, max(y_pos) + 1)
    ax.set_title(f"Common hashtags for #{hashtag} posts")
    ax.xaxis.set_major_formatter(mtick.PercentFormatter(decimals=0))
    save_plot(img_folder, hashtag)
def save_plot(img_folder: str, hashtag: str) -> None:
    """Save the current matplotlib figure as a timestamped png file.

    The file is written to `{img_folder}/{hashtag}_{timestamp}.png`.

    Args:
        img_folder: Folder in which the image is written.
        hashtag: Hashtag used as the file name prefix.
    """
    current_time = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
    filename = f"{img_folder}/{hashtag}_{current_time}.png"
    plt.savefig(filename, bbox_inches="tight", facecolor="white", dpi=300)
    # Log only once the file is written, so a failed save is never reported
    # as a success.
    logging.info(f"Plot saved to file: {filename}")
def print_occurrences(occs):
    """Print a ranked table of the top hashtags to stdout.

    One row is printed per hashtag with its zero-based rank, its number of
    occurrences and that number divided by the total number of posts
    (four decimals), followed by a final line with the total.

    Args:
        occs: Mapping with a "total" post count and a "top_n" entry of the
            form [[hashtags], [counts]].
    """
    total_posts = occs["total"]
    print(
        "{:<8} {:<30} {:<15} {:<15}".format(
            "Rank", "Hashtag", "Occurrences", "Frequency"
        )
    )
    rows = zip(occs["top_n"][0], occs["top_n"][1])
    for rank, (hashtag, count) in enumerate(rows):
        ratio = count / total_posts
        print("{:<8} {:<30} {:<15} {:.4f}".format(rank, hashtag, count, ratio))
    print(f"Total posts: {total_posts}")
if __name__ == "__main__":
img_folder = IMAGES
check_file(img_folder, "dir")
parser = create_parser()
@@ -138,14 +135,18 @@ if __name__ == "__main__":
raise ValueError(
f"Specified argument `n` (the number of hashtags to analyze) must be greater than zero, not: {args.n}."
)
if not check_existence(args.input_file, "file"):
input_file = data_file = os.path.join(
FILES["data"], args.hashtag, FILES["posts"], FILES["data_file"]
)
if not check_existence(input_file, "file"):
raise FileNotFoundError(
f"Specified argument `input_file` ({args.input_file}) does not exist."
f"File ({input_file}) for specified argument `hashtag` ({args.hashtag}) does not exist."
)
base = os.path.splitext(args.input_file)[0]
base = os.path.splitext(input_file)[0]
path = f"./{base}_sorted_hashtags.csv"
occs = get_occurrences(args.input_file, args.n)
occs = get_occurrences(input_file, args.n)
if args.plot:
plot(args.n, occs, img_folder)
plot(occs, img_folder)
else:
print_occurrences(occs)

View File

@@ -1,18 +1,44 @@
"""Download post data or videos from TikToks containing one or more specified hashtags.
- The "-p" flag specifies that only data from posts is downloaded, no video files
- The "-v" flag specifies that only video files are downloaded, no post data
- Specifying both "-p" and "-v" flags downloads both post data and video files
- The "-t" flag allows the user to specify a list of space-separated hashtags as an argument
- The "-f" flag allows the user to specify the filename of a text file containing a list of newline-separated hashtags as an argument
"""
import os
import time
import argparse
import logging, logging.config
from typing import List, Tuple, Dict, Any, Optional
from tempfile import TemporaryDirectory
import global_data
import file_methods
import data_methods
logging.config.fileConfig("../logging.config")
logger = logging.getLogger("Logger")
logger = logging.getLogger()
def get_hashtag_list(file_name: str) -> list:
def create_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the downloader script."""
    cli = argparse.ArgumentParser(
        description="Download the tiktoks for the requested hashtags"
    )
    # (flag, keyword arguments) pairs, registered in the order shown by --help.
    options = (
        ("-t", {"type": str, "nargs": "*", "help": "List of hashtags to scrape"}),
        (
            "-f",
            {"type": str, "help": "File name containing list of hashtags to scrape"},
        ),
        ("-p", {"action": "store_true", "help": "Download post data"}),
        ("-v", {"action": "store_true", "help": "Download video files"}),
    )
    for flag, kwargs in options:
        cli.add_argument(flag, **kwargs)
    return cli
def get_hashtag_list(file_name: str) -> List[str]:
"""Extract list of newline-separated hashtags from text file."""
if not file_methods.check_existence(file_name, "file"):
raise OSError(f"{file_name} does not exist")
with open(file_name) as f:
@@ -22,32 +48,14 @@ def get_hashtag_list(file_name: str) -> list:
return tags
def create_parser():
"""
Creates the parser and the arguments for the user input.
"""
parser = argparse.ArgumentParser(
description="Download the tiktoks for the requested hashtags"
)
parser.add_argument("-t", type=str, nargs="*", help="List of hashtags")
parser.add_argument("-f", type=str, help="File name with the list of hashtags")
parser.add_argument("-p", action="store_true", help="Download posts")
parser.add_argument("-v", action="store_true", help="Download videos")
return parser
def set_download_settings(download_data_type: str) -> dict:
"""
Loads the constants from global_data into the dict called settings and returns it.
Purpose - easy access to global constants by various functions.
"""
settings = {}
settings["data"] = global_data.FILES["data"]
settings["ids"] = global_data.FILES["ids"]
settings["sleep"] = global_data.PARAMETERS["sleep"]
settings["scraper"] = global_data.PARAMETERS["scraper_attempts"]
def set_download_settings(download_data_type: Dict[str, bool]) -> Dict[str, Any]:
"""Load the constants from global_data module into the `settings` dict."""
settings = {
"data": global_data.FILES["data"],
"ids": global_data.FILES["ids"],
"sleep": global_data.PARAMETERS["sleep"],
"scraper": global_data.PARAMETERS["scraper_attempts"],
}
file_methods.check_file(f"{settings['data']}/{settings['ids']}", "dir")
if download_data_type["posts"]:
settings["posts"] = global_data.FILES["posts"]
@@ -61,37 +69,44 @@ def set_download_settings(download_data_type: str) -> dict:
return settings
def get_posts(settings: dict, tag: str) -> tuple:
def get_posts(settings: dict, tag: str) -> Optional[Tuple[str, int]]:
"""Scrape trending TikTok post data for the specified hashtag.
1. Calls `file_methods.download_posts` to scrape the post data for a given hashtag
2. Calls `data_methods.extract_posts` to determine which if any posts
haven't previously been downloaded.
3. Calls `data_methods.update_posts` to update the ID list with the IDs of
newly downloaded posts.
"""
1. calls download_posts in file_methods.py to get the posts for a given hashtag
2. calls extract_posts from data_methods.py to extract new posts if any
3. calls update_posts from data_methods.py to update the id-list with the ids of newly downloaded posts.
"""
file_path = file_methods.download_posts(settings, tag)
number_scraped = ()
if file_path:
new_data = data_methods.extract_posts(settings, file_path, tag)
if new_data:
data_file = os.path.join(
settings["data"], tag, settings["posts"], settings["data_file"]
)
data_methods.update_posts(data_file, "file", new_data[1])
number_scraped = data_methods.update_posts(
settings["post_ids"], "file", new_data[0], tag
)
file_methods.delete_file(file_path, "file")
with TemporaryDirectory() as temp_dir:
file_path = file_methods.download_posts(settings, tag, temp_dir)
number_scraped = None
if file_path:
new_data = data_methods.extract_posts(settings, file_path, tag)
if new_data:
data_file = os.path.join(
settings["data"], tag, settings["posts"], settings["data_file"]
)
data_methods.update_posts(data_file, "file", new_data[1])
number_scraped = data_methods.update_posts(
settings["post_ids"], "file", new_data[0], tag
)
return number_scraped
def get_videos(settings: dict, tag: str) -> tuple:
def get_videos(settings: dict, tag: str) -> Optional[Tuple[str, int]]:
"""Scrape trending TikTok video files for the specified hashtag.
1. Calls `file_methods.download_videos` to download the video files for a given hashtag
2. Calls `data_methods.extract_videos` to determine which if any videos
haven't previously been downloaded.
3. Calls `data_methods.update_videos` to update the ID list with the IDs of
newly downloaded videos.
4. Calls `clean_video_files` function to delete the residual video folder
after the data processing.
"""
1. calls download_videos in file_methods.py to get the videos for a given hashtag
2. calls extract_videos from data_methods.py to extract new videos if any
3. calls update_videos from data_methods.py to update the id-list with the ids of newly downloaded videos.
4. the clean_video_files function deletes the residual video folder after the data processing
"""
number_scraped = ()
number_scraped = None
download_list = file_methods.download_videos(settings, tag)
if download_list:
new_data = data_methods.extract_videos(settings, tag, download_list)
@@ -103,11 +118,10 @@ def get_videos(settings: dict, tag: str) -> tuple:
return number_scraped
def get_data(hashtags: list, download_data_type: str) -> list:
"""
The function checks for the user option "-p", "-v" or both and then
triggers the functions get_posts, get_videos or both, respectively.
"""
def get_data(
hashtags: list, download_data_type: Dict[str, bool]
) -> List[Tuple[str, Tuple[str, int]]]:
"""Check command-line arguments and scrape posts/videos for specified hashtags."""
counter = 0
total_hashtags = len(hashtags)
total_hashtags_offset = total_hashtags - 1
@@ -145,10 +159,9 @@ def get_data(hashtags: list, download_data_type: str) -> list:
)
settings["videos_delete"] = settings["data"] + f"/{tag}/videos/#{tag}"
settings["videos_to"] = settings["data"] + f"/{tag}/videos"
res = get_videos(settings, tag)
if res:
res = (res[0], ("videos", res[1]))
scraped_summary_list.append(res)
_res = get_videos(settings, tag)
if _res:
scraped_summary_list.append((_res[0], ("videos", _res[1])))
data_methods.print_total(settings["video_ids"], tag, "videos")
counter += 1
@@ -164,12 +177,12 @@ if __name__ == "__main__":
if not (args.t or args.f):
parser.error(
"No hashtags were given, please use either -t option or -f to provide hashtags."
"No hashtags were given, please use either the `-t` flag or the `-f` flag to specify one or more hashtags."
)
if not (args.p or args.v):
parser.error(
"No argument given, please specify either -p for posts or -v videos or both."
"No argument given, please specify either the `-p` flag to download post data or the `-v` flag to download video files, or both."
)
if args.t:
@@ -181,7 +194,7 @@ if __name__ == "__main__":
logger.info(f"Hashtags to scrape: {hashtags}")
if not hashtags:
raise ValueError(
"No hashtags were specified: please use either the -t flag to specify a sspace-separated list of one or more hashtags as a command-line argument, or use the -f flag to specify a text file of newline-separated hashtags."
"No hashtags were specified: please use either the `-t` flag to specify a sspace-separated list of one or more hashtags as a command-line argument, or use the `-f` flag to specify a text file of newline-separated hashtags."
)
download_data_type = {"posts": args.p, "videos": args.v}