fixed problems with type hints, clarified documentation

This commit is contained in:
Tristan Lee
2022-05-05 20:50:54 -05:00
parent f918f06c28
commit be05ea0fe2
5 changed files with 146 additions and 139 deletions

View File

@@ -1,4 +1,7 @@
from typing import NamedTuple
"""Utility functions that perform data processing related tasks.
"""
from typing import NamedTuple, List, Tuple, Set, Optional, Union, Dict, Any
import logging, logging.config
import file_methods
@@ -7,13 +10,8 @@ logging.config.fileConfig("../logging.config")
logger = logging.getLogger("Logger")
"""
The file contains several functions that perform data processing related tasks.
"""
class Diff(NamedTuple):
    """Result of comparing newly downloaded IDs with the IDs already stored.

    Callers read `filter_posts` to decide whether the downloaded items have
    to be narrowed down to the members of `ids`.
    """

    # IDs selected by the comparison.
    ids: Set[str]
    # True when the downloaded items must be filtered down to `ids`.
    filter_posts: bool
@@ -22,10 +20,12 @@ class Total(NamedTuple):
unique: int
def get_difference(tag: str, file_name: str, ids: list) -> NamedTuple:
"""
Compares two sets of ids and returns the difference of the two sets.
Purpose - user to filter out the new ids by comparing the set of id list (ids/post_ids.json or videos_ids.json) and the list of newly downloaded ids.
def get_difference(tag: str, file_name: str, ids: List[str]) -> Optional[Diff]:
"""Find TikTok posts that haven't already been scraped.
Filter out the new posts for the hashtag `tag` by comparing the list of
post IDs contained in `filename` to the list of newly downloaded IDs
contained in `ids`.
"""
filter_posts = False
current_id_data = file_methods.get_data(file_name)
@@ -38,22 +38,23 @@ def get_difference(tag: str, file_name: str, ids: list) -> NamedTuple:
if not new_ids:
return None
else:
new_ids = list(new_ids)
total_new_ids = len(new_ids)
if total_new_ids == total_current_ids:
filter_posts = False
new_data = Diff(new_ids, filter_posts)
else:
new_data = Diff(new_ids, filter_posts)
return new_data
else:
filter_posts = True
new_data = Diff(ids, filter_posts)
new_data = Diff(set(ids), filter_posts)
return new_data
def extract_posts(settings: dict, file_name: str, tag: str) -> list:
def extract_posts(
settings: Dict[Any, Any], file_name: str, tag: str
) -> Optional[Tuple[List[str], List[str]]]:
"""
Takes the file downloaded by the tiktok-scraper that contains the posts, and returns the new posts after comparing it to the list of posts (from the file ids/post_ids.json) already downloaded.
"""
ids = []
@@ -65,6 +66,7 @@ def extract_posts(settings: dict, file_name: str, tag: str) -> list:
if not ids:
logger.warn(f"No posts were found for the hashtag: {tag}")
return None
status = file_methods.check_existence(settings["post_ids"], "file")
if not status:
@@ -74,16 +76,15 @@ def extract_posts(settings: dict, file_name: str, tag: str) -> list:
new_ids = get_difference(tag, settings["post_ids"], ids)
if not new_ids:
logger.warn(f"No new posts were found for the hashtag: {tag}")
return None
elif new_ids.filter_posts:
new_posts = [post for post in posts if post["id"] in new_ids.ids]
new_data = (new_ids.ids, new_posts)
return new_data
return (list(new_ids.ids), new_posts)
else:
new_data = (new_ids.ids, posts)
return new_data
return (list(new_ids.ids), posts)
def extract_videos(settings: dict, tag: str, download_list: list) -> list:
def extract_videos(settings: dict, tag: str, download_list: List[str]) -> List[str]:
"""
Tiktok-scraper downloads the videos and puts them in a folder - the list of ids of the downloaded videos is fed to this function as download_list. The function returns the set of new videos after comparing it to the list of videos (from the file ids/videos_ids.json) already downloaded.
"""
@@ -97,37 +98,40 @@ def extract_videos(settings: dict, tag: str, download_list: list) -> list:
logger.warn(
f"No new videos were found for the {tag} in the downloaded folder."
)
return None
return []
else:
return new_videos.ids
return list(new_videos.ids)
def update_posts(
    file_path: str, file_type: str, new_data: List[str], tag: Optional[str] = None
) -> Optional[Tuple[str, int]]:
    """Write newly scraped data to the file at `file_path`.

    The existence of `file_path` (checked as `file_type`) is looked up first
    and forwarded to the writer as its status flag.

    - Without a `tag`, the data is handed to `file_methods.post_writer` and
      nothing is returned.
    - With a `tag`, the data is handed to `file_methods.id_writer` and that
      call's result is returned.
    """
    status = file_methods.check_existence(file_path, file_type)
    if not tag:
        file_methods.post_writer(file_path, new_data, status)
        return None
    return file_methods.id_writer(file_path, new_data, tag, status)
def update_videos(
    settings: Dict[str, Any], new_data: List[str], tag: str
) -> Tuple[str, int]:
    """Update the list of video ids with the ids of the new videos.

    The id file path is read from `settings["video_ids"]`. After the ids are
    written, `file_methods.clean_video_files` is called for the same hashtag
    and ids. Returns the result of `file_methods.id_writer`.
    """
    file_path = settings["video_ids"]
    file_methods.check_file(file_path, "file")
    # Write the ids exactly once; a second call would record them twice.
    number_scraped = file_methods.id_writer(file_path, new_data, tag, True)
    file_methods.clean_video_files(settings, tag, new_data)
    return number_scraped
def get_total_posts(file_path: str, tag: str) -> NamedTuple:
def get_total_posts(file_path: str, tag: str) -> Total:
"""
Returns the total count of ids in an id list along with the number of unique ids among them.
"""

View File

@@ -1,17 +1,17 @@
"""Utility functions that operate on files, such as writing to or reading from a file.
"""
import os
import json
import subprocess
from datetime import datetime
import shutil
from typing import Tuple, List, Optional, Dict, Any
import logging, logging.config
logging.config.fileConfig("../logging.config")
logger = logging.getLogger("Logger")
"""
The file contains the functions that operate on files, such as writing or reading from files etc.
"""
logger = logging.getLogger()
def create_file(name: str, file_type: str):
@@ -94,16 +94,16 @@ def download_videos(settings: dict, tag: str):
shutil.rmtree(settings["videos_delete"])
def get_data(file_path: str) -> Any:
    """Read the JSON file at `file_path` and return the decoded data.

    The file is opened as UTF-8 text. The return type is whatever the JSON
    document decodes to.
    """
    with open(file_path, "r", encoding="utf-8") as f:
        return json.load(f)
def dump_data(file_path: str, data: list):
def dump_data(file_path: str, data: List[dict]):
"""
Writes the data to the json file.
"""
@@ -111,14 +111,15 @@ def dump_data(file_path: str, data: list):
json.dump(data, f)
def log_writer(log_data: list):
def log_writer(log_data: List[Tuple[str, Tuple[str, int]]]):
"""
Creates the dictionary of total downloads (posts and videos) per hashtag.
Example : { timestamp : { hashtag : { videos : number_of_new_videos , posts : number_of_new_posts } } }
Writes the dictionary to the log file (logs/log.json).
"""
total = 0
scraped_summary_dict: dict
scraped_summary_dict = {} # type: Dict[str, Dict[str, int]]
for hashtag, (data_type, count) in log_data:
if hashtag in scraped_summary_dict:
if data_type in scraped_summary_dict[hashtag]:
@@ -130,18 +131,20 @@ def log_writer(log_data: list):
scraped_summary_dict[hashtag] = {data_type: count}
total += count
now = datetime.now()
now_str = now.strftime("%d-%m-%Y %H:%M:%S")
now_str = datetime.now().strftime("%d-%m-%Y %H:%M:%S")
data = {now_str: scraped_summary_dict}
logger.warn(f"Logged post data: {data}")
logger.debug(f"Logged post data: {data}")
logger.info(f"Successfully scraped {total} total entries")
def id_writer(file_path: str, new_data: list, tag: str, status: bool) -> tuple:
def id_writer(
file_path: str, new_data: List[str], tag: str, status: bool
) -> Tuple[str, int]:
"""
Writes the list of new ids to the post_ids or video_ids files.
"""
total = len(new_data)
if status:
try:
@@ -162,7 +165,7 @@ def id_writer(file_path: str, new_data: list, tag: str, status: bool) -> tuple:
return number_scraped
def post_writer(file_path: str, new_data: list, status: bool):
def post_writer(file_path: str, new_data: List[str], status: bool):
"""
Writes the new posts in the post file of the given hashtag (/data/{hashtag}/posts/data.json)
"""
@@ -197,7 +200,7 @@ def delete_file(file_path: str, file_type: str):
raise OSError("{file_type} needs to be either 'file' or 'dir'")
def clean_video_files(settings: dict, tag: str, new_data: list = None):
def clean_video_files(settings: dict, tag: str, new_data: Optional[List[str]] = None):
"""
Moves the new videos from the tiktok-scraper video folder to /data/{hashtag}/videos/
Deletes the residual tiktok-scraper video folder.

View File

@@ -1,5 +1,4 @@
"""
Contains global constants relating to paths and operational parameters such as sleep time between consecutive tiktok-scraper calls.
"""Specify global constants including file paths and scraping options.
"""
@@ -15,7 +14,6 @@ POST_IDS = "post_ids.json"
VIDEO_IDS = "video_ids.json"
DATA_FILE = "data.json"
FILES = {
"data": DATA,
"ids": IDS,
@@ -28,12 +26,7 @@ FILES = {
"downloads": [],
}
# Commands
tag = ""
PARAMETERS = {
"scraper_attempts": 3,
# "number_of_videos" : 3, # Number of videos to be downloaded by tiktok-scraper.
"sleep": 8,
}

View File

@@ -1,33 +1,54 @@
"""Analyze the frequency of hashtags appearing in the set of given posts.
- The "input_file" argument specifies the JSON file containing post information for a given hashtag
- The "n" argument specifies how many hashtags the user wants to analyze
- Specifying the "-d" flag prints the hashtag frequencies on the shell
- Specifying the "-p" flag plots the hashtag frequencies and saves as a png file
"""
import os
import json
import argparse
from datetime import datetime
import warnings
warnings.filterwarnings("ignore", message="Glyph (.*) missing from current font")
from typing import List, Tuple, Dict, Any
import logging
import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
import seaborn as sns
sns.set_theme(style="darkgrid")
from file_methods import check_file, check_existence
from global_data import IMAGES
"""
Plots the frequency of hashtags appearing in the set of given posts.
"""
warnings.filterwarnings("ignore", message="Glyph (.*) missing from current font")
sns.set_theme(style="darkgrid")
def get_hashtags(obj):
def create_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the hashtag frequency script.

    Two positionals are registered (`input_file`, then the integer `n`),
    followed by the boolean switches `-p/--plot` and `-d/--print`.
    """
    cli = argparse.ArgumentParser()

    # Positional arguments, in the order they must appear on the command line.
    cli.add_argument(
        "input_file",
        help="The file name of the JSON file containing posts for a given hashtag",
    )
    cli.add_argument("n", type=int, help="The number of top n occurrences")

    # Optional on/off switches.
    switches = (
        ("-p", "--plot", "Plot the occurrences"),
        ("-d", "--print", "List top n hashtags"),
    )
    for short_flag, long_flag, description in switches:
        cli.add_argument(short_flag, long_flag, help=description, action="store_true")

    return cli
def get_hashtags(obj: Dict) -> List[Tuple[str, int]]:
if not obj:
raise ValueError(f"Empty item, no hashtags to be extracted.")
raise ValueError(f"Empty item, no hashtags could be extracted.")
else:
hashtags = {}
tags = [[tag["name"] for tag in ele["hashtags"]] for ele in obj]
tags = [set(ele) for ele in tags]
tags = [set([tag["name"] for tag in ele["hashtags"]]) for ele in obj]
{
tag: (
1
@@ -37,29 +58,29 @@ def get_hashtags(obj):
for ele in tags
for tag in ele
}
hashtags = sorted(hashtags.items(), key=lambda e: e[1], reverse=True)
return hashtags
return sorted(hashtags.items(), key=lambda e: e[1], reverse=True)
def get_occurrences(filename: str, n: int = 1) -> Dict[str, Any]:
    """Aggregate hashtag frequency information for a specified JSON file.

    Return a dict with keys:
        "total": total posts in the file,
        "top_n": [[top n hashtags], [frequencies of corresponding hashtags]]

    At most `n` hashtags are returned; fewer if the file contains fewer
    distinct hashtags.
    """
    # Read as UTF-8, the same encoding file_methods.get_data uses for JSON.
    with open(filename, encoding="utf-8") as f:
        obj = json.load(f)

    tags = get_hashtags(obj)
    # Slicing clamps to the list length, so no explicit bound is needed.
    # Bounding by the post count (max(len(obj), n)) would ignore `n`.
    top_tags = tags[:n]
    return {
        "total": len(obj),
        "top_n": [[entry[i] for entry in top_tags] for i in range(2)],
    }
def plot(n, occs, img_folder):
def plot(n: int, occs: dict, img_folder: str):
"""Save plot of common hashtags as bar chart to file."""
y_pos = list(reversed(range(n - 1)))
max_count = occs["top_n"][1][0]
freqs = [count / max_count * 100 for count in occs["top_n"][1][1:]]
@@ -77,10 +98,17 @@ def plot(n, occs, img_folder):
save_plot(img_folder)
def save_plot(img_folder: str) -> None:
    """Save the current plot as a timestamped png file inside `img_folder`.

    The file name is the current local time formatted as
    `YYYY_mm_dd_HH_MM_SS.png`.
    """
    current_time = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
    filename = f"{img_folder}/{current_time}.png"
    plt.savefig(filename, bbox_inches="tight", facecolor="white", dpi=300)
    # Log only after savefig returns, so a failed save is never reported as saved.
    logging.info(f"Plot saved to file: {filename}")
def print_occurrences(occs):
"""
Prints the top n hashtags with their frequencies and the ratio of occurrences and total posts, all to the shell.
"""
"""Print information about the top n hashtags and their frequencies."""
row_number = 0
total_posts = occs["total"]
print(
@@ -94,41 +122,8 @@ def print_occurrences(occs):
row_number += 1
def save_plot(img_folder):
"""
Saves the plot to a png file in the folder /data/imgs/
"""
now = datetime.now()
current_time = now.strftime("%Y_%m_%d_%H_%M_%S")
filename = f"{img_folder}/{current_time}.png"
logging.info(f"Plot saved to file: {filename}")
plt.savefig(filename, bbox_inches="tight", facecolor="white", dpi=300)
def create_parser():
"""
Creates the parser and the arguments for the user input.
"""
parser = argparse.ArgumentParser()
parser.add_argument("input_file", help="The json hashtag file name")
parser.add_argument("n", help="The number of top n occurrences", type=int)
parser.add_argument(
"-p", "--plot", help="Plot the occurrences", action="store_true"
)
parser.add_argument(
"-d", "--print", help="List top n hashtags", action="store_true"
)
return parser
if __name__ == "__main__":
"""
Option "n" specifies how many hashtags does the user wants to plot.
"-d" option prints the hashtag frequencies on the shell
"-p" option plots the hashtag frequencies and saves as a png file in the folder /data/imgs/
The function get_occurrences is triggered to compute and return the top n occurrences and the hashtags.
"""
img_folder = IMAGES
check_file(img_folder, "dir")
parser = create_parser()

View File

@@ -1,7 +1,17 @@
"""Download post data or videos from TikToks containing one or more specified hashtags.
- The "-p" flag specifies that only data from posts is downloaded, no video files
- The "-v" flag specifies that only video files are downloaded, no post data
- Specifying both "-p" and "-v" flags downloads both post data and video files
- The "-t" flag allows the user to specify a list of space-separated hashtags as an argument
- The "-f" flag allows the user to specify the filename of a text file containing a list of newline-separated hashtags as an argument
"""
import os
import time
import argparse
import logging, logging.config
from typing import List, Tuple, Dict, Any, Optional
import global_data
import file_methods
@@ -12,17 +22,7 @@ logging.config.fileConfig("../logging.config")
logger = logging.getLogger("Logger")
def get_hashtag_list(file_name: str) -> list:
if not file_methods.check_existence(file_name, "file"):
raise OSError(f"{file_name} does not exist")
with open(file_name) as f:
tags = list(
filter(None, [line.strip() for line in f if not line.startswith("#")])
)
return tags
def create_parser():
def create_parser() -> argparse.ArgumentParser:
"""
Creates the parser and the arguments for the user input.
"""
@@ -38,16 +38,27 @@ def create_parser():
return parser
def set_download_settings(download_data_type: str) -> dict:
def get_hashtag_list(file_name: str) -> List[str]:
    """Read hashtags from a text file, one per line.

    Lines that begin with "#" are skipped, surrounding whitespace is removed
    from the remaining lines, and lines that end up empty are dropped.

    Raises OSError if `file_name` does not exist.
    """
    if not file_methods.check_existence(file_name, "file"):
        raise OSError(f"{file_name} does not exist")

    hashtags = []
    with open(file_name) as handle:
        for raw_line in handle:
            # The "#" check runs on the raw line, before stripping.
            if raw_line.startswith("#"):
                continue
            cleaned = raw_line.strip()
            if cleaned:
                hashtags.append(cleaned)
    return hashtags
def set_download_settings(download_data_type: Dict[str, bool]) -> Dict[str, Any]:
"""
Loads the constants from global_data into the dict called settings and returns it.
Purpose - easy access to global constants by various functions.
"""
settings = {}
settings["data"] = global_data.FILES["data"]
settings["ids"] = global_data.FILES["ids"]
settings["sleep"] = global_data.PARAMETERS["sleep"]
settings["scraper"] = global_data.PARAMETERS["scraper_attempts"]
settings = {
"data": global_data.FILES["data"],
"ids": global_data.FILES["ids"],
"sleep": global_data.PARAMETERS["sleep"],
"scraper": global_data.PARAMETERS["scraper_attempts"],
}
file_methods.check_file(f"{settings['data']}/{settings['ids']}", "dir")
if download_data_type["posts"]:
settings["posts"] = global_data.FILES["posts"]
@@ -61,14 +72,14 @@ def set_download_settings(download_data_type: str) -> dict:
return settings
def get_posts(settings: dict, tag: str) -> tuple:
def get_posts(settings: dict, tag: str) -> Optional[Tuple[str, int]]:
"""
1. calls download_posts in file_methods.py to get the posts for a given hashtag
2. calls extract_posts from data_methods.py to extract new posts if any
3. calls update_posts from data_methods.py to update the id-list with the ids of newly downloaded posts.
"""
file_path = file_methods.download_posts(settings, tag)
number_scraped = ()
number_scraped = None
if file_path:
new_data = data_methods.extract_posts(settings, file_path, tag)
if new_data:
@@ -84,14 +95,14 @@ def get_posts(settings: dict, tag: str) -> tuple:
return number_scraped
def get_videos(settings: dict, tag: str) -> tuple:
def get_videos(settings: dict, tag: str) -> Optional[Tuple[str, int]]:
"""
1. calls download_videos in file_methods.py to get the videos for a given hashtag
2. calls extract_videos from data_methods.py to extract new videos if any
3. calls update_videos from data_methods.py to update the id-list with the ids of newly downloaded videos.
4. the clean_video_files function deletes the residual video folder after the data processing
"""
number_scraped = ()
number_scraped = None
download_list = file_methods.download_videos(settings, tag)
if download_list:
new_data = data_methods.extract_videos(settings, tag, download_list)
@@ -103,7 +114,9 @@ def get_videos(settings: dict, tag: str) -> tuple:
return number_scraped
def get_data(hashtags: list, download_data_type: str) -> list:
def get_data(
hashtags: list, download_data_type: Dict[str, bool]
) -> List[Tuple[str, Tuple[str, int]]]:
"""
The function checks for the user option "-p", "-v" or both and then
triggers the functions get_posts, get_videos or both, respectively.
@@ -145,10 +158,9 @@ def get_data(hashtags: list, download_data_type: str) -> list:
)
settings["videos_delete"] = settings["data"] + f"/{tag}/videos/#{tag}"
settings["videos_to"] = settings["data"] + f"/{tag}/videos"
res = get_videos(settings, tag)
if res:
res = (res[0], ("videos", res[1]))
scraped_summary_list.append(res)
_res = get_videos(settings, tag)
if _res:
scraped_summary_list.append((_res[0], ("videos", _res[1])))
data_methods.print_total(settings["video_ids"], tag, "videos")
counter += 1
@@ -164,12 +176,12 @@ if __name__ == "__main__":
if not (args.t or args.f):
parser.error(
"No hashtags were given, please use either -t option or -f to provide hashtags."
"No hashtags were given, please use either the `-t` flag or the `-f` flag to provide hashtags."
)
if not (args.p or args.v):
parser.error(
"No argument given, please specify either -p for posts or -v videos or both."
"No argument given, please specify either the `-p` flag to download post data or the `-v` flag to download video files, or both."
)
if args.t:
@@ -181,7 +193,7 @@ if __name__ == "__main__":
logger.info(f"Hashtags to scrape: {hashtags}")
if not hashtags:
raise ValueError(
"No hashtags were specified: please use either the -t flag to specify a sspace-separated list of one or more hashtags as a command-line argument, or use the -f flag to specify a text file of newline-separated hashtags."
"No hashtags were specified: please use either the `-t` flag to specify a sspace-separated list of one or more hashtags as a command-line argument, or use the `-f` flag to specify a text file of newline-separated hashtags."
)
download_data_type = {"posts": args.p, "videos": args.v}