mirror of
https://github.com/bellingcat/tiktok-hashtag-analysis.git
synced 2026-06-08 03:18:31 +03:00
check formatting using black and add type hinting to functions
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
from collections import namedtuple
|
||||
from typing import NamedTuple
|
||||
import logging, logging.config
|
||||
|
||||
import file_methods
|
||||
@@ -12,17 +12,23 @@ The file contains several functions that perform data processing related tasks.
|
||||
"""
|
||||
|
||||
|
||||
diff = namedtuple("difference", "ids filter_posts")
|
||||
total = namedtuple("total", "total unique")
|
||||
class Diff(NamedTuple):
|
||||
ids: list
|
||||
filter_posts: bool
|
||||
|
||||
|
||||
def get_difference(tag, file, ids):
|
||||
class Total(NamedTuple):
|
||||
total: int
|
||||
unique: int
|
||||
|
||||
|
||||
def get_difference(tag: str, file_name: str, ids: list) -> NamedTuple:
|
||||
"""
|
||||
Compares two sets of ids and returns the difference of the two sets.
|
||||
Purpose - user to filter out the new ids by comparing the set of id list (ids/post_ids.json or videos_ids.json) and the list of newly downloaded ids.
|
||||
"""
|
||||
filter_posts = False
|
||||
current_id_data = file_methods.get_data(file)
|
||||
current_id_data = file_methods.get_data(file_name)
|
||||
if tag in current_id_data:
|
||||
current_ids = current_id_data[tag]
|
||||
set_current_ids = set(current_ids)
|
||||
@@ -36,17 +42,17 @@ def get_difference(tag, file, ids):
|
||||
total_new_ids = len(new_ids)
|
||||
if total_new_ids == total_current_ids:
|
||||
filter_posts = False
|
||||
new_data = diff(new_ids, filter_posts)
|
||||
new_data = Diff(new_ids, filter_posts)
|
||||
else:
|
||||
new_data = diff(new_ids, filter_posts)
|
||||
new_data = Diff(new_ids, filter_posts)
|
||||
return new_data
|
||||
else:
|
||||
else:
|
||||
filter_posts = True
|
||||
new_data = diff(ids, filter_posts)
|
||||
new_data = Diff(ids, filter_posts)
|
||||
return new_data
|
||||
|
||||
|
||||
def extract_posts(settings, file_name, tag):
|
||||
def extract_posts(settings: dict, file_name: str, tag: str) -> list:
|
||||
"""
|
||||
Takes the downloaded file by the tiktok-scraper that contains the posts, and returns the new posts after comparing it the list of posts (from the file ids/post_ids.json) already downloaded.
|
||||
"""
|
||||
@@ -59,7 +65,7 @@ def extract_posts(settings, file_name, tag):
|
||||
|
||||
if not ids:
|
||||
logger.warn(f"No posts were found for the hashtag: {tag}")
|
||||
|
||||
|
||||
status = file_methods.check_existence(settings["post_ids"], "file")
|
||||
if not status:
|
||||
new_data = (ids, posts)
|
||||
@@ -69,7 +75,7 @@ def extract_posts(settings, file_name, tag):
|
||||
if not new_ids:
|
||||
logger.warn(f"No new posts were found for the hashtag: {tag}")
|
||||
elif new_ids.filter_posts:
|
||||
new_posts = [post for post in posts if post['id'] in new_ids.ids]
|
||||
new_posts = [post for post in posts if post["id"] in new_ids.ids]
|
||||
new_data = (new_ids.ids, new_posts)
|
||||
return new_data
|
||||
else:
|
||||
@@ -77,7 +83,7 @@ def extract_posts(settings, file_name, tag):
|
||||
return new_data
|
||||
|
||||
|
||||
def extract_videos(settings, tag, download_list):
|
||||
def extract_videos(settings: dict, tag: str, download_list: list) -> list:
|
||||
"""
|
||||
Tiktok-scraper downloads the videos and puts them in a folder - the list of ids of the downloaded videos is fed to this function as download_list. The function returns the set of new videos after comparing it the list of videos (from the file ids/videos_ids.json) already downloaded.
|
||||
"""
|
||||
@@ -88,13 +94,17 @@ def extract_videos(settings, tag, download_list):
|
||||
else:
|
||||
new_videos = get_difference(tag, settings["video_ids"], download_list)
|
||||
if not new_videos:
|
||||
logger.warn(f"No new videos were found for the {tag} in the downloaded folder.")
|
||||
logger.warn(
|
||||
f"No new videos were found for the {tag} in the downloaded folder."
|
||||
)
|
||||
return None
|
||||
else:
|
||||
return new_videos.ids
|
||||
|
||||
|
||||
def update_posts(file_path, file_type, new_data, tag=None):
|
||||
def update_posts(
|
||||
file_path: str, file_type: str, new_data: list, tag: str = None
|
||||
) -> tuple:
|
||||
"""
|
||||
Updates the list of post ids (in the file ids/post_ids.json) with the ids of the new posts.
|
||||
"""
|
||||
@@ -106,7 +116,7 @@ def update_posts(file_path, file_type, new_data, tag=None):
|
||||
return scraped_data
|
||||
|
||||
|
||||
def update_videos(settings, new_data, tag):
|
||||
def update_videos(settings: str, new_data: list, tag: str) -> tuple:
|
||||
"""
|
||||
Updates the list of video ids (in the file ids/video_ids.json) with the ids of the new videos.
|
||||
"""
|
||||
@@ -117,7 +127,7 @@ def update_videos(settings, new_data, tag):
|
||||
return log
|
||||
|
||||
|
||||
def get_total_posts(file_path, tag):
|
||||
def get_total_posts(file_path: str, tag: str) -> NamedTuple:
|
||||
"""
|
||||
Returns total count of ids in a id list along with the number of unique ids among them.
|
||||
"""
|
||||
@@ -128,16 +138,18 @@ def get_total_posts(file_path, tag):
|
||||
data = file_methods.get_data(file_path)
|
||||
total_posts = len(data[tag])
|
||||
unique = len(set(data[tag]))
|
||||
t = total(total_posts, unique)
|
||||
t = Total(total_posts, unique)
|
||||
return t
|
||||
|
||||
|
||||
def print_total(file_path, tag, data_type):
|
||||
def print_total(file_path: str, tag: str, data_type: str):
|
||||
"""
|
||||
Prints the total count for posts or videos for a hashtag. Calls the function get_total_posts for sanity check that there are no repeating ids in the id lists.
|
||||
"""
|
||||
total = get_total_posts(file_path, tag)
|
||||
if (total.total == total.unique):
|
||||
if total.total == total.unique:
|
||||
logger.info(f"Scraped {total.total} {data_type} containing the hashtag '{tag}'")
|
||||
else:
|
||||
logger.warn(f"Out of total {data_type} for the hashtag {tag} {total.total}, only {total.unique} are unique. Something is going wrong...")
|
||||
logger.warn(
|
||||
f"Out of total {data_type} for the hashtag {tag} {total.total}, only {total.unique} are unique. Something is going wrong..."
|
||||
)
|
||||
|
||||
@@ -14,40 +14,41 @@ The file contains the functions that operate on files, such as writing or readin
|
||||
"""
|
||||
|
||||
|
||||
def create_file(name, file_type):
|
||||
def create_file(name: str, file_type: str):
|
||||
"""
|
||||
Creates a file or directory.
|
||||
"""
|
||||
if (file_type == "dir"):
|
||||
if file_type == "dir":
|
||||
os.makedirs(name, mode=0o777)
|
||||
elif (file_type == "file"):
|
||||
with open(name, "w"): pass
|
||||
elif file_type == "file":
|
||||
with open(name, "w"):
|
||||
pass
|
||||
else:
|
||||
raise ValueError(f"{file_type} has to be either 'dir' or 'file'")
|
||||
|
||||
|
||||
def check_existence(file_path, file_type):
|
||||
def check_existence(file_path: str, file_type: str):
|
||||
"""
|
||||
Checks the existence of a file or a directory. If not found, returns False, else returns True.
|
||||
"""
|
||||
if (file_type == "file"):
|
||||
if file_type == "file":
|
||||
return os.path.isfile(file_path)
|
||||
elif (file_type == "dir"):
|
||||
elif file_type == "dir":
|
||||
return os.path.isdir(file_path)
|
||||
else:
|
||||
raise ValueError(f"{file_type} has to be either 'dir' or 'file'")
|
||||
|
||||
|
||||
def check_file(file_path, file_type):
|
||||
def check_file(file_path: str, file_type: str):
|
||||
"""
|
||||
Creates a file or directory, if not found. Else, returns nothing.
|
||||
"""
|
||||
status = check_existence(file_path, file_type)
|
||||
if not status:
|
||||
create_file(file_path, file_type)
|
||||
create_file(file_path, file_type)
|
||||
|
||||
|
||||
def download_posts(settings, tag):
|
||||
def download_posts(settings: dict, tag: str):
|
||||
"""
|
||||
Runs the tiktok-scraper command to download posts for a given hashtag.
|
||||
Returns the path to the downloaded file of posts. If no file was downloaded, prints the error and returns nothing in order to move on.
|
||||
@@ -55,66 +56,69 @@ def download_posts(settings, tag):
|
||||
"""
|
||||
path = os.path.join(settings["data"], tag, settings["posts"])
|
||||
os.chdir(path)
|
||||
tiktok_command = f"tiktok-scraper hashtag {tag} -t 'json'"
|
||||
output = subprocess.check_output(tiktok_command, shell=True, encoding = 'utf-8')
|
||||
tiktok_command = f"tiktok-scraper hashtag {tag} -t 'json'"
|
||||
output = subprocess.check_output(tiktok_command, shell=True, encoding="utf-8")
|
||||
new_file = output.split()[-1]
|
||||
if ("json" in new_file):
|
||||
if "json" in new_file:
|
||||
os.chdir("../../../tiktok_downloader")
|
||||
return new_file
|
||||
return new_file
|
||||
else:
|
||||
logger.warn(f"Something's wrong with what is returned by tiktok-scraper for the hashtag {tag} - *{new_file}* is not a json file.\n\ntiktok-scraper returned {output}")
|
||||
logger.warn(
|
||||
f"Something's wrong with what is returned by tiktok-scraper for the hashtag {tag} - *{new_file}* is not a json file.\n\ntiktok-scraper returned {output}"
|
||||
)
|
||||
os.chdir("../../../tiktok_downloader")
|
||||
|
||||
|
||||
def download_videos(settings, tag):
|
||||
def download_videos(settings: dict, tag: str):
|
||||
"""
|
||||
Runs the tiktok-scraper command to download videos for a given hashtag. Note that all the videos are downloaded that are returned by the tiktok api and as a result, its a time and data consuming process.
|
||||
Runs the tiktok-scraper command to download videos for a given hashtag. Note that all the videos are downloaded that are returned by the tiktok api and as a result, its a time and data consuming process.
|
||||
The list of downloaded video ids is constucted and returned if the downloaded folder contains at least 1 video.
|
||||
os.chdir is used to execute shell commands in the right folders and then reused to come back to the original folder of execution of run_downloader script.
|
||||
"""
|
||||
path = os.path.join(settings["data"], tag, settings["videos"])
|
||||
os.chdir(path)
|
||||
tiktok_command = f"tiktok-scraper hashtag {tag} -d"
|
||||
tiktok_command = f"tiktok-scraper hashtag {tag} -d"
|
||||
result = subprocess.check_output(tiktok_command, shell=True)
|
||||
downloaded_list_tmp = os.listdir(f"./#{tag}")
|
||||
if downloaded_list_tmp:
|
||||
downloaded_list = []
|
||||
for file in downloaded_list_tmp:
|
||||
file = file.split('.')[0]
|
||||
file = file.split(".")[0]
|
||||
downloaded_list.append(file)
|
||||
|
||||
|
||||
os.chdir("../../../tiktok_downloader")
|
||||
return downloaded_list
|
||||
else:
|
||||
logger.warn(f"No video files were downloaded for the hashtag {tag}.")
|
||||
os.chdir("../../../tiktok_downloader")
|
||||
shutil.rmtree(settings['videos_delete'])
|
||||
|
||||
shutil.rmtree(settings["videos_delete"])
|
||||
|
||||
def get_data(file_path):
|
||||
|
||||
def get_data(file_path: str) -> list:
|
||||
"""
|
||||
Reads the json file and retuns the read data.
|
||||
"""
|
||||
with open(file_path, "r", encoding = "utf-8") as f:
|
||||
with open(file_path, "r", encoding="utf-8") as f:
|
||||
data = json.load(f)
|
||||
return data
|
||||
|
||||
|
||||
def dump_data(file_path, data):
|
||||
def dump_data(file_path: str, data: list):
|
||||
"""
|
||||
Writes the data to the json file.
|
||||
"""
|
||||
with open(file_path, "w", encoding = "utf-8") as f:
|
||||
with open(file_path, "w", encoding="utf-8") as f:
|
||||
json.dump(data, f)
|
||||
|
||||
def log_writer(log_data):
|
||||
|
||||
def log_writer(log_data: list):
|
||||
"""
|
||||
Creates the dictionary of total downloads (posts and videos) per hashtag.
|
||||
Example : {timstamp : {hashtag : { videos : number_of_new_videos , posts : number_of_new_posts } } }
|
||||
Example : { timetamp : { hashtag : { videos : number_of_new_videos , posts : number_of_new_posts } } }
|
||||
Writes the dictionary to the log file (logs/log.json).
|
||||
"""
|
||||
total = 0
|
||||
scraped_summary_dict = {}
|
||||
scraped_summary_dict: dict
|
||||
for hashtag, (data_type, count) in log_data:
|
||||
if hashtag in scraped_summary_dict:
|
||||
if data_type in scraped_summary_dict[hashtag]:
|
||||
@@ -123,18 +127,18 @@ def log_writer(log_data):
|
||||
scraped_summary_dict[hashtag][data_type] = count
|
||||
total += count
|
||||
else:
|
||||
scraped_summary_dict[hashtag] = {data_type : count}
|
||||
scraped_summary_dict[hashtag] = {data_type: count}
|
||||
total += count
|
||||
|
||||
now = datetime.now()
|
||||
now_str = now.strftime("%d-%m-%Y %H:%M:%S")
|
||||
data = { now_str : scraped_summary_dict }
|
||||
data = {now_str: scraped_summary_dict}
|
||||
|
||||
logger.warn(f"Logged post data: {data}")
|
||||
logger.info(f"Successfully scraped {total} total entries")
|
||||
|
||||
|
||||
def id_writer(file_path, new_data, tag, status):
|
||||
def id_writer(file_path: str, new_data: list, tag: str, status: bool) -> tuple:
|
||||
"""
|
||||
Writes the list of new ids to the post_ids or video_ids files.
|
||||
"""
|
||||
@@ -145,20 +149,20 @@ def id_writer(file_path, new_data, tag, status):
|
||||
if tag in data:
|
||||
data[tag] += new_data
|
||||
else:
|
||||
data[tag]= new_data
|
||||
data[tag] = new_data
|
||||
dump_data(file_path, data)
|
||||
except json.decoder.JSONDecodeError:
|
||||
data = { tag : new_data }
|
||||
data = {tag: new_data}
|
||||
dump_data(file_path, data)
|
||||
else:
|
||||
data = { tag : new_data }
|
||||
data = {tag: new_data}
|
||||
dump_data(file_path, data)
|
||||
logger.debug(f"SUCCESS - {total} entries added to {file_path}")
|
||||
number_scraped = (tag, total)
|
||||
return number_scraped
|
||||
|
||||
|
||||
def post_writer(file_path, new_data, status):
|
||||
def post_writer(file_path: str, new_data: list, status: bool):
|
||||
"""
|
||||
Writes the new posts in the post file of the given hashtag (/data/{hashtag}/posts/data.json)
|
||||
"""
|
||||
@@ -177,31 +181,35 @@ def post_writer(file_path, new_data, status):
|
||||
logger.debug(f"SUCCESS - {total} entries added to {file_path}")
|
||||
|
||||
|
||||
def delete_file(file_path, file_type):
|
||||
def delete_file(file_path: str, file_type: str):
|
||||
"""
|
||||
Deletes the directory or the file.
|
||||
"""
|
||||
if not check_existence(file_path, file_type):
|
||||
raise OSError(f"Attempt to delete file failed: {file_path} does not exist")
|
||||
elif (file_type == "file"):
|
||||
elif file_type == "file":
|
||||
os.remove(file_path)
|
||||
logger.debug(f"Successfully deleted {file_path}")
|
||||
elif (file_type == "dir"):
|
||||
elif file_type == "dir":
|
||||
os.rmdir(file_path)
|
||||
logger.debug(f"Successfully deleted {file_path}")
|
||||
else:
|
||||
raise OSError("{file_type} needs to be either 'file' or 'dir'")
|
||||
|
||||
|
||||
def clean_video_files(settings, tag, new_data=None):
|
||||
def clean_video_files(settings: dict, tag: str, new_data: list = None):
|
||||
"""
|
||||
Moves the new videos from the tiktok-scraper video folder to /data/{hashtag}/videos/
|
||||
Deletes the residual tiktok-scraper video folder.
|
||||
"""
|
||||
if new_data:
|
||||
for file in new_data:
|
||||
settings["videos_from"] = settings['data'] + f"/{tag}/videos/#{tag}/{file}.mp4"
|
||||
shutil.move(settings['videos_from'], settings['videos_to'])
|
||||
|
||||
shutil.rmtree(settings['videos_delete'])
|
||||
logger.debug(f"Successfully deleted the folder {settings['videos_delete']} folder of videos.")
|
||||
settings["videos_from"] = (
|
||||
settings["data"] + f"/{tag}/videos/#{tag}/{file}.mp4"
|
||||
)
|
||||
shutil.move(settings["videos_from"], settings["videos_to"])
|
||||
|
||||
shutil.rmtree(settings["videos_delete"])
|
||||
logger.debug(
|
||||
f"Successfully deleted the folder {settings['videos_delete']} folder of videos."
|
||||
)
|
||||
|
||||
@@ -17,24 +17,23 @@ DATA_FILE = "data.json"
|
||||
|
||||
|
||||
FILES = {
|
||||
"data" : DATA,
|
||||
"ids" : IDS,
|
||||
"posts" : POSTS,
|
||||
"videos" : VIDEOS,
|
||||
"images" : IMAGES,
|
||||
"post_ids" : f"{DATA}/{IDS}/{POST_IDS}",
|
||||
"video_ids" : f"{DATA}/{IDS}/{VIDEO_IDS}",
|
||||
"data_file" : f"{DATA_FILE}",
|
||||
"downloads" : [],
|
||||
}
|
||||
|
||||
"data": DATA,
|
||||
"ids": IDS,
|
||||
"posts": POSTS,
|
||||
"videos": VIDEOS,
|
||||
"images": IMAGES,
|
||||
"post_ids": f"{DATA}/{IDS}/{POST_IDS}",
|
||||
"video_ids": f"{DATA}/{IDS}/{VIDEO_IDS}",
|
||||
"data_file": f"{DATA_FILE}",
|
||||
"downloads": [],
|
||||
}
|
||||
|
||||
|
||||
# Commands
|
||||
tag = ""
|
||||
|
||||
PARAMETERS = {
|
||||
"scraper_attempts" : 3,
|
||||
# "number_of_videos" : 3, # Number of videos to be downloaded by tiktok-scraper.
|
||||
"sleep" : 8
|
||||
}
|
||||
"scraper_attempts": 3,
|
||||
# "number_of_videos" : 3, # Number of videos to be downloaded by tiktok-scraper.
|
||||
"sleep": 8,
|
||||
}
|
||||
|
||||
@@ -3,12 +3,14 @@ import json
|
||||
import argparse
|
||||
from datetime import datetime
|
||||
import warnings
|
||||
|
||||
warnings.filterwarnings("ignore", message="Glyph (.*) missing from current font")
|
||||
import logging
|
||||
|
||||
import matplotlib.pyplot as plt
|
||||
import matplotlib.ticker as mtick
|
||||
import seaborn as sns
|
||||
|
||||
sns.set_theme(style="darkgrid")
|
||||
|
||||
from file_methods import check_file, check_existence
|
||||
@@ -21,20 +23,26 @@ Plots the frequency of hashtags appearing in the set of given posts.
|
||||
|
||||
def get_hashtags(obj):
|
||||
if not obj:
|
||||
raise ValueError(f'Empty item, no hashtags to be extracted.')
|
||||
raise ValueError(f"Empty item, no hashtags to be extracted.")
|
||||
else:
|
||||
hashtags = {}
|
||||
tags = [ [tag['name'] for tag in ele['hashtags']] for ele in obj ]
|
||||
tags = [ set(ele) for ele in tags ]
|
||||
{ tag: (1 if tag not in hashtags and not hashtags.update({tag: 1})
|
||||
else hashtags[tag] + 1 and not hashtags.update({tag: hashtags[tag] + 1}))
|
||||
for ele in tags for tag in ele }
|
||||
tags = [[tag["name"] for tag in ele["hashtags"]] for ele in obj]
|
||||
tags = [set(ele) for ele in tags]
|
||||
{
|
||||
tag: (
|
||||
1
|
||||
if tag not in hashtags and not hashtags.update({tag: 1})
|
||||
else hashtags[tag] + 1 and not hashtags.update({tag: hashtags[tag] + 1})
|
||||
)
|
||||
for ele in tags
|
||||
for tag in ele
|
||||
}
|
||||
hashtags = sorted(hashtags.items(), key=lambda e: e[1], reverse=True)
|
||||
|
||||
return hashtags
|
||||
|
||||
|
||||
def get_occurrences(filename, n=1 , sort=True):
|
||||
def get_occurrences(filename, n=1, sort=True):
|
||||
"""
|
||||
Takes the json file containing posts and returns a dictionary:
|
||||
local variable occs = {
|
||||
@@ -46,29 +54,26 @@ def get_occurrences(filename, n=1 , sort=True):
|
||||
obj = json.load(f)
|
||||
l = len(obj)
|
||||
tags = get_hashtags(obj)
|
||||
occs = {
|
||||
"total": l,
|
||||
"top_n": []
|
||||
}
|
||||
occs["top_n"] = [ [ ele[i] for ele in tags[0:n] ] for i in range(2)]
|
||||
occs = {"total": l, "top_n": []}
|
||||
occs["top_n"] = [[ele[i] for ele in tags[0:n]] for i in range(2)]
|
||||
return occs
|
||||
|
||||
|
||||
def plot(n, occs, img_folder):
|
||||
y_pos = list(reversed(range(n - 1)))
|
||||
max_count = occs["top_n"][1][0]
|
||||
freqs = [count/max_count * 100 for count in occs["top_n"][1][1:]]
|
||||
freqs = [count / max_count * 100 for count in occs["top_n"][1][1:]]
|
||||
labels = occs["top_n"][0][1:]
|
||||
|
||||
fig, ax = plt.subplots(figsize = (5, 6.66))
|
||||
fig, ax = plt.subplots(figsize=(5, 6.66))
|
||||
ax.barh(y_pos, freqs)
|
||||
ax.set_yticks(y_pos)
|
||||
ax.set_yticklabels(labels)
|
||||
ax.grid(axis = 'y')
|
||||
ax.set_xlabel('Percent of posts with common hashtag')
|
||||
ax.set_ylim(min(y_pos)-1, max(y_pos)+1)
|
||||
ax.grid(axis="y")
|
||||
ax.set_xlabel("Percent of posts with common hashtag")
|
||||
ax.set_ylim(min(y_pos) - 1, max(y_pos) + 1)
|
||||
ax.set_title(f'Common hashtags for #{occs["top_n"][0][0]} posts')
|
||||
ax.xaxis.set_major_formatter(mtick.PercentFormatter(decimals = 0))
|
||||
ax.xaxis.set_major_formatter(mtick.PercentFormatter(decimals=0))
|
||||
save_plot(img_folder)
|
||||
|
||||
|
||||
@@ -78,10 +83,14 @@ def print_occurrences(occs):
|
||||
"""
|
||||
row_number = 0
|
||||
total_posts = occs["total"]
|
||||
print ("{:<8} {:<15} {:<15} {:<15}".format("Rank", 'Hashtag','Occurrences','Frequency'))
|
||||
for key,value in zip(occs["top_n"][0], occs["top_n"][1]):
|
||||
ratio = value/total_posts
|
||||
print ("{:<8} {:<15} {:<15} {:<15}".format(row_number, key, value, ratio))
|
||||
print(
|
||||
"{:<8} {:<15} {:<15} {:<15}".format(
|
||||
"Rank", "Hashtag", "Occurrences", "Frequency"
|
||||
)
|
||||
)
|
||||
for key, value in zip(occs["top_n"][0], occs["top_n"][1]):
|
||||
ratio = value / total_posts
|
||||
print("{:<8} {:<15} {:<15} {:<15}".format(row_number, key, value, ratio))
|
||||
row_number += 1
|
||||
|
||||
|
||||
@@ -92,8 +101,24 @@ def save_plot(img_folder):
|
||||
now = datetime.now()
|
||||
current_time = now.strftime("%Y_%m_%d_%H_%M_%S")
|
||||
filename = f"{img_folder}/{current_time}.png"
|
||||
logging.info(f'Plot saved to file: {filename}')
|
||||
plt.savefig(filename, bbox_inches = 'tight', facecolor = 'white', dpi = 300)
|
||||
logging.info(f"Plot saved to file: {filename}")
|
||||
plt.savefig(filename, bbox_inches="tight", facecolor="white", dpi=300)
|
||||
|
||||
|
||||
def create_parser():
|
||||
"""
|
||||
Creates the parser and the arguments for the user input.
|
||||
"""
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("input_file", help="The json hashtag file name")
|
||||
parser.add_argument("n", help="The number of top n occurrences", type=int)
|
||||
parser.add_argument(
|
||||
"-p", "--plot", help="Plot the occurrences", action="store_true"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-d", "--print", help="List top n hashtags", action="store_true"
|
||||
)
|
||||
return parser
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
@@ -106,16 +131,16 @@ if __name__ == "__main__":
|
||||
"""
|
||||
img_folder = IMAGES
|
||||
check_file(img_folder, "dir")
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("input_file", help="The json hashtag file name")
|
||||
parser.add_argument("n", help="The number of top n occurrences", type=int)
|
||||
parser.add_argument("-p", "--plot", help="Plot the occurrences", action="store_true")
|
||||
parser.add_argument("-d", "--print", help="List top n hashtags", action="store_true")
|
||||
parser = create_parser()
|
||||
args = parser.parse_args()
|
||||
if args.n < 1:
|
||||
raise ValueError(f"Specified argument `n` (the number of hashtags to analyze) must be greater than zero, not: {args.n}.")
|
||||
if not check_existence(args.input_file, 'file'):
|
||||
raise FileNotFoundError(f"Specified argument `input_file` ({args.input_file}) does not exist.")
|
||||
raise ValueError(
|
||||
f"Specified argument `n` (the number of hashtags to analyze) must be greater than zero, not: {args.n}."
|
||||
)
|
||||
if not check_existence(args.input_file, "file"):
|
||||
raise FileNotFoundError(
|
||||
f"Specified argument `input_file` ({args.input_file}) does not exist."
|
||||
)
|
||||
base = os.path.splitext(args.input_file)[0]
|
||||
path = f"./{base}_sorted_hashtags.csv"
|
||||
occs = get_occurrences(args.input_file, args.n)
|
||||
|
||||
@@ -12,19 +12,24 @@ logging.config.fileConfig("../logging.config")
|
||||
logger = logging.getLogger("Logger")
|
||||
|
||||
|
||||
def get_hashtag_list(file_name):
|
||||
if not file_methods.check_existence(file_name, 'file'):
|
||||
def get_hashtag_list(file_name: str) -> list:
|
||||
if not file_methods.check_existence(file_name, "file"):
|
||||
raise OSError(f"{file_name} does not exist")
|
||||
with open(file_name) as f:
|
||||
tags = list(filter(None, [line.strip() for line in f if not line.startswith("#")]))
|
||||
tags = list(
|
||||
filter(None, [line.strip() for line in f if not line.startswith("#")])
|
||||
)
|
||||
return tags
|
||||
|
||||
|
||||
def create_parser():
|
||||
# Creating the parser
|
||||
parser = argparse.ArgumentParser(description="Download the tiktoks for the requested hashtags")
|
||||
"""
|
||||
Creates the parser and the arguments for the user input.
|
||||
"""
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Download the tiktoks for the requested hashtags"
|
||||
)
|
||||
|
||||
# Adding the arguments
|
||||
parser.add_argument("-t", type=str, nargs="*", help="List of hashtags")
|
||||
parser.add_argument("-f", type=str, help="File name with the list of hashtags")
|
||||
parser.add_argument("-p", action="store_true", help="Download posts")
|
||||
@@ -33,9 +38,9 @@ def create_parser():
|
||||
return parser
|
||||
|
||||
|
||||
def set_download_settings(download_data_type):
|
||||
def set_download_settings(download_data_type: str) -> dict:
|
||||
"""
|
||||
Loads the constants from global_data into the dict called settings and returns it.
|
||||
Loads the constants from global_data into the dict called settings and returns it.
|
||||
Purpose - easy access to global constants by various functions.
|
||||
"""
|
||||
settings = {}
|
||||
@@ -48,16 +53,15 @@ def set_download_settings(download_data_type):
|
||||
settings["posts"] = global_data.FILES["posts"]
|
||||
settings["post_ids"] = global_data.FILES["post_ids"]
|
||||
settings["data_file"] = global_data.FILES["data_file"]
|
||||
|
||||
|
||||
if download_data_type["videos"]:
|
||||
settings["videos"] = global_data.FILES["videos"]
|
||||
settings["video_ids"] = global_data.FILES["video_ids"]
|
||||
|
||||
|
||||
return settings
|
||||
|
||||
|
||||
|
||||
def get_posts(settings, tag):
|
||||
def get_posts(settings: dict, tag: str) -> tuple:
|
||||
"""
|
||||
1. calls download_posts in file_methods.py to get the posts for a given hashtag
|
||||
2. calls extract_posts from data_methods.py to extract new posts if any
|
||||
@@ -68,21 +72,24 @@ def get_posts(settings, tag):
|
||||
if file_path:
|
||||
new_data = data_methods.extract_posts(settings, file_path, tag)
|
||||
if new_data:
|
||||
data_file = os.path.join(settings["data"], tag, settings["posts"], settings["data_file"])
|
||||
data_file = os.path.join(
|
||||
settings["data"], tag, settings["posts"], settings["data_file"]
|
||||
)
|
||||
data_methods.update_posts(data_file, "file", new_data[1])
|
||||
number_scraped = data_methods.update_posts(settings["post_ids"], "file", new_data[0], tag)
|
||||
number_scraped = data_methods.update_posts(
|
||||
settings["post_ids"], "file", new_data[0], tag
|
||||
)
|
||||
file_methods.delete_file(file_path, "file")
|
||||
|
||||
|
||||
return number_scraped
|
||||
|
||||
|
||||
|
||||
def get_videos(settings, tag):
|
||||
def get_videos(settings: dict, tag: str) -> tuple:
|
||||
"""
|
||||
1. calls download_videos in file_methods.py to get the videos for a given hashtag
|
||||
2. calls extract_videos from data_methods.py to extract new videos if any
|
||||
3. calls update_videos from data_methods.py to update the id-list with the ids of newly downloaded videos.
|
||||
4. the clean_video_files function deletes the residual video folder after the data processing
|
||||
4. the clean_video_files function deletes the residual video folder after the data processing
|
||||
"""
|
||||
number_scraped = ()
|
||||
download_list = file_methods.download_videos(settings, tag)
|
||||
@@ -96,46 +103,54 @@ def get_videos(settings, tag):
|
||||
return number_scraped
|
||||
|
||||
|
||||
|
||||
def get_data(hashtags, download_data_type):
|
||||
def get_data(hashtags: list, download_data_type: str) -> list:
|
||||
"""
|
||||
The function checks for the user option "-p", "-v" or both and then
|
||||
triggers the functions get_posts, get_videos or both, respectively.
|
||||
triggers the functions get_posts, get_videos or both, respectively.
|
||||
"""
|
||||
counter = 0
|
||||
total_hashtags = len(hashtags)
|
||||
total_hashtags_offset = total_hashtags - 1
|
||||
scraped_summary_list = []
|
||||
|
||||
|
||||
if download_data_type["posts"]:
|
||||
settings = set_download_settings(download_data_type)
|
||||
while counter < total_hashtags:
|
||||
tag = hashtags[counter]
|
||||
file_methods.check_file(os.path.join(settings["data"], tag, settings["posts"]), "dir")
|
||||
file_methods.check_file(os.path.join(settings["data"], tag, settings["posts"], settings["data_file"]), "file")
|
||||
file_methods.check_file(
|
||||
os.path.join(settings["data"], tag, settings["posts"]), "dir"
|
||||
)
|
||||
file_methods.check_file(
|
||||
os.path.join(
|
||||
settings["data"], tag, settings["posts"], settings["data_file"]
|
||||
),
|
||||
"file",
|
||||
)
|
||||
res = get_posts(settings, tag)
|
||||
if res:
|
||||
number_scraped = ( res[0], ( "posts", res[1] ) )
|
||||
number_scraped = (res[0], ("posts", res[1]))
|
||||
scraped_summary_list.append(number_scraped)
|
||||
data_methods.print_total(settings["post_ids"], tag, "posts")
|
||||
|
||||
|
||||
counter += 1
|
||||
if counter < total_hashtags_offset:
|
||||
time.sleep(settings["sleep"])
|
||||
|
||||
|
||||
if download_data_type["videos"]:
|
||||
settings = set_download_settings(download_data_type)
|
||||
while counter < total_hashtags:
|
||||
tag = hashtags[counter]
|
||||
file_methods.check_file(os.path.join(settings["data"], tag, settings["videos"]), "dir")
|
||||
settings["videos_delete"] = settings['data'] + f"/{tag}/videos/#{tag}"
|
||||
settings["videos_to"] = settings['data'] + f"/{tag}/videos"
|
||||
file_methods.check_file(
|
||||
os.path.join(settings["data"], tag, settings["videos"]), "dir"
|
||||
)
|
||||
settings["videos_delete"] = settings["data"] + f"/{tag}/videos/#{tag}"
|
||||
settings["videos_to"] = settings["data"] + f"/{tag}/videos"
|
||||
res = get_videos(settings, tag)
|
||||
if res:
|
||||
res = ( res[0], ( "videos", res[1]))
|
||||
res = (res[0], ("videos", res[1]))
|
||||
scraped_summary_list.append(res)
|
||||
data_methods.print_total(settings["video_ids"], tag, "videos")
|
||||
|
||||
|
||||
counter += 1
|
||||
if counter < total_hashtags_offset:
|
||||
time.sleep(settings["sleep"])
|
||||
@@ -148,10 +163,14 @@ if __name__ == "__main__":
|
||||
args = parser.parse_args()
|
||||
|
||||
if not (args.t or args.f):
|
||||
parser.error("No hashtags were given, please use either -t option or -f to provide hashtags.")
|
||||
|
||||
parser.error(
|
||||
"No hashtags were given, please use either -t option or -f to provide hashtags."
|
||||
)
|
||||
|
||||
if not (args.p or args.v):
|
||||
parser.error("No argument given, please specify either -p for posts or -v videos or both.")
|
||||
parser.error(
|
||||
"No argument given, please specify either -p for posts or -v videos or both."
|
||||
)
|
||||
|
||||
if args.t:
|
||||
hashtags = args.t
|
||||
@@ -161,13 +180,12 @@ if __name__ == "__main__":
|
||||
|
||||
logger.info(f"Hashtags to scrape: {hashtags}")
|
||||
if not hashtags:
|
||||
raise ValueError("No hashtags were specified: please use either the -t flag to specify a sspace-separated list of one or more hashtags as a command-line argument, or use the -f flag to specify a text file of newline-separated hashtags.")
|
||||
raise ValueError(
|
||||
"No hashtags were specified: please use either the -t flag to specify a sspace-separated list of one or more hashtags as a command-line argument, or use the -f flag to specify a text file of newline-separated hashtags."
|
||||
)
|
||||
|
||||
download_data_type = {"posts": args.p, "videos": args.v}
|
||||
|
||||
download_data_type = {
|
||||
"posts": args.p,
|
||||
"videos": args.v
|
||||
}
|
||||
|
||||
scraped_summary_list = get_data(hashtags, download_data_type)
|
||||
if scraped_summary_list:
|
||||
file_methods.log_writer(scraped_summary_list)
|
||||
|
||||
Reference in New Issue
Block a user