mirror of
https://github.com/bellingcat/tiktok-hashtag-analysis.git
synced 2026-06-11 21:08:31 +03:00
Update file_methods.py
This commit is contained in:
@@ -1,7 +1,8 @@
|
||||
import os, json, subprocess
|
||||
from datetime import datetime
|
||||
import global_data
|
||||
import data_methods
|
||||
import shutil
|
||||
#import data_methods
|
||||
|
||||
|
||||
"""
|
||||
@@ -18,7 +19,7 @@ def create_file(name, file_type):
|
||||
elif (file_type == "file"):
|
||||
with open(name, "w"): pass
|
||||
else:
|
||||
print(f"ERROR: either {file_type} or is not well defined.")
|
||||
raise OSError(f"{file_type} has to be a 'dir' or a 'file'!!!")
|
||||
return
|
||||
|
||||
|
||||
@@ -27,15 +28,9 @@ def check_existence(file_path, file_type):
|
||||
Checks the existence of a file or a directory. Returns True if it exists, False otherwise.
|
||||
"""
|
||||
if (file_type == "file"):
|
||||
if os.path.isfile(file_path):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
return os.path.isfile(file_path)
|
||||
elif (file_type == "dir"):
|
||||
if os.path.isdir(file_path):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
return os.path.isdir(file_path)
|
||||
else:
|
||||
raise OSError(f"{file_type} has to be a 'dir' or a 'file'!!!")
|
||||
|
||||
@@ -46,7 +41,7 @@ def check_file(file_path, file_type):
|
||||
"""
|
||||
status = check_existence(file_path, file_type)
|
||||
if not status:
|
||||
create_file(file_path, file_type)
|
||||
create_file(file_path, file_type)
|
||||
|
||||
return
|
||||
|
||||
@@ -60,20 +55,20 @@ def download_posts(settings, tag):
|
||||
path = os.path.join(settings["data"], tag, settings["posts"])
|
||||
os.chdir(path)
|
||||
try:
|
||||
tiktok_command = f"tiktok-scraper hashtag {tag} -t 'json'"
|
||||
tiktok_command = f"tiktok-scraper hashtag {tag} -t 'json'"
|
||||
result = subprocess.run([tiktok_command], capture_output=True, shell=True)
|
||||
if result.stdout:
|
||||
new_file = result.stdout.decode('utf-8').split()[-1]
|
||||
if ("json" in new_file):
|
||||
os.chdir("../../../tiktok_downloader")
|
||||
return new_file
|
||||
return new_file
|
||||
else:
|
||||
print(f"ERROR: Something's wrong with what is returned by tiktok-scraper for the hashtag {tag} - *{new_file}* is not a json file!!!!")
|
||||
print(f"WARNING: Something's wrong with what is returned by tiktok-scraper for the hashtag {tag} - *{new_file}* is not a json file!!!!")
|
||||
os.chdir("../../../tiktok_downloader")
|
||||
return
|
||||
else:
|
||||
os.chdir("../../../tiktok_downloader")
|
||||
print(f"ERROR: No file was downloaded by the tiktok-scraper for the {tag} !!!!")
|
||||
print(f"WARNING: No file was downloaded by the tiktok-scraper for the {tag} !!!!")
|
||||
return
|
||||
except: raise
|
||||
|
||||
@@ -81,35 +76,36 @@ def download_posts(settings, tag):
|
||||
|
||||
def download_videos(settings, tag):
|
||||
"""
|
||||
Runs the tiktok-scraper command to download videos for a given hashtag. Note that all the videos are downloaded that are returned by the tiktok api and as a result, its a time and data consuming process.
|
||||
Runs the tiktok-scraper command to download videos for a given hashtag. Note that every video returned by the TikTok API is downloaded, so it's a time- and data-consuming process.
|
||||
The list of downloaded video ids is constructed and returned if the downloaded folder contains at least 1 video.
|
||||
os.chdir is used to execute shell commands in the right folders and then reused to come back to the original folder of execution of run_downloader script.
|
||||
"""
|
||||
path = os.path.join(settings["data"], tag, settings["videos"])
|
||||
os.chdir(path)
|
||||
try:
|
||||
# tiktok_command = f"tiktok-scraper hashtag {tag} -n {settings['number_of_videos']} -d"
|
||||
tiktok_command = f"tiktok-scraper hashtag {tag} -d"
|
||||
# tiktok_command = f"tiktok-scraper hashtag {tag} -n {settings['number_of_videos']} -d"
|
||||
tiktok_command = f"tiktok-scraper hashtag {tag} -d"
|
||||
result = subprocess.run([tiktok_command], capture_output=True, shell=True)
|
||||
if result.stdout:
|
||||
downloaded_list_tmp = os.listdir(f"./#{tag}")
|
||||
if downloaded_list_tmp:
|
||||
downloaded_list = []
|
||||
for file in downloaded_list_tmp:
|
||||
file = file[0:-4]
|
||||
file = file.split('.')[0]
|
||||
downloaded_list.append(file)
|
||||
|
||||
|
||||
os.chdir("../../../tiktok_downloader")
|
||||
return downloaded_list
|
||||
else:
|
||||
print(f"WARNING: No video files were downloaded for the hashtag {tag}.")
|
||||
os.chdir("../../../tiktok_downloader")
|
||||
subprocess.call(f"rm -rf {settings['videos_delete']}", shell=True)
|
||||
shutil.rmtree(settings['videos_delete'])
|
||||
#subprocess.call(f"rm -rf {settings['videos_delete']}", shell=True)
|
||||
else:
|
||||
os.chdir("../../../tiktok_downloader")
|
||||
print(f"WARNING: Something went wrong with the tiktok-scraper video download for the {tag} !!!!")
|
||||
return
|
||||
|
||||
|
||||
except: raise
|
||||
|
||||
|
||||
@@ -128,7 +124,7 @@ def dump_data(file_path, data):
|
||||
"""
|
||||
with open(file_path, "w") as f:
|
||||
json.dump(data, f)
|
||||
return
|
||||
return
|
||||
|
||||
def log_writer(log_data):
|
||||
"""
|
||||
@@ -178,7 +174,7 @@ def id_writer(file_path, new_data, tag, status):
|
||||
if tag in data:
|
||||
data[tag] += new_data
|
||||
else:
|
||||
data[tag]= new_data
|
||||
data[tag]= new_data
|
||||
dump_data(file_path, data)
|
||||
except json.decoder.JSONDecodeError:
|
||||
data = { tag : new_data }
|
||||
@@ -219,7 +215,7 @@ def delete_file(file_path, file_type):
|
||||
Deletes the directory or the file.
|
||||
"""
|
||||
if not check_existence(file_path, file_type):
|
||||
print(f"ERROR: Attempt to delete failed. {file_path} does not exist!!!")
|
||||
raise Exception(f"ERROR: Attempt to delete failed. {file_path} does not exist!!!")
|
||||
elif (file_type == "file"):
|
||||
os.remove(file_path)
|
||||
print(f"Successfully deleted {file_path}!!!")
|
||||
@@ -229,8 +225,7 @@ def delete_file(file_path, file_type):
|
||||
print(f"Successfully deleted {file_path}!!!")
|
||||
return
|
||||
else:
|
||||
print(f"ERROR: {file_type} needs to be either 'file' or 'dir' !!!")
|
||||
return
|
||||
raise OSError(f"ERROR: {file_type} needs to be either 'file' or 'dir' !!!")
|
||||
|
||||
|
||||
def clean_video_files(settings, tag, new_data=None):
|
||||
@@ -242,9 +237,11 @@ def clean_video_files(settings, tag, new_data=None):
|
||||
if new_data:
|
||||
for file in new_data:
|
||||
settings["videos_from"] = settings['data'] + f"/{tag}/videos/#{tag}/{file}.mp4"
|
||||
subprocess.call(f"mv {settings['videos_from']} {settings['videos_to']}", shell=True)
|
||||
|
||||
subprocess.call(f"rm -rf {settings['videos_delete']}", shell=True)
|
||||
shutil.move(settings['videos_from'], settings['videos_to'])
|
||||
#subprocess.call(f"mv {settings['videos_from']} {settings['videos_to']}", shell=True)
|
||||
|
||||
shutil.rmtree(settings['videos_delete'])
|
||||
#subprocess.call(f"rm -rf {settings['videos_delete']}", shell=True)
|
||||
print(f"Successfully deleted the folder {settings['videos_delete']} folder of videos.")
|
||||
except:
|
||||
raise
|
||||
|
||||
Reference in New Issue
Block a user