From be05ea0fe251138f91554f143d8aa2b771f56e99 Mon Sep 17 00:00:00 2001 From: Tristan Lee Date: Thu, 5 May 2022 20:50:54 -0500 Subject: [PATCH] fixed problems with type hints, clarified documentation --- tiktok_downloader/data_methods.py | 60 ++++++------ tiktok_downloader/file_methods.py | 35 +++---- tiktok_downloader/global_data.py | 9 +- tiktok_downloader/hashtag_frequencies.py | 111 +++++++++++------------ tiktok_downloader/run_downloader.py | 70 ++++++++------ 5 files changed, 146 insertions(+), 139 deletions(-) diff --git a/tiktok_downloader/data_methods.py b/tiktok_downloader/data_methods.py index e4bd017..7334079 100644 --- a/tiktok_downloader/data_methods.py +++ b/tiktok_downloader/data_methods.py @@ -1,4 +1,7 @@ -from typing import NamedTuple +"""Utility functions that perform data processing related tasks. +""" + +from typing import NamedTuple, List, Tuple, Set, Optional, Union, Dict, Any import logging, logging.config import file_methods @@ -7,13 +10,8 @@ logging.config.fileConfig("../logging.config") logger = logging.getLogger("Logger") -""" -The file contains several functions that perform data processing related tasks. -""" - - class Diff(NamedTuple): - ids: list + ids: Set[str] filter_posts: bool @@ -22,10 +20,12 @@ class Total(NamedTuple): unique: int -def get_difference(tag: str, file_name: str, ids: list) -> NamedTuple: - """ - Compares two sets of ids and returns the difference of the two sets. - Purpose - user to filter out the new ids by comparing the set of id list (ids/post_ids.json or videos_ids.json) and the list of newly downloaded ids. +def get_difference(tag: str, file_name: str, ids: List[str]) -> Optional[Diff]: + """Find TikTok posts that haven't already been scraped. + + Filter out the new posts for the hashtag `tag` by comparing the list of + post IDs contained in `filename` to the list of newly downloaded IDs + contained in `ids`. """ filter_posts = False current_id_data = file_methods.get_data(file_name) @@ -38,22 +38,23 @@ def get_difference(tag: str, file_name: str, ids: list) -> NamedTuple: if not new_ids: return None else: - new_ids = list(new_ids) total_new_ids = len(new_ids) if total_new_ids == total_current_ids: - filter_posts = False new_data = Diff(new_ids, filter_posts) else: new_data = Diff(new_ids, filter_posts) return new_data else: filter_posts = True - new_data = Diff(ids, filter_posts) + new_data = Diff(set(ids), filter_posts) return new_data -def extract_posts(settings: dict, file_name: str, tag: str) -> list: +def extract_posts( + settings: Dict[Any, Any], file_name: str, tag: str +) -> Optional[Tuple[List[str], List[str]]]: """ + Takes the downloaded file by the tiktok-scraper that contains the posts, and returns the new posts after comparing it the list of posts (from the file ids/post_ids.json) already downloaded. """ ids = [] @@ -65,6 +66,7 @@ def extract_posts(settings: dict, file_name: str, tag: str) -> list: if not ids: logger.warn(f"No posts were found for the hashtag: {tag}") + return None status = file_methods.check_existence(settings["post_ids"], "file") if not status: @@ -74,16 +76,15 @@ def extract_posts(settings: dict, file_name: str, tag: str) -> list: new_ids = get_difference(tag, settings["post_ids"], ids) if not new_ids: logger.warn(f"No new posts were found for the hashtag: {tag}") + return None elif new_ids.filter_posts: new_posts = [post for post in posts if post["id"] in new_ids.ids] - new_data = (new_ids.ids, new_posts) - return new_data + return (list(new_ids.ids), new_posts) else: - new_data = (new_ids.ids, posts) - return new_data + return (list(new_ids.ids), posts) -def extract_videos(settings: dict, tag: str, download_list: list) -> list: +def extract_videos(settings: dict, tag: str, download_list: List[str]) -> List[str]: """ Tiktok-scraper downloads the videos and puts them in a folder - the list of ids of the downloaded videos is fed to this function as download_list. The function returns the set of new videos after comparing it the list of videos (from the file ids/videos_ids.json) already downloaded. """ @@ -97,37 +98,40 @@ def extract_videos(settings: dict, tag: str, download_list: list) -> list: logger.warn( f"No new videos were found for the {tag} in the downloaded folder." ) - return None + return [] else: - return new_videos.ids + return list(new_videos.ids) def update_posts( - file_path: str, file_type: str, new_data: list, tag: str = None -) -> tuple: + file_path: str, file_type: str, new_data: List[str], tag: str = None +) -> Optional[Tuple[str, int]]: """ Updates the list of post ids (in the file ids/post_ids.json) with the ids of the new posts. """ status = file_methods.check_existence(file_path, file_type) if not tag: file_methods.post_writer(file_path, new_data, status) + return None else: scraped_data = file_methods.id_writer(file_path, new_data, tag, status) return scraped_data -def update_videos(settings: str, new_data: list, tag: str) -> tuple: +def update_videos( + settings: Dict[str, Any], new_data: List[str], tag: str +) -> Tuple[str, int]: """ Updates the list of video ids (in the file ids/video_ids.json) with the ids of the new videos. """ file_path = settings["video_ids"] file_methods.check_file(file_path, "file") - log = file_methods.id_writer(file_path, new_data, tag, True) + number_scraped = file_methods.id_writer(file_path, new_data, tag, True) file_methods.clean_video_files(settings, tag, new_data) - return log + return number_scraped -def get_total_posts(file_path: str, tag: str) -> NamedTuple: +def get_total_posts(file_path: str, tag: str) -> Total: """ Returns total count of ids in a id list along with the number of unique ids among them. """ diff --git a/tiktok_downloader/file_methods.py b/tiktok_downloader/file_methods.py index 43184fe..62fd881 100644 --- a/tiktok_downloader/file_methods.py +++ b/tiktok_downloader/file_methods.py @@ -1,17 +1,17 @@ +"""Utility functions that operate on files, such as writing to reading from a file. +""" + import os import json import subprocess from datetime import datetime import shutil +from typing import Tuple, List, Optional, Dict, Any import logging, logging.config logging.config.fileConfig("../logging.config") -logger = logging.getLogger("Logger") - -""" -The file contains the functions that operate on files, such as writing or reading from files etc. -""" +logger = logging.getLogger() def create_file(name: str, file_type: str): @@ -94,16 +94,16 @@ def download_videos(settings: dict, tag: str): shutil.rmtree(settings["videos_delete"]) -def get_data(file_path: str) -> list: +def get_data(file_path: str) -> Any: """ Reads the json file and retuns the read data. """ with open(file_path, "r", encoding="utf-8") as f: data = json.load(f) - return data + return data -def dump_data(file_path: str, data: list): +def dump_data(file_path: str, data: List[dict]): """ Writes the data to the json file. """ @@ -111,14 +111,15 @@ def dump_data(file_path: str, data: list): json.dump(data, f) -def log_writer(log_data: list): +def log_writer(log_data: List[Tuple[str, Tuple[str, int]]]): """ Creates the dictionary of total downloads (posts and videos) per hashtag. Example : { timetamp : { hashtag : { videos : number_of_new_videos , posts : number_of_new_posts } } } Writes the dictionary to the log file (logs/log.json). """ + total = 0 - scraped_summary_dict: dict + scraped_summary_dict = {} # type: Dict[str, Dict[str, int]] for hashtag, (data_type, count) in log_data: if hashtag in scraped_summary_dict: if data_type in scraped_summary_dict[hashtag]: @@ -130,18 +131,20 @@ def log_writer(log_data: list): scraped_summary_dict[hashtag] = {data_type: count} total += count - now = datetime.now() - now_str = now.strftime("%d-%m-%Y %H:%M:%S") + now_str = datetime.now().strftime("%d-%m-%Y %H:%M:%S") data = {now_str: scraped_summary_dict} - logger.warn(f"Logged post data: {data}") + logger.debug(f"Logged post data: {data}") logger.info(f"Successfully scraped {total} total entries") -def id_writer(file_path: str, new_data: list, tag: str, status: bool) -> tuple: +def id_writer( + file_path: str, new_data: List[str], tag: str, status: bool +) -> Tuple[str, int]: """ Writes the list of new ids to the post_ids or video_ids files. """ + total = len(new_data) if status: try: @@ -162,7 +165,7 @@ def id_writer(file_path: str, new_data: list, tag: str, status: bool) -> tuple: return number_scraped -def post_writer(file_path: str, new_data: list, status: bool): +def post_writer(file_path: str, new_data: List[str], status: bool): """ Writes the new posts in the post file of the given hashtag (/data/{hashtag}/posts/data.json) """ @@ -197,7 +200,7 @@ def delete_file(file_path: str, file_type: str): raise OSError("{file_type} needs to be either 'file' or 'dir'") -def clean_video_files(settings: dict, tag: str, new_data: list = None): +def clean_video_files(settings: dict, tag: str, new_data: Optional[List[str]] = None): """ Moves the new videos from the tiktok-scraper video folder to /data/{hashtag}/videos/ Deletes the residual tiktok-scraper video folder. diff --git a/tiktok_downloader/global_data.py b/tiktok_downloader/global_data.py index 85d4939..ed8c317 100644 --- a/tiktok_downloader/global_data.py +++ b/tiktok_downloader/global_data.py @@ -1,5 +1,4 @@ -""" -Contains global constants relating to paths and operational parameters such as sleep time between consecutive tiktok-scraper calls. +"""Specify global constants including file paths and scraping options. """ @@ -15,7 +14,6 @@ POST_IDS = "post_ids.json" VIDEO_IDS = "video_ids.json" DATA_FILE = "data.json" - FILES = { "data": DATA, "ids": IDS, @@ -28,12 +26,7 @@ FILES = { "downloads": [], } - -# Commands -tag = "" - PARAMETERS = { "scraper_attempts": 3, - # "number_of_videos" : 3, # Number of videos to be downloaded by tiktok-scraper. "sleep": 8, } diff --git a/tiktok_downloader/hashtag_frequencies.py b/tiktok_downloader/hashtag_frequencies.py index 5ad3f41..ff70bb7 100644 --- a/tiktok_downloader/hashtag_frequencies.py +++ b/tiktok_downloader/hashtag_frequencies.py @@ -1,33 +1,54 @@ +"""Analyze the frequency of hashtags appearing in the set of given posts. + +- The "input_file" argument specifies the JSON file containing post information for a given hashtag +- The "n" argument specifies how many hashtags does the user wants to analyze +- Specifying the "-d" flag prints the hashtag frequencies on the shell +- Specifying the "-p" flag plots the hashtag frequencies and saves as a png file +""" + import os import json import argparse from datetime import datetime import warnings - -warnings.filterwarnings("ignore", message="Glyph (.*) missing from current font") +from typing import List, Tuple, Dict, Any import logging import matplotlib.pyplot as plt import matplotlib.ticker as mtick import seaborn as sns -sns.set_theme(style="darkgrid") from file_methods import check_file, check_existence from global_data import IMAGES -""" -Plots the frequency of hashtags appearing in the set of given posts. -""" +warnings.filterwarnings("ignore", message="Glyph (.*) missing from current font") +sns.set_theme(style="darkgrid") -def get_hashtags(obj): +def create_parser() -> argparse.ArgumentParser: + """Create the parser and the arguments for the user input.""" + parser = argparse.ArgumentParser() + parser.add_argument( + "input_file", + help="The file name of the JSON file containing posts for a given hashtag", + ) + parser.add_argument("n", help="The number of top n occurrences", type=int) + parser.add_argument( + "-p", "--plot", help="Plot the occurrences", action="store_true" + ) + parser.add_argument( + "-d", "--print", help="List top n hashtags", action="store_true" + ) + return parser + + +def get_hashtags(obj: Dict) -> List[Tuple[str, int]]: if not obj: - raise ValueError(f"Empty item, no hashtags to be extracted.") + raise ValueError(f"Empty item, no hashtags could be extracted.") else: hashtags = {} - tags = [[tag["name"] for tag in ele["hashtags"]] for ele in obj] - tags = [set(ele) for ele in tags] + tags = [set([tag["name"] for tag in ele["hashtags"]]) for ele in obj] { tag: ( 1 @@ -37,29 +58,29 @@ def get_hashtags(obj): for ele in tags for tag in ele } - hashtags = sorted(hashtags.items(), key=lambda e: e[1], reverse=True) - return hashtags + return sorted(hashtags.items(), key=lambda e: e[1], reverse=True) -def get_occurrences(filename, n=1, sort=True): - """ - Takes the json file containing posts and returns a dictionary: - local variable occs = { +def get_occurrences(filename: str, n: int = 1) -> Dict[str, Any]: + """Aggregate hashtag frequency information for a specified JSON file. + + Return dict `occs` with keys: "total": total posts in the file, top_n: [[top n hashtags ], [frequencies of corresponding hashtags]] } """ with open(filename) as f: obj = json.load(f) - l = len(obj) - tags = get_hashtags(obj) - occs = {"total": l, "top_n": []} - occs["top_n"] = [[ele[i] for ele in tags[0:n]] for i in range(2)] - return occs + l = len(obj) + tags = get_hashtags(obj) + occs = {"total": l, "top_n": []} + occs["top_n"] = [[ele[i] for ele in tags[0 : max(l, n)]] for i in range(2)] + return occs -def plot(n, occs, img_folder): +def plot(n: int, occs: dict, img_folder: str): + """Save plot of common hashtags as bar chart to file.""" y_pos = list(reversed(range(n - 1))) max_count = occs["top_n"][1][0] freqs = [count / max_count * 100 for count in occs["top_n"][1][1:]] @@ -77,10 +98,17 @@ def plot(n, occs, img_folder): save_plot(img_folder) +def save_plot(img_folder): + """Save the plot as a png file in the folder ../data/imgs/""" + now = datetime.now() + current_time = now.strftime("%Y_%m_%d_%H_%M_%S") + filename = f"{img_folder}/{current_time}.png" + logging.info(f"Plot saved to file: {filename}") + plt.savefig(filename, bbox_inches="tight", facecolor="white", dpi=300) + + def print_occurrences(occs): - """ - Prints the top n hashtags with their frequencies and the ratio of occurrences and total posts, all to the shell. - """ + """Print information about the top n hashtags and their frequencies.""" row_number = 0 total_posts = occs["total"] print( @@ -94,41 +122,8 @@ def print_occurrences(occs): row_number += 1 -def save_plot(img_folder): - """ - Saves the plot to a png file in the folder /data/imgs/ - """ - now = datetime.now() - current_time = now.strftime("%Y_%m_%d_%H_%M_%S") - filename = f"{img_folder}/{current_time}.png" - logging.info(f"Plot saved to file: {filename}") - plt.savefig(filename, bbox_inches="tight", facecolor="white", dpi=300) - - -def create_parser(): - """ - Creates the parser and the arguments for the user input. - """ - parser = argparse.ArgumentParser() - parser.add_argument("input_file", help="The json hashtag file name") - parser.add_argument("n", help="The number of top n occurrences", type=int) - parser.add_argument( - "-p", "--plot", help="Plot the occurrences", action="store_true" - ) - parser.add_argument( - "-d", "--print", help="List top n hashtags", action="store_true" - ) - return parser - - if __name__ == "__main__": - """ - Option "n" specifies how many hashtags does the user wants to plot. - "-d" option prints the hashtag frequencies on the shell - "-p" option plots the hashtag frequencies and saves as a png file in the folder /data/imgs/ - The function get_occurrences is triggered to compute and return the top n occurrences and the hashtags. - """ img_folder = IMAGES check_file(img_folder, "dir") parser = create_parser() diff --git a/tiktok_downloader/run_downloader.py b/tiktok_downloader/run_downloader.py index 3713e5a..7d1f3ef 100644 --- a/tiktok_downloader/run_downloader.py +++ b/tiktok_downloader/run_downloader.py @@ -1,7 +1,17 @@ +"""Download post data or videos from TikToks containing one or more specified hashtags. + +- The "-p" flag specifies that only data from posts is downloaded, no video files +- The "-v" flag specifies that only video files are downloaded, no post data +- Specifying both "-p" and "-v" flags downloads both post data and video files +- The "-t" flag allows the user to specify a list of space-separated hashtags as an argument +- The "-f" flag allows the user to specify the filename of a text file containing a list of newline-separated hashtags as an argument +""" + import os import time import argparse import logging, logging.config +from typing import List, Tuple, Dict, Any, Optional import global_data import file_methods @@ -12,17 +22,7 @@ logging.config.fileConfig("../logging.config") logger = logging.getLogger("Logger") -def get_hashtag_list(file_name: str) -> list: - if not file_methods.check_existence(file_name, "file"): - raise OSError(f"{file_name} does not exist") - with open(file_name) as f: - tags = list( - filter(None, [line.strip() for line in f if not line.startswith("#")]) - ) - return tags - - -def create_parser(): +def create_parser() -> argparse.ArgumentParser: """ Creates the parser and the arguments for the user input. """ @@ -38,16 +38,27 @@ def create_parser(): return parser -def set_download_settings(download_data_type: str) -> dict: +def get_hashtag_list(file_name: str) -> List[str]: + if not file_methods.check_existence(file_name, "file"): + raise OSError(f"{file_name} does not exist") + with open(file_name) as f: + tags = list( + filter(None, [line.strip() for line in f if not line.startswith("#")]) + ) + return tags + + +def set_download_settings(download_data_type: Dict[str, bool]) -> Dict[str, Any]: """ Loads the constants from global_data into the dict called settings and returns it. Purpose - easy access to global constants by various functions. """ - settings = {} - settings["data"] = global_data.FILES["data"] - settings["ids"] = global_data.FILES["ids"] - settings["sleep"] = global_data.PARAMETERS["sleep"] - settings["scraper"] = global_data.PARAMETERS["scraper_attempts"] + settings = { + "data": global_data.FILES["data"], + "ids": global_data.FILES["ids"], + "sleep": global_data.PARAMETERS["sleep"], + "scraper": global_data.PARAMETERS["scraper_attempts"], + } file_methods.check_file(f"{settings['data']}/{settings['ids']}", "dir") if download_data_type["posts"]: settings["posts"] = global_data.FILES["posts"] @@ -61,14 +72,14 @@ def set_download_settings(download_data_type: str) -> dict: return settings -def get_posts(settings: dict, tag: str) -> tuple: +def get_posts(settings: dict, tag: str) -> Optional[Tuple[str, int]]: """ 1. calls download_posts in file_methods.py to get the posts for a given hashtag 2. calls extract_posts from data_methods.py to extract new posts if any 3. calls update_posts from data_methods.py to update the id-list with the ids of newly downloaded posts. """ file_path = file_methods.download_posts(settings, tag) - number_scraped = () + number_scraped = None if file_path: new_data = data_methods.extract_posts(settings, file_path, tag) if new_data: @@ -84,14 +95,14 @@ def get_posts(settings: dict, tag: str) -> tuple: return number_scraped -def get_videos(settings: dict, tag: str) -> tuple: +def get_videos(settings: dict, tag: str) -> Optional[Tuple[str, int]]: """ 1. calls download_videos in file_methods.py to get the videos for a given hashtag 2. calls extract_videos from data_methods.py to extract new videos if any 3. calls update_videos from data_methods.py to update the id-list with the ids of newly downloaded videos. 4. the clean_video_files function deletes the residual video folder after the data processing """ - number_scraped = () + number_scraped = None download_list = file_methods.download_videos(settings, tag) if download_list: new_data = data_methods.extract_videos(settings, tag, download_list) @@ -103,7 +114,9 @@ def get_videos(settings: dict, tag: str) -> tuple: return number_scraped -def get_data(hashtags: list, download_data_type: str) -> list: +def get_data( + hashtags: list, download_data_type: Dict[str, bool] +) -> List[Tuple[str, Tuple[str, int]]]: """ The function checks for the user option "-p", "-v" or both and then triggers the functions get_posts, get_videos or both, respectively. @@ -145,10 +158,9 @@ def get_data(hashtags: list, download_data_type: str) -> list: ) settings["videos_delete"] = settings["data"] + f"/{tag}/videos/#{tag}" settings["videos_to"] = settings["data"] + f"/{tag}/videos" - res = get_videos(settings, tag) - if res: - res = (res[0], ("videos", res[1])) - scraped_summary_list.append(res) + _res = get_videos(settings, tag) + if _res: + scraped_summary_list.append((_res[0], ("videos", _res[1]))) data_methods.print_total(settings["video_ids"], tag, "videos") counter += 1 @@ -164,12 +176,12 @@ if __name__ == "__main__": if not (args.t or args.f): parser.error( - "No hashtags were given, please use either -t option or -f to provide hashtags." + "No hashtags were given, please use either the `-t` flag or the `-f` flag to provide hashtags." ) if not (args.p or args.v): parser.error( - "No argument given, please specify either -p for posts or -v videos or both." + "No argument given, please specify either the `-p` flag to download post data or the `-v` flag to download video files, or both." ) if args.t: @@ -181,7 +193,7 @@ if __name__ == "__main__": logger.info(f"Hashtags to scrape: {hashtags}") if not hashtags: raise ValueError( - "No hashtags were specified: please use either the -t flag to specify a sspace-separated list of one or more hashtags as a command-line argument, or use the -f flag to specify a text file of newline-separated hashtags." + "No hashtags were specified: please use either the `-t` flag to specify a sspace-separated list of one or more hashtags as a command-line argument, or use the `-f` flag to specify a text file of newline-separated hashtags." ) download_data_type = {"posts": args.p, "videos": args.v}