diff --git a/README.md b/README.md index 7d13107..08c0b47 100644 --- a/README.md +++ b/README.md @@ -22,17 +22,17 @@ You should now be ready to start using the tool. ## About the tool ### Command-line arguments ``` -$ python run_downloader.py -h +python3 run_downloader.py --help usage: run_downloader.py [-h] [-t [T [T ...]]] [-f F] [-p] [-v] Download the tiktoks for the requested hashtags optional arguments: --h, --help show this help message and exit --t [T [T ...]] List of hashtags --f F File name with the list of hashtags --p Download posts --v Download videos + -h, --help show this help message and exit + -t [T [T ...]] List of hashtags to scrape + -f F File name containing list of hashtags to scrape + -p Download post data + -v Download video files ``` ### Structure of output data @@ -90,11 +90,11 @@ Note that video downloading is a time and data rate consuming task, as a result The script `hashtag_frequencies.py` analyzes the frequencies of top occurring hashtags in a given set of posts. ``` -python hashtag_frequencies.py --help -usage: hashtag_frequencies.py [-h] [-p] [-d] input_file n +$ python3 hashtag_frequencies.py --help +usage: hashtag_frequencies.py [-h] [-p] [-d] hashtag n positional arguments: - input_file The json hashtag file name + hashtag The hashtag of scraped posts to analyze n The number of top n occurrences optional arguments: @@ -107,7 +107,7 @@ Assume we want to analyze the 20 most frequently occurring hashtags in the downl - The results can be plotted and saved as a PNG file by executing the following command: - `python3 hashtag_frequencies.py -p ../data/london/posts/data.json 20` + `python3 hashtag_frequencies.py london 20 -p` which will produce a figure similar to that shown below:
@@ -118,7 +118,7 @@ Assume we want to analyze the 20 most frequently occurring hashtags in the downl - The results can be displayed in tabular form by executing the following command: - `python3 hashtag_frequencies.py -d ../data/london/posts/data.json 20` + `python3 hashtag_frequencies.py london 20 -d` which will produce a terminal output similar to the following: ``` diff --git a/tiktok_downloader/data_methods.py b/tiktok_downloader/data_methods.py index 7334079..382e1f2 100644 --- a/tiktok_downloader/data_methods.py +++ b/tiktok_downloader/data_methods.py @@ -1,27 +1,30 @@ """Utility functions that perform data processing related tasks. """ -from typing import NamedTuple, List, Tuple, Set, Optional, Union, Dict, Any -import logging, logging.config +from typing import NamedTuple, List, Tuple, Set, Optional, Dict, Any +import logging import file_methods -logging.config.fileConfig("../logging.config") -logger = logging.getLogger("Logger") +logger = logging.getLogger() class Diff(NamedTuple): + """Keep track of scraped post IDs and whether previously-scraped posts have been filtered.""" + ids: Set[str] filter_posts: bool class Total(NamedTuple): + """Keep track of number of total and number of unique scraped posts.""" + total: int unique: int def get_difference(tag: str, file_name: str, ids: List[str]) -> Optional[Diff]: - """Find TikTok posts that haven't already been scraped. + """Find TikTok post IDs that haven't previously been scraped. Filter out the new posts for the hashtag `tag` by comparing the list of post IDs contained in `filename` to the list of newly downloaded IDs @@ -52,10 +55,11 @@ def get_difference(tag: str, file_name: str, ids: List[str]) -> Optional[Diff]: def extract_posts( settings: Dict[Any, Any], file_name: str, tag: str -) -> Optional[Tuple[List[str], List[str]]]: - """ +) -> Optional[Tuple[List[str], List[Dict]]]: + """Find TikTok posts that haven't previously been scraped. 
- Takes the downloaded file by the tiktok-scraper that contains the posts, and returns the new posts after comparing it the list of posts (from the file ids/post_ids.json) already downloaded. + Compares the file downloaded by tiktok-scraper to the list of + previously-scraped posts (from the file ids/post_ids.json). """ ids = [] posts = [] @@ -85,8 +89,10 @@ def extract_posts( def extract_videos(settings: dict, tag: str, download_list: List[str]) -> List[str]: - """ - Tiktok-scraper downloads the videos and puts them in a folder - the list of ids of the downloaded videos is fed to this function as download_list. The function returns the set of new videos after comparing it the list of videos (from the file ids/videos_ids.json) already downloaded. + """Find TikTok videos that haven't previously been scraped. + + Compares the file downloaded by tiktok-scraper to the list of + previously-scraped videos (from the file ids/video_ids.json). """ status = file_methods.check_existence(settings["video_ids"], "file") if not status: @@ -104,10 +110,10 @@ def extract_videos(settings: dict, tag: str, download_list: List[str]) -> List[s def update_posts( - file_path: str, file_type: str, new_data: List[str], tag: str = None + file_path: str, file_type: str, new_data: List[Any], tag: str = None ) -> Optional[Tuple[str, int]]: - """ - Updates the list of post ids (in the file ids/post_ids.json) with the ids of the new posts. + """Update the file containing scraped post IDs (`ids/post_ids.json`) with + the IDs of the recently scraped posts. """ status = file_methods.check_existence(file_path, file_type) if not tag: @@ -121,8 +127,8 @@ def update_posts( def update_videos( settings: Dict[str, Any], new_data: List[str], tag: str ) -> Tuple[str, int]: - """ - Updates the list of video ids (in the file ids/video_ids.json) with the ids of the new videos. + """Update the file containing video IDs (`ids/video_ids.json`) with the IDs + of the recently scraped videos. 
""" file_path = settings["video_ids"] file_methods.check_file(file_path, "file") @@ -132,12 +138,10 @@ def update_videos( def get_total_posts(file_path: str, tag: str) -> Total: - """ - Returns total count of ids in a id list along with the number of unique ids among them. - """ + """Count number of total scraped posts and number of unique scraped posts.""" status = file_methods.check_existence(file_path, "file") if not status: - raise OSError("{file_path} not found!") + raise OSError(f"{file_path} not found!") else: data = file_methods.get_data(file_path) total_posts = len(data[tag]) @@ -147,9 +151,7 @@ def get_total_posts(file_path: str, tag: str) -> Total: def print_total(file_path: str, tag: str, data_type: str): - """ - Prints the total count for posts or videos for a hashtag. Calls the function get_total_posts for sanity check that there are no repeating ids in the id lists. - """ + """Print number of total and unique scraped posts, warn if any non-unique posts.""" total = get_total_posts(file_path, tag) if total.total == total.unique: logger.info(f"Scraped {total.total} {data_type} containing the hashtag '{tag}'") diff --git a/tiktok_downloader/file_methods.py b/tiktok_downloader/file_methods.py index 62fd881..53d6c00 100644 --- a/tiktok_downloader/file_methods.py +++ b/tiktok_downloader/file_methods.py @@ -15,9 +15,7 @@ logger = logging.getLogger() def create_file(name: str, file_type: str): - """ - Creates a file or directory. - """ + """Create a file or directory.""" if file_type == "dir": os.makedirs(name, mode=0o777) elif file_type == "file": @@ -28,9 +26,7 @@ def create_file(name: str, file_type: str): def check_existence(file_path: str, file_type: str): - """ - Checks the existence of a file or a directory. If not found, returns False, else returns True. 
- """ + """Check if a file or a directory exists.""" if file_type == "file": return os.path.isfile(file_path) elif file_type == "dir": @@ -40,19 +36,20 @@ def check_existence(file_path: str, file_type: str): def check_file(file_path: str, file_type: str): - """ - Creates a file or directory, if not found. Else, returns nothing. - """ + """If path does not exist, creates a file or directory.""" status = check_existence(file_path, file_type) if not status: create_file(file_path, file_type) -def download_posts(settings: dict, tag: str): - """ - Runs the tiktok-scraper command to download posts for a given hashtag. - Returns the path to the downloaded file of posts. If no file was downloaded, prints the error and returns nothing in order to move on. - os.chdir is used to execute shell commands in the right folders and then reused to come back to the original folder of execution of run_downloader script. +def download_posts(settings: Dict, tag: str): + """Run the tiktok-scraper command to download posts for a given hashtag. + + Returns the path to the downloaded file of posts. If no file was downloaded, + prints the error and returns nothing in order to move on. + + os.chdir is used to execute shell commands in the correct folder and then + reused to return to the original folder of execution of run_downloader script. """ path = os.path.join(settings["data"], tag, settings["posts"]) os.chdir(path) @@ -69,11 +66,16 @@ def download_posts(settings: dict, tag: str): os.chdir("../../../tiktok_downloader") -def download_videos(settings: dict, tag: str): - """ - Runs the tiktok-scraper command to download videos for a given hashtag. Note that all the videos are downloaded that are returned by the tiktok api and as a result, its a time and data consuming process. - The list of downloaded video ids is constucted and returned if the downloaded folder contains at least 1 video. 
- os.chdir is used to execute shell commands in the right folders and then reused to come back to the original folder of execution of run_downloader script. +def download_videos(settings: Dict, tag: str): + """Run the tiktok-scraper command to download videos for a given hashtag. + + Note that all the videos are downloaded that are returned by the TikTok API, + making this a time- and data-intensive process. + The list of downloaded video IDs is constructed and returned if the + downloaded folder contains at least 1 video. + + os.chdir is used to execute shell commands in the correct folder and then + reused to return to the original folder of execution of run_downloader script. """ path = os.path.join(settings["data"], tag, settings["videos"]) os.chdir(path) @@ -95,27 +97,31 @@ def download_videos(settings: dict, tag: str): def get_data(file_path: str) -> Any: - """ - Reads the json file and retuns the read data. - """ + """Read a JSON file and return the read data.""" with open(file_path, "r", encoding="utf-8") as f: data = json.load(f) return data -def dump_data(file_path: str, data: List[dict]): - """ - Writes the data to the json file. - """ +def dump_data(file_path: str, data: Any): + """Write data to a JSON file.""" with open(file_path, "w", encoding="utf-8") as f: json.dump(data, f) def log_writer(log_data: List[Tuple[str, Tuple[str, int]]]): - """ - Creates the dictionary of total downloads (posts and videos) per hashtag. - Example : { timetamp : { hashtag : { videos : number_of_new_videos , posts : number_of_new_posts } } } - Writes the dictionary to the log file (logs/log.json). + """Create the dictionary of total downloads (posts and videos) per hashtag. + + Example : { + timestamp : { + hashtag : { + videos : number_of_new_videos , + posts : number_of_new_posts + } + } + } + + Writes the dictionary to the log file (`logs/log.json`). 
""" total = 0 @@ -141,9 +147,7 @@ def log_writer(log_data: List[Tuple[str, Tuple[str, int]]]): def id_writer( file_path: str, new_data: List[str], tag: str, status: bool ) -> Tuple[str, int]: - """ - Writes the list of new ids to the post_ids or video_ids files. - """ + """Write the list of new ids to the post_ids or video_ids file.""" total = len(new_data) if status: @@ -165,9 +169,9 @@ def id_writer( return number_scraped -def post_writer(file_path: str, new_data: List[str], status: bool): - """ - Writes the new posts in the post file of the given hashtag (/data/{hashtag}/posts/data.json) +def post_writer(file_path: str, new_data: List[Dict], status: bool): + """Write the new posts in the post file of the given hashtag + (`/data/{hashtag}/posts/data.json`). """ total = len(new_data) if status: @@ -185,9 +189,7 @@ def post_writer(file_path: str, new_data: List[str], status: bool): def delete_file(file_path: str, file_type: str): - """ - Deletes the directory or the file. - """ + """Delete a directory or file.""" if not check_existence(file_path, file_type): raise OSError(f"Attempt to delete file failed: {file_path} does not exist") elif file_type == "file": @@ -201,8 +203,7 @@ def delete_file(file_path: str, file_type: str): def clean_video_files(settings: dict, tag: str, new_data: Optional[List[str]] = None): - """ - Moves the new videos from the tiktok-scraper video folder to /data/{hashtag}/videos/ + """Move the new videos from the tiktok-scraper video folder to `/data/{hashtag}/videos/`. Deletes the residual tiktok-scraper video folder. """ if new_data: diff --git a/tiktok_downloader/hashtag_frequencies.py b/tiktok_downloader/hashtag_frequencies.py index ff70bb7..ddcad15 100644 --- a/tiktok_downloader/hashtag_frequencies.py +++ b/tiktok_downloader/hashtag_frequencies.py @@ -1,7 +1,7 @@ """Analyze the frequency of hashtags appearing in the set of given posts. 
-- The "input_file" argument specifies the JSON file containing post information for a given hashtag -- The "n" argument specifies how many hashtags does the user wants to analyze +- The "hashtag" positional argument specifies the hashtag of scraped posts to analyze +- The "n" positional argument specifies how many hashtags the user wants to analyze - Specifying the "-d" flag prints the hashtag frequencies on the shell - Specifying the "-p" flag plots the hashtag frequencies and saves as a png file """ @@ -18,20 +18,20 @@ import matplotlib.pyplot as plt import matplotlib.ticker as mtick import seaborn as sns - from file_methods import check_file, check_existence -from global_data import IMAGES +from global_data import IMAGES, FILES warnings.filterwarnings("ignore", message="Glyph (.*) missing from current font") sns.set_theme(style="darkgrid") +logger = logging.getLogger() def create_parser() -> argparse.ArgumentParser: """Create the parser and the arguments for the user input.""" parser = argparse.ArgumentParser() parser.add_argument( - "input_file", - help="The file name of the JSON file containing posts for a given hashtag", + "hashtag", + help="The hashtag of scraped posts to analyze", ) parser.add_argument("n", help="The number of top n occurrences", type=int) parser.add_argument( @@ -65,7 +65,7 @@ def get_hashtags(obj: Dict) -> List[Tuple[str, int]]: def get_occurrences(filename: str, n: int = 1) -> Dict[str, Any]: """Aggregate hashtag frequency information for a specified JSON file. 
- Return dict `occs` with keys: + Example: { "total": total posts in the file, top_n: [[top n hashtags ], [frequencies of corresponding hashtags]] } @@ -75,16 +75,17 @@ def get_occurrences(filename: str, n: int = 1) -> Dict[str, Any]: l = len(obj) tags = get_hashtags(obj) occs = {"total": l, "top_n": []} - occs["top_n"] = [[ele[i] for ele in tags[0 : max(l, n)]] for i in range(2)] + occs["top_n"] = [[ele[i] for ele in tags[0 : min(l, n)]] for i in range(2)] return occs def plot(n: int, occs: dict, img_folder: str): """Save plot of common hashtags as bar chart to file.""" - y_pos = list(reversed(range(n - 1))) + y_pos = list(reversed(range(len(occs[0]) - 1))) max_count = occs["top_n"][1][0] freqs = [count / max_count * 100 for count in occs["top_n"][1][1:]] labels = occs["top_n"][0][1:] + hashtag = occs["top_n"][0][0] fig, ax = plt.subplots(figsize=(5, 6.66)) ax.barh(y_pos, freqs) @@ -93,16 +94,16 @@ def plot(n: int, occs: dict, img_folder: str): ax.grid(axis="y") ax.set_xlabel("Percent of posts with common hashtag") ax.set_ylim(min(y_pos) - 1, max(y_pos) + 1) - ax.set_title(f'Common hashtags for #{occs["top_n"][0][0]} posts') + ax.set_title(f"Common hashtags for #{hashtag} posts") ax.xaxis.set_major_formatter(mtick.PercentFormatter(decimals=0)) - save_plot(img_folder) + save_plot(img_folder, hashtag) -def save_plot(img_folder): +def save_plot(img_folder, hashtag): """Save the plot as a png file in the folder ../data/imgs/""" now = datetime.now() current_time = now.strftime("%Y_%m_%d_%H_%M_%S") - filename = f"{img_folder}/{current_time}.png" + filename = f"{img_folder}/{hashtag}_{current_time}.png" logging.info(f"Plot saved to file: {filename}") plt.savefig(filename, bbox_inches="tight", facecolor="white", dpi=300) @@ -132,13 +133,17 @@ if __name__ == "__main__": raise ValueError( f"Specified argument `n` (the number of hashtags to analyze) must be greater than zero, not: {args.n}." 
) - if not check_existence(args.input_file, "file"): + input_file = data_file = os.path.join( + FILES["data"], args.hashtag, FILES["posts"], FILES["data_file"] + ) + if not check_existence(input_file, "file"): raise FileNotFoundError( - f"Specified argument `input_file` ({args.input_file}) does not exist." + f"File {input_file}) for specified argument `hashtag` ({args.hashtag}) does not exist" ) - base = os.path.splitext(args.input_file)[0] + + base = os.path.splitext(input_file)[0] path = f"./{base}_sorted_hashtags.csv" - occs = get_occurrences(args.input_file, args.n) + occs = get_occurrences(input_file, args.n) if args.plot: plot(args.n, occs, img_folder) else: diff --git a/tiktok_downloader/run_downloader.py b/tiktok_downloader/run_downloader.py index 7d1f3ef..2ee8af3 100644 --- a/tiktok_downloader/run_downloader.py +++ b/tiktok_downloader/run_downloader.py @@ -17,28 +17,27 @@ import global_data import file_methods import data_methods - -logging.config.fileConfig("../logging.config") -logger = logging.getLogger("Logger") +logger = logging.getLogger() def create_parser() -> argparse.ArgumentParser: - """ - Creates the parser and the arguments for the user input. 
- """ + """Create the parser and the arguments for the user input.""" parser = argparse.ArgumentParser( description="Download the tiktoks for the requested hashtags" ) - parser.add_argument("-t", type=str, nargs="*", help="List of hashtags") - parser.add_argument("-f", type=str, help="File name with the list of hashtags") - parser.add_argument("-p", action="store_true", help="Download posts") - parser.add_argument("-v", action="store_true", help="Download videos") + parser.add_argument("-t", type=str, nargs="*", help="List of hashtags to scrape") + parser.add_argument( + "-f", type=str, help="File name containing list of hashtags to scrape" + ) + parser.add_argument("-p", action="store_true", help="Download post data") + parser.add_argument("-v", action="store_true", help="Download video files") return parser def get_hashtag_list(file_name: str) -> List[str]: + """Extract list of newline-separated hashtags from text file.""" if not file_methods.check_existence(file_name, "file"): raise OSError(f"{file_name} does not exist") with open(file_name) as f: @@ -49,10 +48,7 @@ def get_hashtag_list(file_name: str) -> List[str]: def set_download_settings(download_data_type: Dict[str, bool]) -> Dict[str, Any]: - """ - Loads the constants from global_data into the dict called settings and returns it. - Purpose - easy access to global constants by various functions. - """ + """Load the constants from global_data module into the `settings` dict.""" settings = { "data": global_data.FILES["data"], "ids": global_data.FILES["ids"], @@ -73,10 +69,13 @@ def set_download_settings(download_data_type: Dict[str, bool]) -> Dict[str, Any] def get_posts(settings: dict, tag: str) -> Optional[Tuple[str, int]]: - """ - 1. calls download_posts in file_methods.py to get the posts for a given hashtag - 2. calls extract_posts from data_methods.py to extract new posts if any - 3. calls update_posts from data_methods.py to update the id-list with the ids of newly downloaded posts. 
+ """Scrape trending TikTok post data for the specified hashtag. + + 1. Calls `file_methods.download_posts` to scrape the post data for a given hashtag + 2. Calls `data_methods.extract_posts` to determine which if any posts + haven't previously been downloaded. + 3. Calls `data_methods.update_posts` to update the ID list with the IDs of + newly downloaded posts. """ file_path = file_methods.download_posts(settings, tag) number_scraped = None @@ -96,11 +95,15 @@ def get_posts(settings: dict, tag: str) -> Optional[Tuple[str, int]]: def get_videos(settings: dict, tag: str) -> Optional[Tuple[str, int]]: - """ - 1. calls download_videos in file_methods.py to get the videos for a given hashtag - 2. calls extract_videos from data_methods.py to extract new videos if any - 3. calls update_videos from data_methods.py to update the id-list with the ids of newly downloaded videos. - 4. the clean_video_files function deletes the residual video folder after the data processing + """Scrape trending TikTok video files for the specified hashtag. + + 1. Calls `file_methods.download_videos` to download the video files for a given hashtag + 2. Calls `data_methods.extract_videos` to determine which if any videos + haven't previously been downloaded. + 3. Calls `data_methods.update_videos` to update the ID list with the IDs of + newly downloaded videos. + 4. Calls `clean_video_files` function to delete the residual video folder + after the data processing. """ number_scraped = None download_list = file_methods.download_videos(settings, tag) @@ -117,10 +120,7 @@ def get_videos(settings: dict, tag: str) -> Optional[Tuple[str, int]]: def get_data( hashtags: list, download_data_type: Dict[str, bool] ) -> List[Tuple[str, Tuple[str, int]]]: - """ - The function checks for the user option "-p", "-v" or both and then - triggers the functions get_posts, get_videos or both, respectively. 
- """ + """Check command-line arguments and scrape posts/videos for specified hashtags.""" counter = 0 total_hashtags = len(hashtags) total_hashtags_offset = total_hashtags - 1 @@ -176,7 +176,7 @@ if __name__ == "__main__": if not (args.t or args.f): parser.error( - "No hashtags were given, please use either the `-t` flag or the `-f` flag to provide hashtags." + "No hashtags were given, please use either the `-t` flag or the `-f` flag to specify one or more hashtags." ) if not (args.p or args.v):