diff --git a/README.md b/README.md index 7d13107..02016b2 100644 --- a/README.md +++ b/README.md @@ -22,17 +22,17 @@ You should now be ready to start using the tool. ## About the tool ### Command-line arguments ``` -$ python run_downloader.py -h +python3 run_downloader.py --help usage: run_downloader.py [-h] [-t [T [T ...]]] [-f F] [-p] [-v] Download the tiktoks for the requested hashtags optional arguments: --h, --help show this help message and exit --t [T [T ...]] List of hashtags --f F File name with the list of hashtags --p Download posts --v Download videos + -h, --help show this help message and exit + -t [T [T ...]] List of hashtags to scrape + -f F File name containing list of hashtags to scrape + -p Download post data + -v Download video files ``` ### Structure of output data @@ -90,11 +90,11 @@ Note that video downloading is a time and data rate consuming task, as a result The script `hashtag_frequencies.py` analyzes the frequencies of top occurring hashtags in a given set of posts. ``` -python hashtag_frequencies.py --help -usage: hashtag_frequencies.py [-h] [-p] [-d] input_file n +$ python3 hashtag_frequencies.py --help +usage: hashtag_frequencies.py [-h] [-p] [-d] hashtag n positional arguments: - input_file The json hashtag file name + hashtag The hashtag of scraped posts to analyze n The number of top n occurrences optional arguments: @@ -107,7 +107,7 @@ Assume we want to analyze the 20 most frequently occurring hashtags in the downl - The results can be plotted and saved as a PNG file by executing the following command: - `python3 hashtag_frequencies.py -p ../data/london/posts/data.json 20` + `python3 hashtag_frequencies.py london 20 -p` which will produce a figure similar to that shown below:

@@ -118,31 +118,31 @@ Assume we want to analyze the 20 most frequently occurring hashtags in the downl - The results can be displayed in tabular form by executing the following command: - `python3 hashtag_frequencies.py -d ../data/london/posts/data.json 20` + `python3 hashtag_frequencies.py london 20 -d` which will produce a terminal output similar to the following: ``` - Rank Hashtag Occurrences Frequency - 0 london 962 1.0 - 1 fyp 493 0.5124740124740125 - 2 uk 238 0.24740124740124741 - 3 foryou 223 0.23180873180873182 - 4 foryoupage 186 0.19334719334719336 - 5 viral 177 0.183991683991684 - 6 fypシ 85 0.08835758835758836 - 7 funny 55 0.057172557172557176 - 8 xyzbca 52 0.05405405405405406 - 9 england 45 0.04677754677754678 - 10 british 44 0.04573804573804574 - 11 trending 39 0.04054054054054054 - 12 fy 33 0.034303534303534305 - 13 comedy 32 0.033264033264033266 - 14 roadman 28 0.029106029106029108 - 15 4u 27 0.028066528066528068 - 16 usa 26 0.02702702702702703 - 17 tiktok 26 0.02702702702702703 - 18 travel 21 0.02182952182952183 - 19 america 20 0.02079002079002079 + Rank Hashtag Occurrences Frequency + 0 london 960 1.0000 + 1 fyp 494 0.5146 + 2 uk 238 0.2479 + 3 foryou 221 0.2302 + 4 foryoupage 184 0.1917 + 5 viral 179 0.1865 + 6 fypシ 84 0.0875 + 7 funny 56 0.0583 + 8 xyzbca 51 0.0531 + 9 british 45 0.0469 + 10 england 44 0.0458 + 11 trending 40 0.0417 + 12 fy 33 0.0344 + 13 comedy 32 0.0333 + 14 roadman 28 0.0292 + 15 4u 27 0.0281 + 16 usa 26 0.0271 + 17 tiktok 26 0.0271 + 18 travel 21 0.0219 + 19 america 20 0.0208 ``` The `Frequency` column shows the ratio of the occurrence to the total number of downloaded posts. 
diff --git a/tiktok_downloader/data_methods.py b/tiktok_hashtag_analysis/data_methods.py similarity index 56% rename from tiktok_downloader/data_methods.py rename to tiktok_hashtag_analysis/data_methods.py index e4bd017..382e1f2 100644 --- a/tiktok_downloader/data_methods.py +++ b/tiktok_hashtag_analysis/data_methods.py @@ -1,31 +1,34 @@ -from typing import NamedTuple -import logging, logging.config +"""Utility functions that perform data processing related tasks. +""" + +from typing import NamedTuple, List, Tuple, Set, Optional, Dict, Any +import logging import file_methods -logging.config.fileConfig("../logging.config") -logger = logging.getLogger("Logger") - - -""" -The file contains several functions that perform data processing related tasks. -""" +logger = logging.getLogger() class Diff(NamedTuple): - ids: list + """Keep track of scraped post IDs and whether previously-scraped posts have been filtered.""" + + ids: Set[str] filter_posts: bool class Total(NamedTuple): + """Keep track of number of total and number of unique scraped posts.""" + total: int unique: int -def get_difference(tag: str, file_name: str, ids: list) -> NamedTuple: - """ - Compares two sets of ids and returns the difference of the two sets. - Purpose - user to filter out the new ids by comparing the set of id list (ids/post_ids.json or videos_ids.json) and the list of newly downloaded ids. +def get_difference(tag: str, file_name: str, ids: List[str]) -> Optional[Diff]: + """Find TikTok post IDs that haven't previously been scraped. + + Filter out the new posts for the hashtag `tag` by comparing the list of + post IDs contained in `file_name` to the list of newly downloaded IDs + contained in `ids`. 
""" filter_posts = False current_id_data = file_methods.get_data(file_name) @@ -38,23 +41,25 @@ def get_difference(tag: str, file_name: str, ids: list) -> NamedTuple: if not new_ids: return None else: - new_ids = list(new_ids) total_new_ids = len(new_ids) if total_new_ids == total_current_ids: - filter_posts = False new_data = Diff(new_ids, filter_posts) else: new_data = Diff(new_ids, filter_posts) return new_data else: filter_posts = True - new_data = Diff(ids, filter_posts) + new_data = Diff(set(ids), filter_posts) return new_data -def extract_posts(settings: dict, file_name: str, tag: str) -> list: - """ - Takes the downloaded file by the tiktok-scraper that contains the posts, and returns the new posts after comparing it the list of posts (from the file ids/post_ids.json) already downloaded. +def extract_posts( + settings: Dict[Any, Any], file_name: str, tag: str +) -> Optional[Tuple[List[str], List[Dict]]]: + """Find TikTok posts that haven't previously been scraped. + + Compares the file downloaded by tiktok-scraper to the list of + previously-scraped posts (from the file ids/post_ids.json). 
""" ids = [] posts = [] @@ -65,6 +70,7 @@ def extract_posts(settings: dict, file_name: str, tag: str) -> list: if not ids: logger.warn(f"No posts were found for the hashtag: {tag}") + return None status = file_methods.check_existence(settings["post_ids"], "file") if not status: @@ -74,18 +80,19 @@ def extract_posts(settings: dict, file_name: str, tag: str) -> list: new_ids = get_difference(tag, settings["post_ids"], ids) if not new_ids: logger.warn(f"No new posts were found for the hashtag: {tag}") + return None elif new_ids.filter_posts: new_posts = [post for post in posts if post["id"] in new_ids.ids] - new_data = (new_ids.ids, new_posts) - return new_data + return (list(new_ids.ids), new_posts) else: - new_data = (new_ids.ids, posts) - return new_data + return (list(new_ids.ids), posts) -def extract_videos(settings: dict, tag: str, download_list: list) -> list: - """ - Tiktok-scraper downloads the videos and puts them in a folder - the list of ids of the downloaded videos is fed to this function as download_list. The function returns the set of new videos after comparing it the list of videos (from the file ids/videos_ids.json) already downloaded. +def extract_videos(settings: dict, tag: str, download_list: List[str]) -> List[str]: + """Find TikTok videos that haven't previously been scraped. + + Compares the file downloaded by tiktok-scraper to the list of + previously-scraped videos (from the file ids/video_ids.json). """ status = file_methods.check_existence(settings["video_ids"], "file") if not status: @@ -97,43 +104,44 @@ def extract_videos(settings: dict, tag: str, download_list: list) -> list: logger.warn( f"No new videos were found for the {tag} in the downloaded folder." ) - return None + return [] else: - return new_videos.ids + return list(new_videos.ids) def update_posts( - file_path: str, file_type: str, new_data: list, tag: str = None -) -> tuple: - """ - Updates the list of post ids (in the file ids/post_ids.json) with the ids of the new posts. 
+ file_path: str, file_type: str, new_data: List[Any], tag: str = None +) -> Optional[Tuple[str, int]]: + """Update the file containing scraped post IDs (`ids/post_ids.json`) with + the IDs of the recently scraped posts. """ status = file_methods.check_existence(file_path, file_type) if not tag: file_methods.post_writer(file_path, new_data, status) + return None else: scraped_data = file_methods.id_writer(file_path, new_data, tag, status) return scraped_data -def update_videos(settings: str, new_data: list, tag: str) -> tuple: - """ - Updates the list of video ids (in the file ids/video_ids.json) with the ids of the new videos. +def update_videos( + settings: Dict[str, Any], new_data: List[str], tag: str +) -> Tuple[str, int]: + """Update the file containing video IDs (`ids/video_ids.json`) with the IDs + of the recently scraped videos. """ file_path = settings["video_ids"] file_methods.check_file(file_path, "file") - log = file_methods.id_writer(file_path, new_data, tag, True) + number_scraped = file_methods.id_writer(file_path, new_data, tag, True) file_methods.clean_video_files(settings, tag, new_data) - return log + return number_scraped -def get_total_posts(file_path: str, tag: str) -> NamedTuple: - """ - Returns total count of ids in a id list along with the number of unique ids among them. - """ +def get_total_posts(file_path: str, tag: str) -> Total: + """Count number of total scraped posts and number of unique scraped posts.""" status = file_methods.check_existence(file_path, "file") if not status: - raise OSError("{file_path} not found!") + raise OSError(f"{file_path} not found!") else: data = file_methods.get_data(file_path) total_posts = len(data[tag]) @@ -143,9 +151,7 @@ def get_total_posts(file_path: str, tag: str) -> NamedTuple: def print_total(file_path: str, tag: str, data_type: str): - """ - Prints the total count for posts or videos for a hashtag. Calls the function get_total_posts for sanity check that there are no repeating ids in the id lists. 
- """ + """Print number of total and unique scraped posts, warn if any non-unique posts.""" total = get_total_posts(file_path, tag) if total.total == total.unique: logger.info(f"Scraped {total.total} {data_type} containing the hashtag '{tag}'") diff --git a/tiktok_downloader/file_methods.py b/tiktok_hashtag_analysis/file_methods.py similarity index 61% rename from tiktok_downloader/file_methods.py rename to tiktok_hashtag_analysis/file_methods.py index e9d4328..e9c7256 100644 --- a/tiktok_downloader/file_methods.py +++ b/tiktok_hashtag_analysis/file_methods.py @@ -1,23 +1,21 @@ +"""Utility functions that operate on files, such as writing to or reading from a file. +""" + import os import json import subprocess from datetime import datetime import shutil +from typing import Tuple, List, Optional, Dict, Any import logging, logging.config logging.config.fileConfig("../logging.config") -logger = logging.getLogger("Logger") - -""" -The file contains the functions that operate on files, such as writing or reading from files etc. -""" +logger = logging.getLogger() def create_file(name: str, file_type: str): - """ - Creates a file or directory. - """ + """Create a file or directory.""" if file_type == "dir": os.makedirs(name, mode=0o777) elif file_type == "file": @@ -28,9 +26,7 @@ def create_file(name: str, file_type: str): def check_existence(file_path: str, file_type: str): - """ - Checks the existence of a file or a directory. If not found, returns False, else returns True. - """ + """Check if a file or a directory exists.""" if file_type == "file": return os.path.isfile(file_path) elif file_type == "dir": @@ -40,85 +36,92 @@ def check_existence(file_path: str, file_type: str): def check_file(file_path: str, file_type: str): - """ - Creates a file or directory, if not found. Else, returns nothing. 
- """ + """If path does not exist, creates a file or directory.""" status = check_existence(file_path, file_type) if not status: create_file(file_path, file_type) -def download_posts(settings: dict, tag: str): - """ - Runs the tiktok-scraper command to download posts for a given hashtag. - Returns the path to the downloaded file of posts. If no file was downloaded, prints the error and returns nothing in order to move on. - os.chdir is used to execute shell commands in the right folders and then reused to come back to the original folder of execution of run_downloader script. +def download_posts(settings: Dict, tag: str, output_dir: Any): + """Run the tiktok-scraper command to download posts for a given hashtag. + + Returns the path to the downloaded file of posts. If no file was downloaded, + prints the error and returns nothing in order to move on. + + The scraper is pointed at `output_dir` via the `--filepath` option, so the + working directory of the run_downloader script is never changed. """ path = os.path.join(settings["data"], tag, settings["posts"]) - os.chdir(path) - tiktok_command = f"tiktok-scraper hashtag {tag} -t 'json'" + os.makedirs(path, exist_ok=True) + tiktok_command = f"tiktok-scraper hashtag {tag} -t 'json' --filepath {output_dir}" output = subprocess.check_output(tiktok_command, shell=True, encoding="utf-8") new_file = output.split()[-1] if "json" in new_file: - os.chdir("../../../tiktok_downloader") return new_file else: logger.warn( f"Something's wrong with what is returned by tiktok-scraper for the hashtag {tag} - *{new_file}* is not a json file.\n\ntiktok-scraper returned {output}" ) - os.chdir("../../../tiktok_downloader") -def download_videos(settings: dict, tag: str): - """ - Runs the tiktok-scraper command to download videos for a given hashtag. Note that all the videos are downloaded that are returned by the tiktok api and as a result, its a time and data consuming process. 
- The list of downloaded video ids is constucted and returned if the downloaded folder contains at least 1 video. - os.chdir is used to execute shell commands in the right folders and then reused to come back to the original folder of execution of run_downloader script. +def download_videos(settings: Dict, tag: str): + """Run the tiktok-scraper command to download videos for a given hashtag. + + Note that all the videos are downloaded that are returned by the TikTok API, + making this a time- and data-intensive process. + The list of downloaded video IDs is constructed and returned if the + downloaded folder contains at least 1 video. + + The scraper is pointed at `path` via the `--filepath` option, so the + working directory of the run_downloader script is never changed. """ path = os.path.join(settings["data"], tag, settings["videos"]) - os.chdir(path) - tiktok_command = f"tiktok-scraper hashtag {tag} -d" + os.makedirs(path, exist_ok=True) + tiktok_command = f"tiktok-scraper hashtag {tag} -d --filepath {path}" result = subprocess.check_output(tiktok_command, shell=True) - downloaded_list_tmp = os.listdir(f"./#{tag}") + downloaded_list_tmp = os.listdir(os.path.join(path, f"#{tag}")) if downloaded_list_tmp: downloaded_list = [] for file in downloaded_list_tmp: file = file.split(".")[0] downloaded_list.append(file) - os.chdir("../../../tiktok_downloader") return downloaded_list else: logger.warn(f"No video files were downloaded for the hashtag {tag}.") - os.chdir("../../../tiktok_downloader") shutil.rmtree(settings["videos_delete"]) -def get_data(file_path: str) -> list: - """ - Reads the json file and retuns the read data. - """ +def get_data(file_path: str) -> Any: + """Read a JSON file and return the read data.""" with open(file_path, "r", encoding="utf-8") as f: data = json.load(f) - return data + return data -def dump_data(file_path: str, data: list): - """ - Writes the data to the json file. 
- """ +def dump_data(file_path: str, data: Any): + """Write data to a JSON file.""" with open(file_path, "w", encoding="utf-8") as f: json.dump(data, f) -def log_writer(log_data: list): - """ - Creates the dictionary of total downloads (posts and videos) per hashtag. - Example : { timetamp : { hashtag : { videos : number_of_new_videos , posts : number_of_new_posts } } } - Writes the dictionary to the log file (logs/log.json). +def log_writer(log_data: List[Tuple[str, Tuple[str, int]]]): + """Create the dictionary of total downloads (posts and videos) per hashtag. + + Example : { + timestamp : { + hashtag : { + videos : number_of_new_videos , + posts : number_of_new_posts + } + } + } + + Writes the dictionary to the log file (`logs/log.json`). """ + total = 0 - scraped_summary_dict: dict = {} + scraped_summary_dict = {} # type: Dict[str, Dict[str, int]] for hashtag, (data_type, count) in log_data: if hashtag in scraped_summary_dict: if data_type in scraped_summary_dict[hashtag]: @@ -130,18 +133,18 @@ def log_writer(log_data: list): scraped_summary_dict[hashtag] = {data_type: count} total += count - now = datetime.now() - now_str = now.strftime("%d-%m-%Y %H:%M:%S") + now_str = datetime.now().strftime("%d-%m-%Y %H:%M:%S") data = {now_str: scraped_summary_dict} - logger.warn(f"Logged post data: {data}") + logger.debug(f"Logged post data: {data}") logger.info(f"Successfully scraped {total} total entries") -def id_writer(file_path: str, new_data: list, tag: str, status: bool) -> tuple: - """ - Writes the list of new ids to the post_ids or video_ids files. 
- """ +def id_writer( + file_path: str, new_data: List[str], tag: str, status: bool +) -> Tuple[str, int]: + """Write the list of new ids to the post_ids or video_ids file.""" + total = len(new_data) if status: try: @@ -162,9 +165,9 @@ def id_writer(file_path: str, new_data: list, tag: str, status: bool) -> tuple: return number_scraped -def post_writer(file_path: str, new_data: list, status: bool): - """ - Writes the new posts in the post file of the given hashtag (/data/{hashtag}/posts/data.json) +def post_writer(file_path: str, new_data: List[Dict], status: bool): + """Write the new posts in the post file of the given hashtag + (`/data/{hashtag}/posts/data.json`). """ total = len(new_data) if status: @@ -182,9 +185,7 @@ def post_writer(file_path: str, new_data: list, status: bool): def delete_file(file_path: str, file_type: str): - """ - Deletes the directory or the file. - """ + """Delete a directory or file.""" if not check_existence(file_path, file_type): raise OSError(f"Attempt to delete file failed: {file_path} does not exist") elif file_type == "file": @@ -197,9 +198,8 @@ def delete_file(file_path: str, file_type: str): raise OSError("{file_type} needs to be either 'file' or 'dir'") -def clean_video_files(settings: dict, tag: str, new_data: list = None): - """ - Moves the new videos from the tiktok-scraper video folder to /data/{hashtag}/videos/ +def clean_video_files(settings: dict, tag: str, new_data: Optional[List[str]] = None): + """Move the new videos from the tiktok-scraper video folder to `/data/{hashtag}/videos/`. Deletes the residual tiktok-scraper video folder. 
""" if new_data: diff --git a/tiktok_downloader/global_data.py b/tiktok_hashtag_analysis/global_data.py similarity index 66% rename from tiktok_downloader/global_data.py rename to tiktok_hashtag_analysis/global_data.py index 85d4939..ed8c317 100644 --- a/tiktok_downloader/global_data.py +++ b/tiktok_hashtag_analysis/global_data.py @@ -1,5 +1,4 @@ -""" -Contains global constants relating to paths and operational parameters such as sleep time between consecutive tiktok-scraper calls. +"""Specify global constants including file paths and scraping options. """ @@ -15,7 +14,6 @@ POST_IDS = "post_ids.json" VIDEO_IDS = "video_ids.json" DATA_FILE = "data.json" - FILES = { "data": DATA, "ids": IDS, @@ -28,12 +26,7 @@ FILES = { "downloads": [], } - -# Commands -tag = "" - PARAMETERS = { "scraper_attempts": 3, - # "number_of_videos" : 3, # Number of videos to be downloaded by tiktok-scraper. "sleep": 8, } diff --git a/tiktok_downloader/hashtag_frequencies.py b/tiktok_hashtag_analysis/hashtag_frequencies.py similarity index 51% rename from tiktok_downloader/hashtag_frequencies.py rename to tiktok_hashtag_analysis/hashtag_frequencies.py index 1650f73..aa3119c 100644 --- a/tiktok_downloader/hashtag_frequencies.py +++ b/tiktok_hashtag_analysis/hashtag_frequencies.py @@ -1,118 +1,40 @@ +"""Analyze the frequency of hashtags appearing in the set of given posts. 
+ +- The "hashtag" positional argument specifies the hashtag of scraped posts to analyze +- The "n" positional argument specifies how many hashtags the user wants to analyze +- Specifying the "-d" flag prints the hashtag frequencies on the shell +- Specifying the "-p" flag plots the hashtag frequencies and saves as a png file +""" + import os import json import argparse from datetime import datetime import warnings - -warnings.filterwarnings("ignore", message="Glyph (.*) missing from current font") +from typing import List, Tuple, Dict, Any import logging import matplotlib.pyplot as plt import matplotlib.ticker as mtick import seaborn as sns -sns.set_theme(style="darkgrid") - from file_methods import check_file, check_existence -from global_data import IMAGES +from global_data import IMAGES, FILES -""" -Plots the frequency of hashtags appearing in the set of given posts. -""" +warnings.filterwarnings("ignore", message="Glyph (.*) missing from current font") +sns.set_theme(style="darkgrid") +logger = logging.getLogger() -def get_hashtags(obj): - if not obj: - raise ValueError(f"Empty item, no hashtags to be extracted.") - else: - hashtags = {} - tags = [[tag["name"] for tag in ele["hashtags"]] for ele in obj] - tags = [set(ele) for ele in tags] - { - tag: ( - 1 - if tag not in hashtags and not hashtags.update({tag: 1}) - else hashtags[tag] + 1 and not hashtags.update({tag: hashtags[tag] + 1}) - ) - for ele in tags - for tag in ele - } - hashtags = sorted(hashtags.items(), key=lambda e: e[1], reverse=True) - - return hashtags - - -def get_occurrences(filename, n=1, sort=True): - """ - Takes the json file containing posts and returns a dictionary: - local variable occs = { - "total": total posts in the file, - top_n: [[top n hashtags ], [frequencies of corresponding hashtags]] - } - """ - with open(filename) as f: - obj = json.load(f) - l = len(obj) - tags = get_hashtags(obj) - occs = {"total": l, "top_n": []} - occs["top_n"] = [[ele[i] for ele in tags[0:n]] for 
i in range(2)] - return occs - - -def plot(n, occs, img_folder): - y_pos = list(reversed(range(n - 1))) - max_count = occs["top_n"][1][0] - freqs = [count / max_count * 100 for count in occs["top_n"][1][1:]] - labels = occs["top_n"][0][1:] - - fig, ax = plt.subplots(figsize=(5, 6.66)) - ax.barh(y_pos, freqs) - ax.set_yticks(y_pos) - ax.set_yticklabels(labels) - ax.grid(axis="y") - ax.set_xlabel("Percent of posts with common hashtag") - ax.set_ylim(min(y_pos) - 1, max(y_pos) + 1) - ax.set_title(f'Common hashtags for #{occs["top_n"][0][0]} posts') - ax.xaxis.set_major_formatter(mtick.PercentFormatter(decimals=0)) - save_plot(img_folder) - - -def print_occurrences(occs): - """ - Prints the top n hashtags with their frequencies and the ratio of occurrences and total posts, all to the shell. - """ - row_number = 0 - total_posts = occs["total"] - print( - "{:<8} {:<15} {:<15} {:<15}".format( - "Rank", "Hashtag", "Occurrences", "Frequency" - ) - ) - for key, value in zip(occs["top_n"][0], occs["top_n"][1]): - ratio = value / total_posts - print("{:<8} {:<15} {:<15} {:<15}".format(row_number, key, value, ratio)) - row_number += 1 - print(f"Total posts: {total_posts}") - - -def save_plot(img_folder): - """ - Saves the plot to a png file in the folder /data/imgs/ - """ - now = datetime.now() - current_time = now.strftime("%Y_%m_%d_%H_%M_%S") - filename = f"{img_folder}/{current_time}.png" - logging.info(f"Plot saved to file: {filename}") - plt.savefig(filename, bbox_inches="tight", facecolor="white", dpi=300) - - -def create_parser(): - """ - Creates the parser and the arguments for the user input. 
- """ +def create_parser() -> argparse.ArgumentParser: + """Create the parser and the arguments for the user input.""" parser = argparse.ArgumentParser() - parser.add_argument("input_file", help="The json hashtag file name") - parser.add_argument("n", help="The number of top n occurrences", type=int) + parser.add_argument( + "hashtag", + type=str, + help="The hashtag of scraped posts to analyze", + ) + parser.add_argument("n", type=int, help="The number of top n occurrences") parser.add_argument( "-p", "--plot", help="Plot the occurrences", action="store_true" ) @@ -122,14 +44,89 @@ def create_parser(): return parser -if __name__ == "__main__": - """ - Option "n" specifies how many hashtags does the user wants to plot. - "-d" option prints the hashtag frequencies on the shell - "-p" option plots the hashtag frequencies and saves as a png file in the folder /data/imgs/ +def get_hashtags(obj: Dict) -> List[Tuple[str, int]]: + if not obj: + raise ValueError(f"Empty item, no hashtags could be extracted.") + else: + hashtags = {} + tags = [set([tag["name"] for tag in ele["hashtags"]]) for ele in obj] + { + tag: ( + 1 + if tag not in hashtags and not hashtags.update({tag: 1}) + else hashtags[tag] + 1 and not hashtags.update({tag: hashtags[tag] + 1}) + ) + for ele in tags + for tag in ele + } - The function get_occurrences is triggered to compute and return the top n occurrences and the hashtags. + return sorted(hashtags.items(), key=lambda e: e[1], reverse=True) + + +def get_occurrences(filename: str, n: int = 1) -> Dict[str, Any]: + """Aggregate hashtag frequency information for a specified JSON file. 
+ + Example: { + "total": total posts in the file, + top_n: [[top n hashtags ], [frequencies of corresponding hashtags]] + } """ + with open(filename) as f: + obj = json.load(f) + l = len(obj) + tags = get_hashtags(obj) + occs = {"total": l, "top_n": []} + occs["top_n"] = [[ele[i] for ele in tags[0 : min(l, n)]] for i in range(2)] + return occs + + +def plot(occs: dict, img_folder: str): + """Save plot of common hashtags as bar chart to file.""" + y_pos = list(reversed(range(len(occs["top_n"][0]) - 1))) + max_count = occs["top_n"][1][0] + freqs = [count / max_count * 100 for count in occs["top_n"][1][1:]] + labels = occs["top_n"][0][1:] + hashtag = occs["top_n"][0][0] + + fig, ax = plt.subplots(figsize=(5, 6.66)) + ax.barh(y_pos, freqs) + ax.set_yticks(y_pos) + ax.set_yticklabels(labels) + ax.grid(axis="y") + ax.set_xlabel("Percent of posts with common hashtag") + ax.set_ylim(min(y_pos) - 1, max(y_pos) + 1) + ax.set_title(f"Common hashtags for #{hashtag} posts") + ax.xaxis.set_major_formatter(mtick.PercentFormatter(decimals=0)) + save_plot(img_folder, hashtag) + + +def save_plot(img_folder, hashtag): + """Save the plot as a png file in the folder ../data/imgs/""" + now = datetime.now() + current_time = now.strftime("%Y_%m_%d_%H_%M_%S") + filename = f"{img_folder}/{hashtag}_{current_time}.png" + logging.info(f"Plot saved to file: {filename}") + plt.savefig(filename, bbox_inches="tight", facecolor="white", dpi=300) + + +def print_occurrences(occs): + """Print information about the top n hashtags and their frequencies.""" + row_number = 0 + total_posts = occs["total"] + print( + "{:<8} {:<30} {:<15} {:<15}".format( + "Rank", "Hashtag", "Occurrences", "Frequency" + ) + ) + for key, value in zip(occs["top_n"][0], occs["top_n"][1]): + ratio = value / total_posts + print("{:<8} {:<30} {:<15} {:.4f}".format(row_number, key, value, ratio)) + row_number += 1 + print(f"Total posts: {total_posts}") + + +if __name__ == "__main__": + img_folder = IMAGES check_file(img_folder, 
"dir") parser = create_parser() @@ -138,14 +135,18 @@ if __name__ == "__main__": raise ValueError( f"Specified argument `n` (the number of hashtags to analyze) must be greater than zero, not: {args.n}." ) - if not check_existence(args.input_file, "file"): + input_file = data_file = os.path.join( + FILES["data"], args.hashtag, FILES["posts"], FILES["data_file"] + ) + if not check_existence(input_file, "file"): raise FileNotFoundError( - f"Specified argument `input_file` ({args.input_file}) does not exist." + f"File ({input_file}) for specified argument `hashtag` ({args.hashtag}) does not exist." ) - base = os.path.splitext(args.input_file)[0] + + base = os.path.splitext(input_file)[0] path = f"./{base}_sorted_hashtags.csv" - occs = get_occurrences(args.input_file, args.n) + occs = get_occurrences(input_file, args.n) if args.plot: - plot(args.n, occs, img_folder) + plot(occs, img_folder) else: print_occurrences(occs) diff --git a/tiktok_downloader/hashtag_list.txt b/tiktok_hashtag_analysis/hashtag_list.txt similarity index 100% rename from tiktok_downloader/hashtag_list.txt rename to tiktok_hashtag_analysis/hashtag_list.txt diff --git a/tiktok_downloader/run_downloader.py b/tiktok_hashtag_analysis/run_downloader.py similarity index 50% rename from tiktok_downloader/run_downloader.py rename to tiktok_hashtag_analysis/run_downloader.py index 3713e5a..0746e24 100644 --- a/tiktok_downloader/run_downloader.py +++ b/tiktok_hashtag_analysis/run_downloader.py @@ -1,18 +1,44 @@ +"""Download post data or videos from TikToks containing one or more specified hashtags. 
+ +- The "-p" flag specifies that only data from posts is downloaded, no video files +- The "-v" flag specifies that only video files are downloaded, no post data +- Specifying both "-p" and "-v" flags downloads both post data and video files +- The "-t" flag allows the user to specify a list of space-separated hashtags as an argument +- The "-f" flag allows the user to specify the filename of a text file containing a list of newline-separated hashtags as an argument +""" + import os import time import argparse import logging, logging.config +from typing import List, Tuple, Dict, Any, Optional +from tempfile import TemporaryDirectory import global_data import file_methods import data_methods - -logging.config.fileConfig("../logging.config") -logger = logging.getLogger("Logger") +logger = logging.getLogger() -def get_hashtag_list(file_name: str) -> list: +def create_parser() -> argparse.ArgumentParser: + """Create the parser and the arguments for the user input.""" + parser = argparse.ArgumentParser( + description="Download the tiktoks for the requested hashtags" + ) + + parser.add_argument("-t", type=str, nargs="*", help="List of hashtags to scrape") + parser.add_argument( + "-f", type=str, help="File name containing list of hashtags to scrape" + ) + parser.add_argument("-p", action="store_true", help="Download post data") + parser.add_argument("-v", action="store_true", help="Download video files") + + return parser + + +def get_hashtag_list(file_name: str) -> List[str]: + """Extract list of newline-separated hashtags from text file.""" if not file_methods.check_existence(file_name, "file"): raise OSError(f"{file_name} does not exist") with open(file_name) as f: @@ -22,32 +48,14 @@ def get_hashtag_list(file_name: str) -> list: return tags -def create_parser(): - """ - Creates the parser and the arguments for the user input. 
- """ - parser = argparse.ArgumentParser( - description="Download the tiktoks for the requested hashtags" - ) - - parser.add_argument("-t", type=str, nargs="*", help="List of hashtags") - parser.add_argument("-f", type=str, help="File name with the list of hashtags") - parser.add_argument("-p", action="store_true", help="Download posts") - parser.add_argument("-v", action="store_true", help="Download videos") - - return parser - - -def set_download_settings(download_data_type: str) -> dict: - """ - Loads the constants from global_data into the dict called settings and returns it. - Purpose - easy access to global constants by various functions. - """ - settings = {} - settings["data"] = global_data.FILES["data"] - settings["ids"] = global_data.FILES["ids"] - settings["sleep"] = global_data.PARAMETERS["sleep"] - settings["scraper"] = global_data.PARAMETERS["scraper_attempts"] +def set_download_settings(download_data_type: Dict[str, bool]) -> Dict[str, Any]: + """Load the constants from global_data module into the `settings` dict.""" + settings = { + "data": global_data.FILES["data"], + "ids": global_data.FILES["ids"], + "sleep": global_data.PARAMETERS["sleep"], + "scraper": global_data.PARAMETERS["scraper_attempts"], + } file_methods.check_file(f"{settings['data']}/{settings['ids']}", "dir") if download_data_type["posts"]: settings["posts"] = global_data.FILES["posts"] @@ -61,37 +69,44 @@ def set_download_settings(download_data_type: str) -> dict: return settings -def get_posts(settings: dict, tag: str) -> tuple: +def get_posts(settings: dict, tag: str) -> Optional[Tuple[str, int]]: + """Scrape trending TikTok post data for the specified hashtag. + + 1. Calls `file_methods.download_posts` to scrape the post data for a given hashtag + 2. Calls `data_methods.extract_posts` to determine which if any posts + haven't previously been downloaded. + 3. Calls `data_methods.update_posts` to update the ID list with the IDs of + newly downloaded posts. """ - 1. 
calls download_posts in file_methods.py to get the posts for a given hashtag - 2. calls extract_posts from data_methods.py to extract new posts if any - 3. calls update_posts from data_methods.py to update the id-list with the ids of newly downloaded posts. - """ - file_path = file_methods.download_posts(settings, tag) - number_scraped = () - if file_path: - new_data = data_methods.extract_posts(settings, file_path, tag) - if new_data: - data_file = os.path.join( - settings["data"], tag, settings["posts"], settings["data_file"] - ) - data_methods.update_posts(data_file, "file", new_data[1]) - number_scraped = data_methods.update_posts( - settings["post_ids"], "file", new_data[0], tag - ) - file_methods.delete_file(file_path, "file") + with TemporaryDirectory() as temp_dir: + file_path = file_methods.download_posts(settings, tag, temp_dir) + number_scraped = None + if file_path: + new_data = data_methods.extract_posts(settings, file_path, tag) + if new_data: + data_file = os.path.join( + settings["data"], tag, settings["posts"], settings["data_file"] + ) + data_methods.update_posts(data_file, "file", new_data[1]) + number_scraped = data_methods.update_posts( + settings["post_ids"], "file", new_data[0], tag + ) return number_scraped -def get_videos(settings: dict, tag: str) -> tuple: +def get_videos(settings: dict, tag: str) -> Optional[Tuple[str, int]]: + """Scrape trending TikTok video files for the specified hashtag. + + 1. Calls `file_methods.download_videos` to download the video files for a given hashtag + 2. Calls `data_methods.extract_videos` to determine which if any videos + haven't previously been downloaded. + 3. Calls `data_methods.update_videos` to update the ID list with the IDs of + newly downloaded videos. + 4. Calls `clean_video_files` function to delete the residual video folder + after the data processing. """ - 1. calls download_videos in file_methods.py to get the videos for a given hashtag - 2. 
calls extract_videos from data_methods.py to extract new videos if any - 3. calls update_videos from data_methods.py to update the id-list with the ids of newly downloaded videos. - 4. the clean_video_files function deletes the residual video folder after the data processing - """ - number_scraped = () + number_scraped = None download_list = file_methods.download_videos(settings, tag) if download_list: new_data = data_methods.extract_videos(settings, tag, download_list) @@ -103,11 +118,10 @@ def get_videos(settings: dict, tag: str) -> tuple: return number_scraped -def get_data(hashtags: list, download_data_type: str) -> list: - """ - The function checks for the user option "-p", "-v" or both and then - triggers the functions get_posts, get_videos or both, respectively. - """ +def get_data( + hashtags: list, download_data_type: Dict[str, bool] +) -> List[Tuple[str, Tuple[str, int]]]: + """Check command-line arguments and scrape posts/videos for specified hashtags.""" counter = 0 total_hashtags = len(hashtags) total_hashtags_offset = total_hashtags - 1 @@ -145,10 +159,9 @@ def get_data(hashtags: list, download_data_type: str) -> list: ) settings["videos_delete"] = settings["data"] + f"/{tag}/videos/#{tag}" settings["videos_to"] = settings["data"] + f"/{tag}/videos" - res = get_videos(settings, tag) - if res: - res = (res[0], ("videos", res[1])) - scraped_summary_list.append(res) + _res = get_videos(settings, tag) + if _res: + scraped_summary_list.append((_res[0], ("videos", _res[1]))) data_methods.print_total(settings["video_ids"], tag, "videos") counter += 1 @@ -164,12 +177,12 @@ if __name__ == "__main__": if not (args.t or args.f): parser.error( - "No hashtags were given, please use either -t option or -f to provide hashtags." + "No hashtags were given, please use either the `-t` flag or the `-f` flag to specify one or more hashtags." ) if not (args.p or args.v): parser.error( - "No argument given, please specify either -p for posts or -v videos or both." 
+ "No argument given, please specify either the `-p` flag to download post data or the `-v` flag to download video files, or both." ) if args.t: @@ -181,7 +194,7 @@ if __name__ == "__main__": logger.info(f"Hashtags to scrape: {hashtags}") if not hashtags: raise ValueError( - "No hashtags were specified: please use either the -t flag to specify a sspace-separated list of one or more hashtags as a command-line argument, or use the -f flag to specify a text file of newline-separated hashtags." + "No hashtags were specified: please use either the `-t` flag to specify a space-separated list of one or more hashtags as a command-line argument, or use the `-f` flag to specify a text file of newline-separated hashtags." ) download_data_type = {"posts": args.p, "videos": args.v}