diff --git a/tiktok_downloader/data_methods.py b/tiktok_downloader/data_methods.py index 2211814..e4bd017 100644 --- a/tiktok_downloader/data_methods.py +++ b/tiktok_downloader/data_methods.py @@ -1,4 +1,4 @@ -from collections import namedtuple +from typing import NamedTuple import logging, logging.config import file_methods @@ -12,17 +12,23 @@ The file contains several functions that perform data processing related tasks. """ -diff = namedtuple("difference", "ids filter_posts") -total = namedtuple("total", "total unique") +class Diff(NamedTuple): + ids: list + filter_posts: bool -def get_difference(tag, file, ids): +class Total(NamedTuple): + total: int + unique: int + + +def get_difference(tag: str, file_name: str, ids: list) -> NamedTuple: """ Compares two sets of ids and returns the difference of the two sets. Purpose - user to filter out the new ids by comparing the set of id list (ids/post_ids.json or videos_ids.json) and the list of newly downloaded ids. """ filter_posts = False - current_id_data = file_methods.get_data(file) + current_id_data = file_methods.get_data(file_name) if tag in current_id_data: current_ids = current_id_data[tag] set_current_ids = set(current_ids) @@ -36,17 +42,17 @@ def get_difference(tag, file, ids): total_new_ids = len(new_ids) if total_new_ids == total_current_ids: filter_posts = False - new_data = diff(new_ids, filter_posts) + new_data = Diff(new_ids, filter_posts) else: - new_data = diff(new_ids, filter_posts) + new_data = Diff(new_ids, filter_posts) return new_data - else: + else: filter_posts = True - new_data = diff(ids, filter_posts) + new_data = Diff(ids, filter_posts) return new_data -def extract_posts(settings, file_name, tag): +def extract_posts(settings: dict, file_name: str, tag: str) -> list: """ Takes the downloaded file by the tiktok-scraper that contains the posts, and returns the new posts after comparing it the list of posts (from the file ids/post_ids.json) already downloaded. 
""" @@ -59,7 +65,7 @@ def extract_posts(settings, file_name, tag): if not ids: logger.warn(f"No posts were found for the hashtag: {tag}") - + status = file_methods.check_existence(settings["post_ids"], "file") if not status: new_data = (ids, posts) @@ -69,7 +75,7 @@ def extract_posts(settings, file_name, tag): if not new_ids: logger.warn(f"No new posts were found for the hashtag: {tag}") elif new_ids.filter_posts: - new_posts = [post for post in posts if post['id'] in new_ids.ids] + new_posts = [post for post in posts if post["id"] in new_ids.ids] new_data = (new_ids.ids, new_posts) return new_data else: @@ -77,7 +83,7 @@ def extract_posts(settings, file_name, tag): return new_data -def extract_videos(settings, tag, download_list): +def extract_videos(settings: dict, tag: str, download_list: list) -> list: """ Tiktok-scraper downloads the videos and puts them in a folder - the list of ids of the downloaded videos is fed to this function as download_list. The function returns the set of new videos after comparing it the list of videos (from the file ids/videos_ids.json) already downloaded. """ @@ -88,13 +94,17 @@ def extract_videos(settings, tag, download_list): else: new_videos = get_difference(tag, settings["video_ids"], download_list) if not new_videos: - logger.warn(f"No new videos were found for the {tag} in the downloaded folder.") + logger.warn( + f"No new videos were found for the {tag} in the downloaded folder." + ) return None else: return new_videos.ids -def update_posts(file_path, file_type, new_data, tag=None): +def update_posts( + file_path: str, file_type: str, new_data: list, tag: str = None +) -> tuple: """ Updates the list of post ids (in the file ids/post_ids.json) with the ids of the new posts. 
""" @@ -106,7 +116,7 @@ def update_posts(file_path, file_type, new_data, tag=None): return scraped_data -def update_videos(settings, new_data, tag): +def update_videos(settings: str, new_data: list, tag: str) -> tuple: """ Updates the list of video ids (in the file ids/video_ids.json) with the ids of the new videos. """ @@ -117,7 +127,7 @@ def update_videos(settings, new_data, tag): return log -def get_total_posts(file_path, tag): +def get_total_posts(file_path: str, tag: str) -> NamedTuple: """ Returns total count of ids in a id list along with the number of unique ids among them. """ @@ -128,16 +138,18 @@ def get_total_posts(file_path, tag): data = file_methods.get_data(file_path) total_posts = len(data[tag]) unique = len(set(data[tag])) - t = total(total_posts, unique) + t = Total(total_posts, unique) return t -def print_total(file_path, tag, data_type): +def print_total(file_path: str, tag: str, data_type: str): """ Prints the total count for posts or videos for a hashtag. Calls the function get_total_posts for sanity check that there are no repeating ids in the id lists. """ total = get_total_posts(file_path, tag) - if (total.total == total.unique): + if total.total == total.unique: logger.info(f"Scraped {total.total} {data_type} containing the hashtag '{tag}'") else: - logger.warn(f"Out of total {data_type} for the hashtag {tag} {total.total}, only {total.unique} are unique. Something is going wrong...") + logger.warn( + f"Out of total {data_type} for the hashtag {tag} {total.total}, only {total.unique} are unique. Something is going wrong..." + ) diff --git a/tiktok_downloader/file_methods.py b/tiktok_downloader/file_methods.py index 0b28314..43184fe 100644 --- a/tiktok_downloader/file_methods.py +++ b/tiktok_downloader/file_methods.py @@ -14,40 +14,41 @@ The file contains the functions that operate on files, such as writing or readin """ -def create_file(name, file_type): +def create_file(name: str, file_type: str): """ Creates a file or directory. 
""" - if (file_type == "dir"): + if file_type == "dir": os.makedirs(name, mode=0o777) - elif (file_type == "file"): - with open(name, "w"): pass + elif file_type == "file": + with open(name, "w"): + pass else: raise ValueError(f"{file_type} has to be either 'dir' or 'file'") -def check_existence(file_path, file_type): +def check_existence(file_path: str, file_type: str): """ Checks the existence of a file or a directory. If not found, returns False, else returns True. """ - if (file_type == "file"): + if file_type == "file": return os.path.isfile(file_path) - elif (file_type == "dir"): + elif file_type == "dir": return os.path.isdir(file_path) else: raise ValueError(f"{file_type} has to be either 'dir' or 'file'") -def check_file(file_path, file_type): +def check_file(file_path: str, file_type: str): """ Creates a file or directory, if not found. Else, returns nothing. """ status = check_existence(file_path, file_type) if not status: - create_file(file_path, file_type) + create_file(file_path, file_type) -def download_posts(settings, tag): +def download_posts(settings: dict, tag: str): """ Runs the tiktok-scraper command to download posts for a given hashtag. Returns the path to the downloaded file of posts. If no file was downloaded, prints the error and returns nothing in order to move on. 
@@ -55,66 +56,69 @@ def download_posts(settings, tag): """ path = os.path.join(settings["data"], tag, settings["posts"]) os.chdir(path) - tiktok_command = f"tiktok-scraper hashtag {tag} -t 'json'" - output = subprocess.check_output(tiktok_command, shell=True, encoding = 'utf-8') + tiktok_command = f"tiktok-scraper hashtag {tag} -t 'json'" + output = subprocess.check_output(tiktok_command, shell=True, encoding="utf-8") new_file = output.split()[-1] - if ("json" in new_file): + if "json" in new_file: os.chdir("../../../tiktok_downloader") - return new_file + return new_file else: - logger.warn(f"Something's wrong with what is returned by tiktok-scraper for the hashtag {tag} - *{new_file}* is not a json file.\n\ntiktok-scraper returned {output}") + logger.warn( + f"Something's wrong with what is returned by tiktok-scraper for the hashtag {tag} - *{new_file}* is not a json file.\n\ntiktok-scraper returned {output}" + ) os.chdir("../../../tiktok_downloader") -def download_videos(settings, tag): +def download_videos(settings: dict, tag: str): """ - Runs the tiktok-scraper command to download videos for a given hashtag. Note that all the videos are downloaded that are returned by the tiktok api and as a result, its a time and data consuming process. + Runs the tiktok-scraper command to download videos for a given hashtag. Note that all the videos are downloaded that are returned by the tiktok api and as a result, its a time and data consuming process. The list of downloaded video ids is constucted and returned if the downloaded folder contains at least 1 video. os.chdir is used to execute shell commands in the right folders and then reused to come back to the original folder of execution of run_downloader script. 
""" path = os.path.join(settings["data"], tag, settings["videos"]) os.chdir(path) - tiktok_command = f"tiktok-scraper hashtag {tag} -d" + tiktok_command = f"tiktok-scraper hashtag {tag} -d" result = subprocess.check_output(tiktok_command, shell=True) downloaded_list_tmp = os.listdir(f"./#{tag}") if downloaded_list_tmp: downloaded_list = [] for file in downloaded_list_tmp: - file = file.split('.')[0] + file = file.split(".")[0] downloaded_list.append(file) - + os.chdir("../../../tiktok_downloader") return downloaded_list else: logger.warn(f"No video files were downloaded for the hashtag {tag}.") os.chdir("../../../tiktok_downloader") - shutil.rmtree(settings['videos_delete']) - + shutil.rmtree(settings["videos_delete"]) -def get_data(file_path): + +def get_data(file_path: str) -> list: """ Reads the json file and retuns the read data. """ - with open(file_path, "r", encoding = "utf-8") as f: + with open(file_path, "r", encoding="utf-8") as f: data = json.load(f) return data -def dump_data(file_path, data): +def dump_data(file_path: str, data: list): """ Writes the data to the json file. """ - with open(file_path, "w", encoding = "utf-8") as f: + with open(file_path, "w", encoding="utf-8") as f: json.dump(data, f) -def log_writer(log_data): + +def log_writer(log_data: list): """ Creates the dictionary of total downloads (posts and videos) per hashtag. - Example : {timstamp : {hashtag : { videos : number_of_new_videos , posts : number_of_new_posts } } } + Example : { timestamp : { hashtag : { videos : number_of_new_videos , posts : number_of_new_posts } } } Writes the dictionary to the log file (logs/log.json). 
""" total = 0 - scraped_summary_dict = {} + scraped_summary_dict: dict = {} for hashtag, (data_type, count) in log_data: if hashtag in scraped_summary_dict: if data_type in scraped_summary_dict[hashtag]: @@ -123,18 +127,18 @@ def log_writer(log_data): scraped_summary_dict[hashtag][data_type] = count total += count else: - scraped_summary_dict[hashtag] = {data_type : count} + scraped_summary_dict[hashtag] = {data_type: count} total += count now = datetime.now() now_str = now.strftime("%d-%m-%Y %H:%M:%S") - data = { now_str : scraped_summary_dict } + data = {now_str: scraped_summary_dict} logger.warn(f"Logged post data: {data}") logger.info(f"Successfully scraped {total} total entries") -def id_writer(file_path, new_data, tag, status): +def id_writer(file_path: str, new_data: list, tag: str, status: bool) -> tuple: """ Writes the list of new ids to the post_ids or video_ids files. """ @@ -145,20 +149,20 @@ def id_writer(file_path, new_data, tag, status): if tag in data: data[tag] += new_data else: - data[tag]= new_data + data[tag] = new_data dump_data(file_path, data) except json.decoder.JSONDecodeError: - data = { tag : new_data } + data = {tag: new_data} dump_data(file_path, data) else: - data = { tag : new_data } + data = {tag: new_data} dump_data(file_path, data) logger.debug(f"SUCCESS - {total} entries added to {file_path}") number_scraped = (tag, total) return number_scraped -def post_writer(file_path, new_data, status): +def post_writer(file_path: str, new_data: list, status: bool): """ Writes the new posts in the post file of the given hashtag (/data/{hashtag}/posts/data.json) """ @@ -177,31 +181,35 @@ def post_writer(file_path, new_data, status): logger.debug(f"SUCCESS - {total} entries added to {file_path}") -def delete_file(file_path, file_type): +def delete_file(file_path: str, file_type: str): """ Deletes the directory or the file. 
""" if not check_existence(file_path, file_type): raise OSError(f"Attempt to delete file failed: {file_path} does not exist") - elif (file_type == "file"): + elif file_type == "file": os.remove(file_path) logger.debug(f"Successfully deleted {file_path}") - elif (file_type == "dir"): + elif file_type == "dir": os.rmdir(file_path) logger.debug(f"Successfully deleted {file_path}") else: raise OSError("{file_type} needs to be either 'file' or 'dir'") -def clean_video_files(settings, tag, new_data=None): +def clean_video_files(settings: dict, tag: str, new_data: list = None): """ Moves the new videos from the tiktok-scraper video folder to /data/{hashtag}/videos/ Deletes the residual tiktok-scraper video folder. """ if new_data: for file in new_data: - settings["videos_from"] = settings['data'] + f"/{tag}/videos/#{tag}/{file}.mp4" - shutil.move(settings['videos_from'], settings['videos_to']) - - shutil.rmtree(settings['videos_delete']) - logger.debug(f"Successfully deleted the folder {settings['videos_delete']} folder of videos.") + settings["videos_from"] = ( + settings["data"] + f"/{tag}/videos/#{tag}/{file}.mp4" + ) + shutil.move(settings["videos_from"], settings["videos_to"]) + + shutil.rmtree(settings["videos_delete"]) + logger.debug( + f"Successfully deleted the folder {settings['videos_delete']} folder of videos." 
+ ) diff --git a/tiktok_downloader/global_data.py b/tiktok_downloader/global_data.py index 8c30ec3..85d4939 100644 --- a/tiktok_downloader/global_data.py +++ b/tiktok_downloader/global_data.py @@ -17,24 +17,23 @@ DATA_FILE = "data.json" FILES = { - "data" : DATA, - "ids" : IDS, - "posts" : POSTS, - "videos" : VIDEOS, - "images" : IMAGES, - "post_ids" : f"{DATA}/{IDS}/{POST_IDS}", - "video_ids" : f"{DATA}/{IDS}/{VIDEO_IDS}", - "data_file" : f"{DATA_FILE}", - "downloads" : [], - } - + "data": DATA, + "ids": IDS, + "posts": POSTS, + "videos": VIDEOS, + "images": IMAGES, + "post_ids": f"{DATA}/{IDS}/{POST_IDS}", + "video_ids": f"{DATA}/{IDS}/{VIDEO_IDS}", + "data_file": f"{DATA_FILE}", + "downloads": [], +} # Commands tag = "" PARAMETERS = { - "scraper_attempts" : 3, -# "number_of_videos" : 3, # Number of videos to be downloaded by tiktok-scraper. - "sleep" : 8 - } + "scraper_attempts": 3, + # "number_of_videos" : 3, # Number of videos to be downloaded by tiktok-scraper. + "sleep": 8, +} diff --git a/tiktok_downloader/hashtag_frequencies.py b/tiktok_downloader/hashtag_frequencies.py index 7130558..5ad3f41 100644 --- a/tiktok_downloader/hashtag_frequencies.py +++ b/tiktok_downloader/hashtag_frequencies.py @@ -3,12 +3,14 @@ import json import argparse from datetime import datetime import warnings + warnings.filterwarnings("ignore", message="Glyph (.*) missing from current font") import logging import matplotlib.pyplot as plt import matplotlib.ticker as mtick import seaborn as sns + sns.set_theme(style="darkgrid") from file_methods import check_file, check_existence @@ -21,20 +23,26 @@ Plots the frequency of hashtags appearing in the set of given posts. 
def get_hashtags(obj): if not obj: - raise ValueError(f'Empty item, no hashtags to be extracted.') + raise ValueError(f"Empty item, no hashtags to be extracted.") else: hashtags = {} - tags = [ [tag['name'] for tag in ele['hashtags']] for ele in obj ] - tags = [ set(ele) for ele in tags ] - { tag: (1 if tag not in hashtags and not hashtags.update({tag: 1}) - else hashtags[tag] + 1 and not hashtags.update({tag: hashtags[tag] + 1})) - for ele in tags for tag in ele } + tags = [[tag["name"] for tag in ele["hashtags"]] for ele in obj] + tags = [set(ele) for ele in tags] + { + tag: ( + 1 + if tag not in hashtags and not hashtags.update({tag: 1}) + else hashtags[tag] + 1 and not hashtags.update({tag: hashtags[tag] + 1}) + ) + for ele in tags + for tag in ele + } hashtags = sorted(hashtags.items(), key=lambda e: e[1], reverse=True) return hashtags -def get_occurrences(filename, n=1 , sort=True): +def get_occurrences(filename, n=1, sort=True): """ Takes the json file containing posts and returns a dictionary: local variable occs = { @@ -46,29 +54,26 @@ def get_occurrences(filename, n=1 , sort=True): obj = json.load(f) l = len(obj) tags = get_hashtags(obj) - occs = { - "total": l, - "top_n": [] - } - occs["top_n"] = [ [ ele[i] for ele in tags[0:n] ] for i in range(2)] + occs = {"total": l, "top_n": []} + occs["top_n"] = [[ele[i] for ele in tags[0:n]] for i in range(2)] return occs def plot(n, occs, img_folder): y_pos = list(reversed(range(n - 1))) max_count = occs["top_n"][1][0] - freqs = [count/max_count * 100 for count in occs["top_n"][1][1:]] + freqs = [count / max_count * 100 for count in occs["top_n"][1][1:]] labels = occs["top_n"][0][1:] - fig, ax = plt.subplots(figsize = (5, 6.66)) + fig, ax = plt.subplots(figsize=(5, 6.66)) ax.barh(y_pos, freqs) ax.set_yticks(y_pos) ax.set_yticklabels(labels) - ax.grid(axis = 'y') - ax.set_xlabel('Percent of posts with common hashtag') - ax.set_ylim(min(y_pos)-1, max(y_pos)+1) + ax.grid(axis="y") + ax.set_xlabel("Percent of posts 
with common hashtag") + ax.set_ylim(min(y_pos) - 1, max(y_pos) + 1) ax.set_title(f'Common hashtags for #{occs["top_n"][0][0]} posts') - ax.xaxis.set_major_formatter(mtick.PercentFormatter(decimals = 0)) + ax.xaxis.set_major_formatter(mtick.PercentFormatter(decimals=0)) save_plot(img_folder) @@ -78,10 +83,14 @@ def print_occurrences(occs): """ row_number = 0 total_posts = occs["total"] - print ("{:<8} {:<15} {:<15} {:<15}".format("Rank", 'Hashtag','Occurrences','Frequency')) - for key,value in zip(occs["top_n"][0], occs["top_n"][1]): - ratio = value/total_posts - print ("{:<8} {:<15} {:<15} {:<15}".format(row_number, key, value, ratio)) + print( + "{:<8} {:<15} {:<15} {:<15}".format( + "Rank", "Hashtag", "Occurrences", "Frequency" + ) + ) + for key, value in zip(occs["top_n"][0], occs["top_n"][1]): + ratio = value / total_posts + print("{:<8} {:<15} {:<15} {:<15}".format(row_number, key, value, ratio)) row_number += 1 @@ -92,8 +101,24 @@ def save_plot(img_folder): now = datetime.now() current_time = now.strftime("%Y_%m_%d_%H_%M_%S") filename = f"{img_folder}/{current_time}.png" - logging.info(f'Plot saved to file: {filename}') - plt.savefig(filename, bbox_inches = 'tight', facecolor = 'white', dpi = 300) + logging.info(f"Plot saved to file: {filename}") + plt.savefig(filename, bbox_inches="tight", facecolor="white", dpi=300) + + +def create_parser(): + """ + Creates the parser and the arguments for the user input. 
+ """ + parser = argparse.ArgumentParser() + parser.add_argument("input_file", help="The json hashtag file name") + parser.add_argument("n", help="The number of top n occurrences", type=int) + parser.add_argument( + "-p", "--plot", help="Plot the occurrences", action="store_true" + ) + parser.add_argument( + "-d", "--print", help="List top n hashtags", action="store_true" + ) + return parser if __name__ == "__main__": @@ -106,16 +131,16 @@ if __name__ == "__main__": """ img_folder = IMAGES check_file(img_folder, "dir") - parser = argparse.ArgumentParser() - parser.add_argument("input_file", help="The json hashtag file name") - parser.add_argument("n", help="The number of top n occurrences", type=int) - parser.add_argument("-p", "--plot", help="Plot the occurrences", action="store_true") - parser.add_argument("-d", "--print", help="List top n hashtags", action="store_true") + parser = create_parser() args = parser.parse_args() if args.n < 1: - raise ValueError(f"Specified argument `n` (the number of hashtags to analyze) must be greater than zero, not: {args.n}.") - if not check_existence(args.input_file, 'file'): - raise FileNotFoundError(f"Specified argument `input_file` ({args.input_file}) does not exist.") + raise ValueError( + f"Specified argument `n` (the number of hashtags to analyze) must be greater than zero, not: {args.n}." + ) + if not check_existence(args.input_file, "file"): + raise FileNotFoundError( + f"Specified argument `input_file` ({args.input_file}) does not exist." 
+ ) base = os.path.splitext(args.input_file)[0] path = f"./{base}_sorted_hashtags.csv" occs = get_occurrences(args.input_file, args.n) diff --git a/tiktok_downloader/run_downloader.py b/tiktok_downloader/run_downloader.py index 5b59afd..3713e5a 100644 --- a/tiktok_downloader/run_downloader.py +++ b/tiktok_downloader/run_downloader.py @@ -12,19 +12,24 @@ logging.config.fileConfig("../logging.config") logger = logging.getLogger("Logger") -def get_hashtag_list(file_name): - if not file_methods.check_existence(file_name, 'file'): +def get_hashtag_list(file_name: str) -> list: + if not file_methods.check_existence(file_name, "file"): raise OSError(f"{file_name} does not exist") with open(file_name) as f: - tags = list(filter(None, [line.strip() for line in f if not line.startswith("#")])) + tags = list( + filter(None, [line.strip() for line in f if not line.startswith("#")]) + ) return tags def create_parser(): - # Creating the parser - parser = argparse.ArgumentParser(description="Download the tiktoks for the requested hashtags") + """ + Creates the parser and the arguments for the user input. + """ + parser = argparse.ArgumentParser( + description="Download the tiktoks for the requested hashtags" + ) - # Adding the arguments parser.add_argument("-t", type=str, nargs="*", help="List of hashtags") parser.add_argument("-f", type=str, help="File name with the list of hashtags") parser.add_argument("-p", action="store_true", help="Download posts") @@ -33,9 +38,9 @@ def create_parser(): return parser -def set_download_settings(download_data_type): +def set_download_settings(download_data_type: dict) -> dict: """ - Loads the constants from global_data into the dict called settings and returns it. + Loads the constants from global_data into the dict called settings and returns it. Purpose - easy access to global constants by various functions. 
""" settings = {} @@ -48,16 +53,15 @@ def set_download_settings(download_data_type): settings["posts"] = global_data.FILES["posts"] settings["post_ids"] = global_data.FILES["post_ids"] settings["data_file"] = global_data.FILES["data_file"] - + if download_data_type["videos"]: settings["videos"] = global_data.FILES["videos"] settings["video_ids"] = global_data.FILES["video_ids"] - + return settings - -def get_posts(settings, tag): +def get_posts(settings: dict, tag: str) -> tuple: """ 1. calls download_posts in file_methods.py to get the posts for a given hashtag 2. calls extract_posts from data_methods.py to extract new posts if any @@ -68,21 +72,24 @@ def get_posts(settings, tag): if file_path: new_data = data_methods.extract_posts(settings, file_path, tag) if new_data: - data_file = os.path.join(settings["data"], tag, settings["posts"], settings["data_file"]) + data_file = os.path.join( + settings["data"], tag, settings["posts"], settings["data_file"] + ) data_methods.update_posts(data_file, "file", new_data[1]) - number_scraped = data_methods.update_posts(settings["post_ids"], "file", new_data[0], tag) + number_scraped = data_methods.update_posts( + settings["post_ids"], "file", new_data[0], tag + ) file_methods.delete_file(file_path, "file") - + return number_scraped - -def get_videos(settings, tag): +def get_videos(settings: dict, tag: str) -> tuple: """ 1. calls download_videos in file_methods.py to get the videos for a given hashtag 2. calls extract_videos from data_methods.py to extract new videos if any 3. calls update_videos from data_methods.py to update the id-list with the ids of newly downloaded videos. - 4. the clean_video_files function deletes the residual video folder after the data processing + 4. 
the clean_video_files function deletes the residual video folder after the data processing """ number_scraped = () download_list = file_methods.download_videos(settings, tag) @@ -96,46 +103,54 @@ def get_videos(settings, tag): return number_scraped - -def get_data(hashtags, download_data_type): +def get_data(hashtags: list, download_data_type: dict) -> list: """ The function checks for the user option "-p", "-v" or both and then - triggers the functions get_posts, get_videos or both, respectively. + triggers the functions get_posts, get_videos or both, respectively. """ counter = 0 total_hashtags = len(hashtags) total_hashtags_offset = total_hashtags - 1 scraped_summary_list = [] - + if download_data_type["posts"]: settings = set_download_settings(download_data_type) while counter < total_hashtags: tag = hashtags[counter] - file_methods.check_file(os.path.join(settings["data"], tag, settings["posts"]), "dir") - file_methods.check_file(os.path.join(settings["data"], tag, settings["posts"], settings["data_file"]), "file") + file_methods.check_file( + os.path.join(settings["data"], tag, settings["posts"]), "dir" + ) + file_methods.check_file( + os.path.join( + settings["data"], tag, settings["posts"], settings["data_file"] + ), + "file", + ) res = get_posts(settings, tag) if res: - number_scraped = ( res[0], ( "posts", res[1] ) ) + number_scraped = (res[0], ("posts", res[1])) scraped_summary_list.append(number_scraped) data_methods.print_total(settings["post_ids"], tag, "posts") - + counter += 1 if counter < total_hashtags_offset: time.sleep(settings["sleep"]) - + if download_data_type["videos"]: settings = set_download_settings(download_data_type) while counter < total_hashtags: tag = hashtags[counter] - file_methods.check_file(os.path.join(settings["data"], tag, settings["videos"]), "dir") - settings["videos_delete"] = settings['data'] + f"/{tag}/videos/#{tag}" - settings["videos_to"] = settings['data'] + f"/{tag}/videos" + file_methods.check_file( + 
os.path.join(settings["data"], tag, settings["videos"]), "dir" + ) + settings["videos_delete"] = settings["data"] + f"/{tag}/videos/#{tag}" + settings["videos_to"] = settings["data"] + f"/{tag}/videos" res = get_videos(settings, tag) if res: - res = ( res[0], ( "videos", res[1])) + res = (res[0], ("videos", res[1])) scraped_summary_list.append(res) data_methods.print_total(settings["video_ids"], tag, "videos") - + counter += 1 if counter < total_hashtags_offset: time.sleep(settings["sleep"]) @@ -148,10 +163,14 @@ if __name__ == "__main__": args = parser.parse_args() if not (args.t or args.f): - parser.error("No hashtags were given, please use either -t option or -f to provide hashtags.") - + parser.error( + "No hashtags were given, please use either -t option or -f to provide hashtags." + ) + if not (args.p or args.v): - parser.error("No argument given, please specify either -p for posts or -v videos or both.") + parser.error( + "No argument given, please specify either -p for posts or -v videos or both." + ) if args.t: hashtags = args.t @@ -161,13 +180,12 @@ if __name__ == "__main__": logger.info(f"Hashtags to scrape: {hashtags}") if not hashtags: - raise ValueError("No hashtags were specified: please use either the -t flag to specify a sspace-separated list of one or more hashtags as a command-line argument, or use the -f flag to specify a text file of newline-separated hashtags.") + raise ValueError( + "No hashtags were specified: please use either the -t flag to specify a sspace-separated list of one or more hashtags as a command-line argument, or use the -f flag to specify a text file of newline-separated hashtags." + ) + + download_data_type = {"posts": args.p, "videos": args.v} - download_data_type = { - "posts": args.p, - "videos": args.v - } - scraped_summary_list = get_data(hashtags, download_data_type) if scraped_summary_list: file_methods.log_writer(scraped_summary_list)