made docstrings more consistent, changed argument of hashtag_frequencies script to use the hashtag rather than the post_id file for the hashtag, to make it easier to use

This commit is contained in:
Tristan Lee
2022-05-06 01:49:55 -05:00
parent be05ea0fe2
commit 0cb9d4b1b9
5 changed files with 127 additions and 119 deletions

View File

@@ -22,17 +22,17 @@ You should now be ready to start using the tool.
## About the tool
### Command-line arguments
```
$ python run_downloader.py -h
python3 run_downloader.py --help
usage: run_downloader.py [-h] [-t [T [T ...]]] [-f F] [-p] [-v]
Download the tiktoks for the requested hashtags
optional arguments:
-h, --help show this help message and exit
-t [T [T ...]] List of hashtags
-f F File name with the list of hashtags
-p Download posts
-v Download videos
-h, --help show this help message and exit
-t [T [T ...]] List of hashtags to scrape
-f F File name containing list of hashtags to scrape
-p Download post data
-v Download video files
```
### Structure of output data
@@ -90,11 +90,11 @@ Note that video downloading is a time and data rate consuming task, as a result
The script `hashtag_frequencies.py` analyzes the frequencies of top occurring hashtags in a given set of posts.
```
python hashtag_frequencies.py --help
usage: hashtag_frequencies.py [-h] [-p] [-d] input_file n
$ python3 hashtag_frequencies.py --help
usage: hashtag_frequencies.py [-h] [-p] [-d] hashtag n
positional arguments:
input_file The json hashtag file name
hashtag The hashtag of scraped posts to analyze
n The number of top n occurrences
optional arguments:
@@ -107,7 +107,7 @@ Assume we want to analyze the 20 most frequently occurring hashtags in the downl
- The results can be plotted and saved as a PNG file by executing the following command:
`python3 hashtag_frequencies.py -p ../data/london/posts/data.json 20`
`python3 hashtag_frequencies.py london 20 -p`
which will produce a figure similar to that shown below:
<p align="center">
@@ -118,7 +118,7 @@ Assume we want to analyze the 20 most frequently occurring hashtags in the downl
- The results can be displayed in tabular form by executing the following command:
`python3 hashtag_frequencies.py -d ../data/london/posts/data.json 20`
`python3 hashtag_frequencies.py london 20 -d`
which will produce a terminal output similar to the following:
```

View File

@@ -1,27 +1,30 @@
"""Utility functions that perform data processing related tasks.
"""
from typing import NamedTuple, List, Tuple, Set, Optional, Union, Dict, Any
import logging, logging.config
from typing import NamedTuple, List, Tuple, Set, Optional, Dict, Any
import logging
import file_methods
logging.config.fileConfig("../logging.config")
logger = logging.getLogger("Logger")
logger = logging.getLogger()
class Diff(NamedTuple):
"""Keep track of scraped post IDs and whether previously-scraped posts have been filtered."""
ids: Set[str]
filter_posts: bool
class Total(NamedTuple):
"""Keep track of number of total and number of unique scraped posts."""
total: int
unique: int
def get_difference(tag: str, file_name: str, ids: List[str]) -> Optional[Diff]:
"""Find TikTok posts that haven't already been scraped.
"""Find TikTok post IDs that haven't previously been scraped.
Filter out the new posts for the hashtag `tag` by comparing the list of
post IDs contained in `filename` to the list of newly downloaded IDs
@@ -52,10 +55,11 @@ def get_difference(tag: str, file_name: str, ids: List[str]) -> Optional[Diff]:
def extract_posts(
settings: Dict[Any, Any], file_name: str, tag: str
) -> Optional[Tuple[List[str], List[str]]]:
"""
) -> Optional[Tuple[List[str], List[Dict]]]:
"""Find TikTok posts that haven't previously been scraped.
Takes the downloaded file by the tiktok-scraper that contains the posts, and returns the new posts after comparing it the list of posts (from the file ids/post_ids.json) already downloaded.
Compares the file downloaded by tiktok-scraper to the list of
previously-scraped posts (from the file ids/post_ids.json).
"""
ids = []
posts = []
@@ -85,8 +89,10 @@ def extract_posts(
def extract_videos(settings: dict, tag: str, download_list: List[str]) -> List[str]:
"""
Tiktok-scraper downloads the videos and puts them in a folder - the list of ids of the downloaded videos is fed to this function as download_list. The function returns the set of new videos after comparing it the list of videos (from the file ids/videos_ids.json) already downloaded.
"""Find TikTok videos that haven't previously been scraped.
Compares the file downloaded by tiktok-scraper to the list of
previously-scraped videos (from the file ids/video_ids.json).
"""
status = file_methods.check_existence(settings["video_ids"], "file")
if not status:
@@ -104,10 +110,10 @@ def extract_videos(settings: dict, tag: str, download_list: List[str]) -> List[s
def update_posts(
file_path: str, file_type: str, new_data: List[str], tag: str = None
file_path: str, file_type: str, new_data: List[Any], tag: str = None
) -> Optional[Tuple[str, int]]:
"""
Updates the list of post ids (in the file ids/post_ids.json) with the ids of the new posts.
"""Update the file containing scraped post IDs (`ids/post_ids.json`) with
the IDs of the recently scraped posts.
"""
status = file_methods.check_existence(file_path, file_type)
if not tag:
@@ -121,8 +127,8 @@ def update_posts(
def update_videos(
settings: Dict[str, Any], new_data: List[str], tag: str
) -> Tuple[str, int]:
"""
Updates the list of video ids (in the file ids/video_ids.json) with the ids of the new videos.
"""Update the file containing video IDs (`ids/video_ids.json`) with the IDs
of the recently scraped videos.
"""
file_path = settings["video_ids"]
file_methods.check_file(file_path, "file")
@@ -132,12 +138,10 @@ def update_videos(
def get_total_posts(file_path: str, tag: str) -> Total:
"""
Returns total count of ids in a id list along with the number of unique ids among them.
"""
"""Count number of total scraped posts and number of unique scraped posts."""
status = file_methods.check_existence(file_path, "file")
if not status:
raise OSError("{file_path} not found!")
raise OSError(f"{file_path} not found!")
else:
data = file_methods.get_data(file_path)
total_posts = len(data[tag])
@@ -147,9 +151,7 @@ def get_total_posts(file_path: str, tag: str) -> Total:
def print_total(file_path: str, tag: str, data_type: str):
"""
Prints the total count for posts or videos for a hashtag. Calls the function get_total_posts for sanity check that there are no repeating ids in the id lists.
"""
"""Print number of total and unique scraped posts, warn if any non-unique posts."""
total = get_total_posts(file_path, tag)
if total.total == total.unique:
logger.info(f"Scraped {total.total} {data_type} containing the hashtag '{tag}'")

View File

@@ -15,9 +15,7 @@ logger = logging.getLogger()
def create_file(name: str, file_type: str):
"""
Creates a file or directory.
"""
"""Create a file or directory."""
if file_type == "dir":
os.makedirs(name, mode=0o777)
elif file_type == "file":
@@ -28,9 +26,7 @@ def create_file(name: str, file_type: str):
def check_existence(file_path: str, file_type: str):
"""
Checks the existence of a file or a directory. If not found, returns False, else returns True.
"""
"""Check if a file or a directory exists."""
if file_type == "file":
return os.path.isfile(file_path)
elif file_type == "dir":
@@ -40,19 +36,20 @@ def check_existence(file_path: str, file_type: str):
def check_file(file_path: str, file_type: str):
"""
Creates a file or directory, if not found. Else, returns nothing.
"""
"""If path does not exist, creates a file or directory."""
status = check_existence(file_path, file_type)
if not status:
create_file(file_path, file_type)
def download_posts(settings: dict, tag: str):
"""
Runs the tiktok-scraper command to download posts for a given hashtag.
Returns the path to the downloaded file of posts. If no file was downloaded, prints the error and returns nothing in order to move on.
os.chdir is used to execute shell commands in the right folders and then reused to come back to the original folder of execution of run_downloader script.
def download_posts(settings: Dict, tag: str):
"""Run the tiktok-scraper command to download posts for a given hashtag.
Returns the path to the downloaded file of posts. If no file was downloaded,
prints the error and returns nothing in order to move on.
os.chdir is used to execute shell commands in the correct folder and then
reused to return to the original folder of execution of run_downloader script.
"""
path = os.path.join(settings["data"], tag, settings["posts"])
os.chdir(path)
@@ -69,11 +66,16 @@ def download_posts(settings: dict, tag: str):
os.chdir("../../../tiktok_downloader")
def download_videos(settings: dict, tag: str):
"""
Runs the tiktok-scraper command to download videos for a given hashtag. Note that all the videos are downloaded that are returned by the tiktok api and as a result, its a time and data consuming process.
The list of downloaded video ids is constructed and returned if the downloaded folder contains at least 1 video.
os.chdir is used to execute shell commands in the right folders and then reused to come back to the original folder of execution of run_downloader script.
def download_videos(settings: Dict, tag: str):
"""Run the tiktok-scraper command to download videos for a given hashtag.
Note that all the videos are downloaded that are returned by the TikTok API,
making this a time- and data-intensive process.
The list of downloaded video IDs is constructed and returned if the
downloaded folder contains at least 1 video.
os.chdir is used to execute shell commands in the correct folder and then
reused to return to the original folder of execution of run_downloader script.
"""
path = os.path.join(settings["data"], tag, settings["videos"])
os.chdir(path)
@@ -95,27 +97,31 @@ def download_videos(settings: dict, tag: str):
def get_data(file_path: str) -> Any:
"""
Reads the json file and returns the read data.
"""
"""Read a JSON file and return the read data."""
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
return data
def dump_data(file_path: str, data: List[dict]):
"""
Writes the data to the json file.
"""
def dump_data(file_path: str, data: Any):
"""Write data to a JSON file."""
with open(file_path, "w", encoding="utf-8") as f:
json.dump(data, f)
def log_writer(log_data: List[Tuple[str, Tuple[str, int]]]):
"""
Creates the dictionary of total downloads (posts and videos) per hashtag.
Example : { timestamp : { hashtag : { videos : number_of_new_videos , posts : number_of_new_posts } } }
Writes the dictionary to the log file (logs/log.json).
"""Create the dictionary of total downloads (posts and videos) per hashtag.
Example : {
timestamp : {
hashtag : {
videos : number_of_new_videos ,
posts : number_of_new_posts
}
}
}
Writes the dictionary to the log file (`logs/log.json`).
"""
total = 0
@@ -141,9 +147,7 @@ def log_writer(log_data: List[Tuple[str, Tuple[str, int]]]):
def id_writer(
file_path: str, new_data: List[str], tag: str, status: bool
) -> Tuple[str, int]:
"""
Writes the list of new ids to the post_ids or video_ids files.
"""
"""Write the list of new ids to the post_ids or video_ids file."""
total = len(new_data)
if status:
@@ -165,9 +169,9 @@ def id_writer(
return number_scraped
def post_writer(file_path: str, new_data: List[str], status: bool):
"""
Writes the new posts in the post file of the given hashtag (/data/{hashtag}/posts/data.json)
def post_writer(file_path: str, new_data: List[Dict], status: bool):
"""Write the new posts in the post file of the given hashtag
(`/data/{hashtag}/posts/data.json`).
"""
total = len(new_data)
if status:
@@ -185,9 +189,7 @@ def post_writer(file_path: str, new_data: List[str], status: bool):
def delete_file(file_path: str, file_type: str):
"""
Deletes the directory or the file.
"""
"""Delete a directory or file."""
if not check_existence(file_path, file_type):
raise OSError(f"Attempt to delete file failed: {file_path} does not exist")
elif file_type == "file":
@@ -201,8 +203,7 @@ def delete_file(file_path: str, file_type: str):
def clean_video_files(settings: dict, tag: str, new_data: Optional[List[str]] = None):
"""
Moves the new videos from the tiktok-scraper video folder to /data/{hashtag}/videos/
"""Move the new videos from the tiktok-scraper video folder to `/data/{hashtag}/videos/`.
Deletes the residual tiktok-scraper video folder.
"""
if new_data:

View File

@@ -1,7 +1,7 @@
"""Analyze the frequency of hashtags appearing in the set of given posts.
- The "input_file" argument specifies the JSON file containing post information for a given hashtag
- The "n" argument specifies how many hashtags the user wants to analyze
- The "hashtag" positional argument specifies the hashtag of scraped posts to analyze
- The "n" positional argument specifies how many hashtags the user wants to analyze
- Specifying the "-d" flag prints the hashtag frequencies on the shell
- Specifying the "-p" flag plots the hashtag frequencies and saves as a png file
"""
@@ -18,20 +18,20 @@ import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
import seaborn as sns
from file_methods import check_file, check_existence
from global_data import IMAGES
from global_data import IMAGES, FILES
warnings.filterwarnings("ignore", message="Glyph (.*) missing from current font")
sns.set_theme(style="darkgrid")
logger = logging.getLogger()
def create_parser() -> argparse.ArgumentParser:
"""Create the parser and the arguments for the user input."""
parser = argparse.ArgumentParser()
parser.add_argument(
"input_file",
help="The file name of the JSON file containing posts for a given hashtag",
"hashtag",
help="The hashtag of scraped posts to analyze",
)
parser.add_argument("n", help="The number of top n occurrences", type=int)
parser.add_argument(
@@ -65,7 +65,7 @@ def get_hashtags(obj: Dict) -> List[Tuple[str, int]]:
def get_occurrences(filename: str, n: int = 1) -> Dict[str, Any]:
"""Aggregate hashtag frequency information for a specified JSON file.
Return dict `occs` with keys:
Example: {
"total": total posts in the file,
top_n: [[top n hashtags ], [frequencies of corresponding hashtags]]
}
@@ -75,16 +75,17 @@ def get_occurrences(filename: str, n: int = 1) -> Dict[str, Any]:
l = len(obj)
tags = get_hashtags(obj)
occs = {"total": l, "top_n": []}
occs["top_n"] = [[ele[i] for ele in tags[0 : max(l, n)]] for i in range(2)]
occs["top_n"] = [[ele[i] for ele in tags[0 : min(l, n)]] for i in range(2)]
return occs
def plot(n: int, occs: dict, img_folder: str):
"""Save plot of common hashtags as bar chart to file."""
y_pos = list(reversed(range(n - 1)))
y_pos = list(reversed(range(len(occs[0]) - 1)))
max_count = occs["top_n"][1][0]
freqs = [count / max_count * 100 for count in occs["top_n"][1][1:]]
labels = occs["top_n"][0][1:]
hashtag = occs["top_n"][0][0]
fig, ax = plt.subplots(figsize=(5, 6.66))
ax.barh(y_pos, freqs)
@@ -93,16 +94,16 @@ def plot(n: int, occs: dict, img_folder: str):
ax.grid(axis="y")
ax.set_xlabel("Percent of posts with common hashtag")
ax.set_ylim(min(y_pos) - 1, max(y_pos) + 1)
ax.set_title(f'Common hashtags for #{occs["top_n"][0][0]} posts')
ax.set_title(f"Common hashtags for #{hashtag} posts")
ax.xaxis.set_major_formatter(mtick.PercentFormatter(decimals=0))
save_plot(img_folder)
save_plot(img_folder, hashtag)
def save_plot(img_folder):
def save_plot(img_folder, hashtag):
"""Save the plot as a png file in the folder ../data/imgs/"""
now = datetime.now()
current_time = now.strftime("%Y_%m_%d_%H_%M_%S")
filename = f"{img_folder}/{current_time}.png"
filename = f"{img_folder}/{hashtag}_{current_time}.png"
logging.info(f"Plot saved to file: {filename}")
plt.savefig(filename, bbox_inches="tight", facecolor="white", dpi=300)
@@ -132,13 +133,17 @@ if __name__ == "__main__":
raise ValueError(
f"Specified argument `n` (the number of hashtags to analyze) must be greater than zero, not: {args.n}."
)
if not check_existence(args.input_file, "file"):
input_file = data_file = os.path.join(
FILES["data"], args.hashtag, FILES["posts"], FILES["data_file"]
)
if not check_existence(input_file, "file"):
raise FileNotFoundError(
f"Specified argument `input_file` ({args.input_file}) does not exist."
f"File {input_file}) for specified argument `hashtag` ({args.hashtag}) does not exist"
)
base = os.path.splitext(args.input_file)[0]
base = os.path.splitext(input_file)[0]
path = f"./{base}_sorted_hashtags.csv"
occs = get_occurrences(args.input_file, args.n)
occs = get_occurrences(input_file, args.n)
if args.plot:
plot(args.n, occs, img_folder)
else:

View File

@@ -17,28 +17,27 @@ import global_data
import file_methods
import data_methods
logging.config.fileConfig("../logging.config")
logger = logging.getLogger("Logger")
logger = logging.getLogger()
def create_parser() -> argparse.ArgumentParser:
"""
Creates the parser and the arguments for the user input.
"""
"""Create the parser and the arguments for the user input."""
parser = argparse.ArgumentParser(
description="Download the tiktoks for the requested hashtags"
)
parser.add_argument("-t", type=str, nargs="*", help="List of hashtags")
parser.add_argument("-f", type=str, help="File name with the list of hashtags")
parser.add_argument("-p", action="store_true", help="Download posts")
parser.add_argument("-v", action="store_true", help="Download videos")
parser.add_argument("-t", type=str, nargs="*", help="List of hashtags to scrape")
parser.add_argument(
"-f", type=str, help="File name containing list of hashtags to scrape"
)
parser.add_argument("-p", action="store_true", help="Download post data")
parser.add_argument("-v", action="store_true", help="Download video files")
return parser
def get_hashtag_list(file_name: str) -> List[str]:
"""Extract list of newline-separated hashtags from text file."""
if not file_methods.check_existence(file_name, "file"):
raise OSError(f"{file_name} does not exist")
with open(file_name) as f:
@@ -49,10 +48,7 @@ def get_hashtag_list(file_name: str) -> List[str]:
def set_download_settings(download_data_type: Dict[str, bool]) -> Dict[str, Any]:
"""
Loads the constants from global_data into the dict called settings and returns it.
Purpose - easy access to global constants by various functions.
"""
"""Load the constants from global_data module into the `settings` dict."""
settings = {
"data": global_data.FILES["data"],
"ids": global_data.FILES["ids"],
@@ -73,10 +69,13 @@ def set_download_settings(download_data_type: Dict[str, bool]) -> Dict[str, Any]
def get_posts(settings: dict, tag: str) -> Optional[Tuple[str, int]]:
"""
1. calls download_posts in file_methods.py to get the posts for a given hashtag
2. calls extract_posts from data_methods.py to extract new posts if any
3. calls update_posts from data_methods.py to update the id-list with the ids of newly downloaded posts.
"""Scrape trending TikTok post data for the specified hashtag.
1. Calls `file_methods.download_posts` to scrape the post data for a given hashtag
2. Calls `data_methods.extract_posts` to determine which if any posts
haven't previously been downloaded.
3. Calls `data_methods.update_posts` to update the ID list with the IDs of
newly downloaded posts.
"""
file_path = file_methods.download_posts(settings, tag)
number_scraped = None
@@ -96,11 +95,15 @@ def get_posts(settings: dict, tag: str) -> Optional[Tuple[str, int]]:
def get_videos(settings: dict, tag: str) -> Optional[Tuple[str, int]]:
"""
1. calls download_videos in file_methods.py to get the videos for a given hashtag
2. calls extract_videos from data_methods.py to extract new videos if any
3. calls update_videos from data_methods.py to update the id-list with the ids of newly downloaded videos.
4. the clean_video_files function deletes the residual video folder after the data processing
"""Scrape trending TikTok video files for the specified hashtag.
1. Calls `file_methods.download_videos` to download the video files for a given hashtag
2. Calls `data_methods.extract_videos` to determine which if any videos
haven't previously been downloaded.
3. Calls `data_methods.update_videos` to update the ID list with the IDs of
newly downloaded videos.
4. Calls `clean_video_files` function to delete the residual video folder
after the data processing.
"""
number_scraped = None
download_list = file_methods.download_videos(settings, tag)
@@ -117,10 +120,7 @@ def get_videos(settings: dict, tag: str) -> Optional[Tuple[str, int]]:
def get_data(
hashtags: list, download_data_type: Dict[str, bool]
) -> List[Tuple[str, Tuple[str, int]]]:
"""
The function checks for the user option "-p", "-v" or both and then
triggers the functions get_posts, get_videos or both, respectively.
"""
"""Check command-line arguments and scrape posts/videos for specified hashtags."""
counter = 0
total_hashtags = len(hashtags)
total_hashtags_offset = total_hashtags - 1
@@ -176,7 +176,7 @@ if __name__ == "__main__":
if not (args.t or args.f):
parser.error(
"No hashtags were given, please use either the `-t` flag or the `-f` flag to provide hashtags."
"No hashtags were given, please use either the `-t` flag or the `-f` flag to specify one or more hashtags."
)
if not (args.p or args.v):