new version

This commit is contained in:
X
2022-02-07 01:15:46 +01:00
commit 2e78f39c5c
10 changed files with 696 additions and 0 deletions

View File

@@ -0,0 +1,108 @@
import os, sys
import csv, json
import argparse
import matplotlib.pyplot as plt
from datetime import datetime
sys.path.insert(0, '../tiktok_downloader')
import file_methods, global_data
def get_hashtags(obj):
    """Map every hashtag name to the set of post indices that use it.

    Args:
        obj: Sequence of post dicts. Each post carries a ``'hashtags'`` list
            whose entries are dicts with a ``'name'`` key.

    Returns:
        dict: ``{hashtag_name: {post_index, ...}}``. When ``obj`` is empty an
        error is printed and an empty dict is returned, so callers can
        iterate over the result without a ``None`` check.
    """
    if not obj:
        print(f'ERROR: Empty item, no hashtags to be extracted.')
        return {}
    hashtags = {}
    for index, post in enumerate(obj):
        for hashtag in post['hashtags']:
            # A set keeps each post counted once per hashtag.
            hashtags.setdefault(hashtag['name'], set()).add(index)
    return hashtags
def get_occurrences(filename, n=1 , sort=True):
    """Load a JSON list of posts and count how often each hashtag occurs.

    Args:
        filename: Path of the JSON file holding the list of posts.
        n: Number of top hashtags to keep (only used when ``sort`` is true).
        sort: When true, rank hashtags by descending number of occurrences.

    Returns:
        tuple: With ``sort`` false, ``(posts, names, values)`` where every
        value is a ``(count, post_indices)`` pair. Otherwise
        ``(number_of_posts, top_n_names, top_n_counts)``.
    """
    # Read explicitly as UTF-8 so the result does not depend on the
    # platform's default encoding.
    with open(filename, encoding="utf-8") as f:
        obj = json.load(f)
    # get_hashtags may return None for an empty post list; treat it as
    # "no hashtags" instead of crashing on .items().
    tags = get_hashtags(obj) or {}
    tags = {name: (len(indices), indices) for name, indices in tags.items()}
    if not sort:
        return obj, list(tags.keys()), list(tags.values())
    # Rank on the count only; ties keep their original relative order.
    ranked = sorted(tags.items(), key=lambda item: item[1][0], reverse=True)
    top = ranked[:n]
    names = [name for name, _ in top]
    counts = [value[0] for _, value in top]
    return len(obj), names, counts
def plot(n, length, k, v, img_folder):
    """Draw, save and show a scatter plot of hashtag occurrences.

    Args:
        n: Number of top hashtags shown (used in the x-axis label).
        length: Number of posts the counts come from (used in the label).
        k: Hashtag names, plotted on the x axis.
        v: Occurrence counts, plotted on the y axis.
        img_folder: Folder the image is written to via ``save_plot``.
    """
    plt.scatter(k, v)
    plt.xticks(rotation=45)
    plt.title(f'Hashtag Distribution')
    plt.xlabel(f'Top {n} hashtags from {length} posts.')
    plt.ylabel(f'Number of occurrences')
    # Lay out only after ticks, title and labels exist, so that the rotated
    # tick labels and axis labels are taken into account.
    plt.tight_layout()
    save_plot(plt, img_folder)
    plt.show(block=None)
    return
def print_occurrences(l, k, v):
    """Print a table of hashtags with their counts and frequencies.

    Args:
        l: Total number of posts; the divisor for the frequency column.
        k: Hashtag names, in display order.
        v: Occurrence counts matching ``k``.
    """
    row_layout = "{:<8} {:<15} {:<15} {:<15}"
    frequency_title = f'Frequency (Occurrences/Total-Posts({l}))'
    print(row_layout.format("Rank", 'Hashtag', 'Occurrences', frequency_title))
    # Ranks are zero-based, following the order of the incoming lists.
    for rank, (name, count) in enumerate(zip(k, v)):
        print(row_layout.format(rank, name, count, count / l))
def save_plot(plt, img_folder):
    """Save the current figure as ``<img_folder>/<timestamp>.png``.

    Args:
        plt: Object exposing ``savefig`` (the pyplot module in this script).
        img_folder: Destination folder for the image.
    """
    # The timestamp (down to the second) is used as the file name.
    stamp = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
    plt.savefig(f"{img_folder}/{stamp}.png")
if __name__ == "__main__":
    # Make sure the folder that receives the plot images exists.
    img_folder = global_data.IMAGES
    file_methods.check_file(img_folder, "dir")

    parser = argparse.ArgumentParser()
    parser.add_argument("input_file", help="The json hashtag file name")
    parser.add_argument("n", help="The number of top n occurrences", type=int)
    parser.add_argument("-p", "--plot", help="Plot the occurrences", action="store_true")
    parser.add_argument("-d", "--print", help="List top n hashtags", action="store_true")
    args = parser.parse_args()

    if not args.input_file:
        print(f'ERROR: either {args.input_file} or {args.n} or both contains error.')
        sys.exit(1)
    # Covers n == 0 as well, which previously fell through to the generic
    # error message above.
    if args.n < 1:
        print(f"Please make sure the number of top occurrences is a positive integer.")
        sys.exit(1)

    # Compute the ranking once and reuse it for every requested output.
    length, keys, values = get_occurrences(args.input_file, args.n)
    if args.plot:
        plot(args.n, length, keys, values, img_folder)
    # Listing stays the default action; --print additionally allows it to be
    # combined with --plot.
    if args.print or not args.plot:
        print_occurrences(length, keys, values)

View File

@@ -0,0 +1,4 @@
"""
Yet to be written ...
"""

View File

@@ -0,0 +1,123 @@
import os
from collections import namedtuple
from datetime import datetime
import global_data
import file_methods
# Outcome of comparing freshly downloaded ids with the stored ones:
# the ids not seen before and how many of them there are.
Difference = namedtuple("Difference", ["new_ids", "size"])
# Id counts for one hashtag: all stored ids and the distinct ones among them.
Total = namedtuple("Total", ["total", "unique"])
def get_difference(tag, file, ids):
    """Find the ids in ``ids`` that are not yet stored for ``tag``.

    Args:
        tag: Hashtag whose stored ids are compared.
        file: Path of the JSON file mapping hashtags to stored ids.
        ids: Ids obtained from the latest download.

    Returns:
        tuple: ``(result, maiden_entry)``. ``maiden_entry`` is true when
        ``tag`` has no stored entry, in which case ``result`` is ``ids``
        itself. Otherwise ``result`` is a ``Difference`` holding the unseen
        ids, or an empty list when there are none.
    """
    stored = file_methods.get_data(file)
    if tag not in stored:
        return (ids, True)
    unseen = set(ids).difference(set(stored[tag]))
    if not unseen:
        return ([], False)
    fresh = list(unseen)
    return (Difference(fresh, len(fresh)), False)
def extract_posts(settings, file_name, tag):
    """Select the posts of a downloaded file that are not stored yet.

    Args:
        settings: Download settings; ``settings["post_ids"]`` is the path of
            the file with the ids already stored per hashtag.
        file_name: Path of the JSON file with the downloaded posts.
        tag: Hashtag the posts were downloaded for.

    Returns:
        tuple | None: ``(ids, posts)`` to be stored. ``None`` (after a
        warning) when the file has no posts or none of them is new.
    """
    posts = file_methods.get_data(file_name)
    ids = [post["id"] for post in posts]
    if not ids:
        print(f"WARNING: no posts were found for {tag} in the file - {file_name}")
        return
    # Without an id file there is nothing to compare against: all is new.
    if not file_methods.check_existence(settings["post_ids"], "file"):
        return (ids, posts)
    res = get_difference(tag, settings["post_ids"], ids)
    if res[1]:
        # First entry for this hashtag: every post is new.
        return (ids, posts)
    if not res[0]:
        print(f"WARNING: No new posts were found in the downloaded file - {file_name}")
        return
    new_ids = res[0].new_ids
    # One pass with set membership instead of rescanning all posts per id.
    wanted = set(new_ids)
    new_posts = [post for post in posts if post["id"] in wanted]
    return (new_ids, new_posts)
def extract_videos(settings, tag, download_list):
    """Select the downloaded video ids that are not stored yet.

    Args:
        settings: Download settings; ``settings["video_ids"]`` is the path
            of the file with the video ids already stored per hashtag.
        tag: Hashtag the videos were downloaded for.
        download_list: Ids of the videos that were just downloaded.

    Returns:
        list | None: The ids to be stored, or ``None`` (after a warning)
        when none of the downloaded videos is new.
    """
    ledger = settings["video_ids"]
    if not file_methods.check_existence(ledger, "file"):
        return download_list
    difference, first_entry = get_difference(tag, ledger, download_list)
    if first_entry:
        return download_list
    if difference:
        return difference.new_ids
    print(f"WARNING: No new videos were found for the {tag} in the downloaded folder.")
    return
def update_posts(file_path, file_type, new_data, tag=None):
    """Append new posts, or new post ids for ``tag``, to ``file_path``.

    Args:
        file_path: File to update.
        file_type: Passed to ``file_methods.check_existence`` together with
            ``file_path``.
        new_data: Posts (no ``tag``) or ids (with ``tag``) to append.
        tag: When given, ``new_data`` is written as ids under this hashtag.

    Returns:
        The log entry produced by ``file_methods.id_writer`` when ``tag`` is
        given, otherwise ``None``.
    """
    exists = file_methods.check_existence(file_path, file_type)
    if tag:
        return file_methods.id_writer(file_path, new_data, tag, exists)
    file_methods.post_writer(file_path, new_data, exists)
def update_videos(settings, new_data, tag):
    """Record new video ids for ``tag`` and tidy up the downloaded files.

    Args:
        settings: Download settings; ``settings["video_ids"]`` is the id file.
        new_data: Ids of the newly downloaded videos.
        tag: Hashtag the videos belong to.

    Returns:
        The log entry produced by ``file_methods.id_writer``.
    """
    ledger_path = settings["video_ids"]
    # Create the id file first if it does not exist yet.
    file_methods.check_file(ledger_path, "file")
    entry = file_methods.id_writer(ledger_path, new_data, tag, True)
    file_methods.clean_video_files(settings, tag, new_data)
    return entry
def get_total_posts(file_path, tag):
    """Count the ids stored for ``tag`` in ``file_path``.

    Args:
        file_path: JSON file mapping hashtags to lists of ids.
        tag: Hashtag to look up.

    Returns:
        Total: ``total`` is the number of stored ids, ``unique`` the number
        of distinct ones. Both are 0 when nothing is stored for ``tag``.

    Raises:
        OSError: If ``file_path`` does not exist.
    """
    if not file_methods.check_existence(file_path, "file"):
        # f-string: the message must name the actual path.
        raise OSError(f"{file_path} not found!")
    data = file_methods.get_data(file_path)
    # A hashtag that never yielded data has no entry; report zero instead
    # of failing with a KeyError.
    ids = data.get(tag, [])
    return Total(len(ids), len(set(ids)))
def print_total(file_path, tag, data_type):
    """Print how many ids are stored for ``tag``, warning about duplicates.

    Args:
        file_path: JSON file mapping hashtags to lists of ids.
        tag: Hashtag to report on.
        data_type: Label used in the message (e.g. the kind of data stored).
    """
    counts = get_total_posts(file_path, tag)
    if counts.total != counts.unique:
        print(f"WARNING: out of total {data_type} for the hashtag {tag} {counts.total}, only {counts.unique} are unique. Something is going wrong...")
        return
    print(f"Total {data_type} for the hashtag {tag} are: {counts.total}")

View File

@@ -0,0 +1,202 @@
import os, json, subprocess
from datetime import datetime
import global_data
import data_methods
def create_file(name, file_type):
    """Create an empty file or a directory (including missing parents).

    Args:
        name: Path to create.
        file_type: ``"dir"`` for a directory, ``"file"`` for an empty file.
            Any other value only prints an error.
    """
    if file_type == "dir":
        # exist_ok avoids a crash when the directory already exists or is
        # created between the caller's existence check and this call.
        os.makedirs(name, mode=0o777, exist_ok=True)
    elif file_type == "file":
        # Opening for writing creates the file (and empties an existing one).
        with open(name, "w"):
            pass
    else:
        print(f"ERROR: {file_type} is not well defined, it has to be either 'dir' or 'file'.")
    return
def check_existence(file_path, file_type):
    """Tell whether ``file_path`` exists as the requested kind of entry.

    Args:
        file_path: Path to test.
        file_type: ``"file"`` or ``"dir"``.

    Returns:
        bool: True when the path exists and is of the requested kind.

    Raises:
        OSError: If ``file_type`` is neither ``"file"`` nor ``"dir"``.
    """
    if file_type == "file":
        return os.path.isfile(file_path)
    if file_type == "dir":
        return os.path.isdir(file_path)
    raise OSError(f"{file_type} has to be a 'dir' or a 'file'!!!")
def check_file(file_path, file_type):
    """Create ``file_path`` as a file or directory unless it already exists.

    Args:
        file_path: Path that has to exist afterwards.
        file_type: ``"file"`` or ``"dir"``.
    """
    if check_existence(file_path, file_type):
        return
    create_file(file_path, file_type)
def download_posts(settings, tag):
    """Run tiktok-scraper to download the posts of ``tag`` as JSON.

    The scraper runs inside ``<data>/<tag>/<posts>``. The last word it
    prints on stdout is taken as the name of the file it wrote.

    Args:
        settings: Download settings providing the ``"data"`` and ``"posts"``
            path components.
        tag: Hashtag to download.

    Returns:
        str | None: The file name reported by the scraper, or ``None``
        (after an error message) when nothing usable was reported.
    """
    import shlex  # local import: only needed to quote the hashtag

    path = os.path.join(settings["data"], tag, settings["posts"])
    # The hashtag is quoted so shell metacharacters in it cannot alter the
    # command line.
    tiktok_command = f"tiktok-scraper hashtag {shlex.quote(tag)} -t 'json'"
    # cwd= runs the scraper in the target folder without changing this
    # process' working directory, so nothing has to be restored afterwards,
    # not even when an exception is raised.
    result = subprocess.run(tiktok_command, capture_output=True, shell=True, cwd=path)
    words = result.stdout.decode('utf-8').split() if result.stdout else []
    if not words:
        print(f"ERROR: No file was downloaded by the tiktok-scraper for the {tag} !!!!")
        return
    new_file = words[-1]
    if "json" in new_file:
        return new_file
    print(f"ERROR: Something's wrong with what is returned by tiktok-scraper for the hashtag {tag} - *{new_file}* is not a json file!!!!")
    return
def download_videos(settings, tag):
    """Run tiktok-scraper to download the videos of ``tag``.

    The scraper runs inside ``<data>/<tag>/<videos>``; the downloaded files
    are looked up in the ``#<tag>`` sub-folder of that directory.

    Args:
        settings: Download settings providing the ``"data"`` and ``"videos"``
            path components and ``"videos_delete"`` (the folder removed when
            nothing was downloaded).
        tag: Hashtag to download.

    Returns:
        list | None: The downloaded file names without their extension, or
        ``None`` (after a warning) when nothing was downloaded.
    """
    import shlex  # local import: only needed to quote the hashtag

    path = os.path.join(settings["data"], tag, settings["videos"])
    # NOTE: the number of videos could be capped with
    # "-n {settings['number_of_videos']}"; it is currently not limited.
    tiktok_command = f"tiktok-scraper hashtag {shlex.quote(tag)} -d"
    # cwd= runs the scraper in the target folder without changing this
    # process' working directory.
    result = subprocess.run(tiktok_command, capture_output=True, shell=True, cwd=path)
    if not result.stdout:
        print(f"WARNING: Something went wrong with the tiktok-scraper video download for the {tag} !!!!")
        return
    try:
        downloaded = os.listdir(os.path.join(path, f"#{tag}"))
    except FileNotFoundError:
        # The scraper printed something but never created its folder.
        downloaded = []
    if downloaded:
        # Drop the extension, whatever its length.
        return [os.path.splitext(name)[0] for name in downloaded]
    print(f"WARNING: No video files were downloaded for the hashtag {tag}.")
    # Argument list instead of a shell string: the path is passed verbatim.
    subprocess.call(["rm", "-rf", settings["videos_delete"]])
    return
def get_data(file_path):
    """Load and return the JSON content of ``file_path``.

    Args:
        file_path: Path of the JSON file to read.

    Returns:
        The decoded JSON value.

    Raises:
        json.JSONDecodeError: If the file does not contain valid JSON
            (an empty file included).
    """
    # Read explicitly as UTF-8 so decoding does not depend on the
    # platform's default encoding.
    with open(file_path, "r", encoding="utf-8") as f:
        data = json.load(f)
    return data
def dump_data(file_path, data):
    """Serialize ``data`` as JSON into ``file_path``, replacing its content.

    Args:
        file_path: Path of the file to (over)write.
        data: JSON-serializable value.
    """
    with open(file_path, "w") as sink:
        json.dump(obj=data, fp=sink)
def log_writer(log_data):
    """Store a summary of one run in the log file under a timestamp.

    Args:
        log_data: Iterable of ``(tag, (data_type, amount))`` entries. Amounts
            of entries sharing tag and data type are added up.
    """
    total = 0
    log_dict = {}
    for entry in log_data:
        tag, detail = entry[0], entry[1]
        data_type, amount = detail[0], detail[1]
        per_tag = log_dict.setdefault(tag, {})
        if data_type in per_tag:
            per_tag[data_type] += amount
        else:
            per_tag[data_type] = amount
        total += amount

    logger = global_data.FILES["logger"]
    now_str = datetime.now().strftime("%d-%m-%Y %H:%M:%S")
    if check_existence(logger, "file"):
        # Extend the existing log with this run's entry.
        data = get_data(logger)
        data[now_str] = log_dict
    else:
        data = {now_str: log_dict}
    dump_data(logger, data)
    print(f"Successfully logged {total} entries!!!!")
def id_writer(file_path, new_data, tag, status):
    """Add ids for ``tag`` to the JSON id file at ``file_path``.

    Args:
        file_path: JSON file mapping hashtags to lists of ids.
        new_data: Ids to add.
        tag: Hashtag the ids belong to.
        status: True when ``file_path`` already exists and should be extended.

    Returns:
        tuple: ``(tag, number_of_ids_added)``.
    """
    count = len(new_data)
    # Default content, used when there is no readable previous content.
    merged = {tag: new_data}
    if status:
        try:
            existing = get_data(file_path)
        except json.decoder.JSONDecodeError:
            # Existing file without valid JSON (e.g. empty): start afresh.
            pass
        else:
            if tag in existing:
                existing[tag] += new_data
            else:
                existing[tag] = new_data
            merged = existing
    dump_data(file_path, merged)
    print(f"SUCCESS - {count} entries added to {file_path}!!!")
    return (tag, count)
def post_writer(file_path, new_data, status):
    """Append posts to the JSON post file at ``file_path``.

    Args:
        file_path: JSON file holding a list of posts.
        new_data: Posts to append.
        status: True when ``file_path`` already exists and should be extended.
    """
    count = len(new_data)
    # Default content, used when there is no readable previous content.
    combined = new_data
    if status:
        try:
            existing = get_data(file_path)
        except json.decoder.JSONDecodeError:
            # Existing file without valid JSON (e.g. empty): start afresh.
            pass
        else:
            existing += new_data
            combined = existing
    dump_data(file_path, combined)
    print(f"SUCCESS - {count} entries added to {file_path}!!!")
def delete_file(file_path, file_type):
    """Delete a file or an empty directory, reporting the outcome.

    Args:
        file_path: Path to delete.
        file_type: ``"file"`` (removed with ``os.remove``) or ``"dir"``
            (removed with ``os.rmdir``).
    """
    removers = {"file": os.remove, "dir": os.rmdir}
    if not check_existence(file_path, file_type):
        print(f"ERROR: Attempt to delete failed. {file_path} does not exist!!!")
        return
    remover = removers.get(file_type)
    if remover is None:
        print(f"ERROR: {file_type} needs to be either 'file' or 'dir' !!!")
        return
    remover(file_path)
    print(f"Successfully deleted {file_path}!!!")
def clean_video_files(settings, tag, new_data=None):
    """Move new videos out of the scraper's folder, then remove that folder.

    Args:
        settings: Download settings providing ``"data"``, ``"videos_to"``
            (destination folder) and ``"videos_delete"`` (folder to remove).
            ``settings["videos_from"]`` is overwritten for every moved file.
        tag: Hashtag the videos belong to.
        new_data: Ids (file names without ``.mp4``) of the videos to keep.
            When empty or ``None`` the folder is only removed.
    """
    if new_data:
        for file in new_data:
            settings["videos_from"] = settings['data'] + f"/{tag}/videos/#{tag}/{file}.mp4"
            # Argument lists instead of shell strings: the paths are passed
            # verbatim, so spaces or shell metacharacters cannot break or
            # alter the command.
            subprocess.call(["mv", settings["videos_from"], settings["videos_to"]])
    subprocess.call(["rm", "-rf", settings["videos_delete"]])
    print(f"Successfully deleted the folder {settings['videos_delete']} folder of videos.")

View File

@@ -0,0 +1,39 @@
# Directories
DATA = "../data"
IDS, LOG, POSTS, VIDEOS = "ids", "log", "posts", "videos"
IMAGES = f"{DATA}/img"

# Files
POST_IDS, VIDEO_IDS = "post_ids.json", "video_ids.json"
DATA_FILE, LOG_FILE = "data.json", "log.json"

# Lookup table of the path components and full paths defined above.
FILES = dict(
    data=DATA,
    ids=IDS,
    log=LOG,
    posts=POSTS,
    videos=VIDEOS,
    images=IMAGES,
    post_ids=f"{DATA}/{IDS}/{POST_IDS}",
    video_ids=f"{DATA}/{IDS}/{VIDEO_IDS}",
    data_file=DATA_FILE,
    downloads=[],
    logger=f"{DATA}/{LOG}/{LOG_FILE}",
)

# Commands
tag = ""

PARAMETERS = dict(
    scraper_attempts=3,
    # number_of_videos=3,  # Number of videos to be downloaded by tiktok-scraper.
    sleep=8,
)

View File

@@ -0,0 +1,7 @@
# Sample hashtag list: replace the entries below with your own hashtags.
hashtag_list = [
    "london",
    "paris",
    "newyork",
    "tokyo",
]

View File

@@ -0,0 +1,213 @@
import os, sys
import time
import json
import argparse, importlib
import global_data
import file_methods
import data_methods
# Command-line prefix for invoking post_downloader.py with python3.
# NOTE(review): not referenced anywhere in the visible code - confirm it is still needed.
command = "python3 post_downloader.py "
def get_hashtag_list(file_name):
    """Import a module and return its ``hashtag_list`` variable.

    Args:
        file_name: Name of an importable module defining ``hashtag_list``.
            A trailing ``.py`` is accepted and ignored.

    Returns:
        The ``hashtag_list`` value of the module (also printed).

    Exits:
        With status 1, after printing a hint, when the module cannot be
        imported or does not define ``hashtag_list``.
    """
    # import_module expects a module name, not a file name.
    module_name = file_name[:-3] if file_name.endswith(".py") else file_name
    hint = f"Please provide at least one hashtag either by entering as an argument or by adding hashtags to the variable hashtag_list in the file {file_name}"
    try:
        f = importlib.import_module(module_name)
        hashtags = f.hashtag_list
    except ImportError as error:
        print("ImportError: " + str(error))
        print(hint)
        sys.exit(1)
    except AttributeError as error:
        # The module exists but does not define hashtag_list.
        print("AttributeError: " + str(error))
        print(hint)
        sys.exit(1)
    print(hashtags)
    return hashtags
def create_parser():
    """Build the command-line parser of the downloader.

    Returns:
        argparse.ArgumentParser: Parser with ``--h`` (list of hashtags),
        ``-f`` (file name with the hashtag list), ``-p`` (download posts)
        and ``-v`` (download videos).
    """
    parser = argparse.ArgumentParser(description="Download the tiktoks for the requested hashtags")

    # Options that take string values.
    value_options = (
        ("--h", {"nargs": "*", "help": "List of hashtags"}),
        ("-f", {"help": "File name with the list of hashtags"}),
    )
    for flag, extra in value_options:
        parser.add_argument(flag, type=str, **extra)

    # Plain on/off switches.
    for flag, text in (("-p", "Download posts"), ("-v", "Download videos")):
        parser.add_argument(flag, action="store_true", help=text)
    return parser
def set_download_settings(download_data_type):
    """Assemble the settings dict for the requested kind of download.

    The ``<data>/<ids>`` and ``<data>/<log>`` directories are created when
    missing, before the download type is validated.

    Args:
        download_data_type: ``"posts"``, ``"videos"`` or ``"posts-videos"``.

    Returns:
        dict: Common settings plus the post and/or video entries copied from
        ``global_data.FILES``. Any other type prints an error and exits.
    """
    files = global_data.FILES
    parameters = global_data.PARAMETERS
    settings = {
        "data": files["data"],
        "ids": files["ids"],
        "log": files["log"],
        "logger": files["logger"],
        "sleep": parameters["sleep"],
        "scraper": parameters["scraper_attempts"],
    }
    for folder in ("ids", "log"):
        file_methods.check_file(f"{settings['data']}/{settings[folder]}", "dir")

    post_keys = ("posts", "post_ids", "data_file")
    video_keys = ("videos", "video_ids")
    if download_data_type == "posts":
        wanted = post_keys
    elif download_data_type == "videos":
        wanted = video_keys
    elif download_data_type == "posts-videos":
        wanted = post_keys + video_keys
    else:
        print(f"ERROR: The download_data_type must be either posts, videos or posts-videos.")
        sys.exit()

    for key in wanted:
        settings[key] = files[key]
    return settings
def get_posts(settings, tag):
    """Download the posts of ``tag`` and store the ones not seen before.

    Args:
        settings: Download settings for posts.
        tag: Hashtag to process.

    Returns:
        The log entry returned when the new post ids are stored, or an empty
        tuple when nothing was downloaded or nothing was new.
    """
    file_path = file_methods.download_posts(settings, tag)
    if not file_path:
        return ()
    log = ()
    new_data = data_methods.extract_posts(settings, file_path, tag)
    if new_data:
        new_ids, new_posts = new_data[0], new_data[1]
        data_file = os.path.join(settings["data"], tag, settings["posts"], settings["data_file"])
        data_methods.update_posts(data_file, "file", new_posts)
        log = data_methods.update_posts(settings["post_ids"], "file", new_ids, tag)
    # The downloaded file is removed once it has been processed.
    file_methods.delete_file(file_path, "file")
    return log
def get_videos(settings, tag):
    """Download the videos of ``tag`` and record the ones not seen before.

    Args:
        settings: Download settings for videos.
        tag: Hashtag to process.

    Returns:
        The log entry returned when new video ids are stored, or an empty
        tuple when nothing was downloaded or nothing was new.
    """
    download_list = file_methods.download_videos(settings, tag)
    if not download_list:
        return ()
    new_data = data_methods.extract_videos(settings, tag, download_list)
    if not new_data:
        # Nothing new: just clean up what was downloaded.
        file_methods.clean_video_files(settings, tag)
        return ()
    return data_methods.update_videos(settings, new_data, tag)
def get_data(hashtags, download_data_type):
    """Download posts and/or videos for every hashtag and collect log entries.

    Args:
        hashtags: Sequence of hashtags to process.
        download_data_type: ``"posts"``, ``"videos"`` or ``"posts-videos"``.
            Any other value prints an error and exits.

    Returns:
        list: One ``(tag, (data_type, amount))`` entry per request that
        stored new data.
    """
    # Each request: label used in logs/messages, settings key of its id
    # file, and the function performing the download.
    post_request = ("posts", "post_ids", get_posts)
    video_request = ("videos", "video_ids", get_videos)
    if download_data_type == "posts":
        requests = [post_request]
    elif download_data_type == "videos":
        requests = [video_request]
    elif download_data_type == "posts-videos":
        requests = [post_request, video_request]
    else:
        print(f"ERROR: The download_data_type must be either posts, videos or posts-videos.")
        sys.exit()

    settings = set_download_settings(download_data_type)
    wants_posts = post_request in requests
    wants_videos = video_request in requests
    last_tag_index = len(hashtags) - 1
    last_request_index = len(requests) - 1
    log_data = []

    for tag_index, tag in enumerate(hashtags):
        # Prepare the per-hashtag folders and files the downloads rely on.
        if wants_posts:
            posts_dir = os.path.join(settings["data"], tag, settings["posts"])
            file_methods.check_file(posts_dir, "dir")
            file_methods.check_file(os.path.join(posts_dir, settings["data_file"]), "file")
        if wants_videos:
            file_methods.check_file(os.path.join(settings["data"], tag, settings["videos"]), "dir")
            settings["videos_delete"] = settings['data'] + f"/{tag}/videos/#{tag}"
            settings["videos_to"] = settings['data'] + f"/{tag}/videos"

        for request_index, (data_type, ids_key, fetch) in enumerate(requests):
            res = fetch(settings, tag)
            if res:
                log_data.append((res[0], (data_type, res[1])))
            data_methods.print_total(settings[ids_key], tag, data_type)
            # Pause between the requests of one hashtag, not after the last.
            if request_index < last_request_index:
                time.sleep(settings["sleep"])

        # Pause between hashtags. The previous check compared the already
        # incremented counter against len - 1, which skipped the pause
        # before the last hashtag.
        if tag_index < last_tag_index:
            time.sleep(settings["sleep"])
    return log_data
def get_hashtags(file_name, hashtag_list):
    """Return ``hashtag_list`` imported from the ``hashtag_list`` module.

    NOTE(review): the module name is hard-coded; ``file_name`` is only used
    in the error message, and the ``hashtag_list`` argument is replaced by
    the imported value. Confirm whether ``file_name`` was meant to select
    the module.

    Args:
        file_name: Name shown in the error message when the import fails.
        hashtag_list: Not used; rebound by the import below.

    Returns:
        The ``hashtag_list`` object defined in the ``hashtag_list`` module.

    Raises:
        Whatever the import raises; it is re-raised after the message.
    """
    try:
        from hashtag_list import hashtag_list
        return hashtag_list
    except:
        # Report which file was requested, then let the error propagate.
        print(f"ERROR: something went wrong while reading the file {file_name}!")
        raise
if __name__ == "__main__":
    parser = create_parser()
    args = parser.parse_args()

    # parser.error() prints the message and terminates the program itself.
    if not args.h and not args.f:
        parser.error("No hashtags were given, please use either --h option or -f to provide hashtags.")
    if not args.p and not args.v:
        parser.error("No argument given, please specify either -p for posts or -v videos or both.")

    # Hashtags given on the command line take precedence over a file.
    if args.h:
        hashtags = args.h
    else:
        file_name = args.f
        hashtags = get_hashtag_list(file_name)
        print(hashtags)

    if not hashtags:
        print("No hashtags were given, please use either --h option or -f to provide hashtags.")
        sys.exit(0)

    # At least one of -p / -v is set at this point.
    if args.p:
        download_data_type = "posts-videos" if args.v else "posts"
    else:
        download_data_type = "videos"

    log_data = get_data(hashtags, download_data_type)
    if log_data:
        file_methods.log_writer(log_data)