From ce75d40f76103916efa6241d8da13ed01f4b8f75 Mon Sep 17 00:00:00 2001 From: Richard Mwewa <74001397+rly0nheart@users.noreply.github.com> Date: Sat, 12 Aug 2023 03:46:23 +0200 Subject: [PATCH] Update __rpst.py Changed post ouput format --- rpst/__rpst.py | 204 ++++++++++++++++++++++++++++++------------------- 1 file changed, 125 insertions(+), 79 deletions(-) diff --git a/rpst/__rpst.py b/rpst/__rpst.py index 7b07523..c6a0092 100644 --- a/rpst/__rpst.py +++ b/rpst/__rpst.py @@ -1,6 +1,9 @@ +import os import json import logging import argparse +from datetime import datetime + import requests from rich.tree import Tree from rich import print as xprint @@ -18,21 +21,25 @@ def convert_timestamp_to_datetime(timestamp: int) -> str: utc_from_timestamp = datetime.utcfromtimestamp(timestamp) datetime_object = utc_from_timestamp.strftime("%d %B %Y, %I:%M:%S%p") return datetime_object - -def write_post_data(post_data: dict, filename: str): + +def write_post_data(post_data: dict, filename: str) -> str: """ Writes post data to a specified JSON file. :param post_data: A dictionary containing post data. :param filename: The name of the file to which post data will be written. + :returns: A string representation of the file path. """ + home_directory = os.path.expanduser("~") + file_path = os.path.join(home_directory, f"{filename}.json") + # Write the data to a JSON file - with open(filename + ".json", 'a') as file: + with open(file_path, "a") as file: file.write(json.dumps(post_data)) - file.write('\n') # write a newline to separate posts - - log.info(f"Post data written to '{file.name}'") + file.write("\n") # write a newline to separate posts + + return file.name def check_updates(version_tag: str): @@ -44,13 +51,14 @@ def check_updates(version_tag: str): """ # Make a GET request to the GitHub API to get the latest release of the project - response = requests.get("https://api.github.com/repos/bellingcat/reddit-post-scraping-tool/releases/latest").json() + response = requests.get( + "https://api.github.com/repos/bellingcat/reddit-post-scraping-tool/releases/latest" + ).json() # Check if the latest release's tag matches the current version tag - if response['tag_name'] != version_tag: - + if response["tag_name"] != version_tag: # If not, convert the release notes from Markdown to HTML - raw_release_notes = response['body'] + raw_release_notes = response["body"] markdown_release_notes = Markdown(raw_release_notes) # Log an info message about the new release @@ -63,58 +71,61 @@ def check_updates(version_tag: str): xprint(markdown_release_notes) -def format_post_data(post: dict, keyword: str, output: bool): +def create_post_branch(post: dict, keyword: str, output: bool, tree: Tree) -> Tree: """ - This function extracts relevant data from a Reddit post and displays it in a tree structure, + This function extracts relevant data from a Reddit post and adds it in a tree branch structure, followed by the post's selftext. :param post: A dictionary containing the data of a Reddit post. :param keyword: The keyword that is used to find posts, in his case gets uses as the filename. :param output: If specified, all found posts will be written to a json file. + :param tree: Tree where the post branch will be added. + :returns: The main tree with added post branches. """ # Define the data to extract from the post post_data = { - 'Author': post['data']['author'], - 'ID': post['data']['id'], - 'Subreddit': post["data"]["subreddit_name_prefixed"], - 'Visibility': post['data']['subreddit_type'], - 'Thumbnail': post["data"]["thumbnail"], - 'NSFW': post['data']['over_18'], - 'Gilded': post['data']['gilded'], - 'Upvotes': post["data"]["ups"], - 'Upvote ratio': post["data"]["upvote_ratio"], - 'Downvotes': post["data"]["downs"], - 'Awards': post["data"]["total_awards_received"], - 'Top award': post['data']['top_awarded_type'], - 'Is crosspostable?': post['data']['is_crosspostable'], - 'Score': post["data"]["score"], - 'Category': post['data']['category'], - 'Domain': post["data"]["domain"], - 'Created on': convert_timestamp_to_datetime(post['data']['created']), - 'Approved at': post['data']['approved_at_utc'], - 'Approved by': post['data']['approved_by'], + # 'Author': post['data']['author'], + "ID": post["data"]["id"], + "Subreddit": post["data"]["subreddit_name_prefixed"], + "Visibility": post["data"]["subreddit_type"], + "Thumbnail": post["data"]["thumbnail"], + "NSFW": post["data"]["over_18"], + "Gilded": post["data"]["gilded"], + "Upvotes": post["data"]["ups"], + "Upvote ratio": post["data"]["upvote_ratio"], + "Downvotes": post["data"]["downs"], + "Awards": post["data"]["total_awards_received"], + "Top award": post["data"]["top_awarded_type"], + "Is crosspostable?": post["data"]["is_crosspostable"], + "Score": post["data"]["score"], + "Category": post["data"]["category"], + "Domain": post["data"]["domain"], + "Posted on": convert_timestamp_to_datetime(post["data"]["created"]), + "Approved at": post["data"]["approved_at_utc"], + "Approved by": post["data"]["approved_by"], } if output: - write_post_data(filename=keyword, post_data=post_data) - # Create a tree structure with the post's title as the root - post_tree = Tree("\n" + post['data']['title']) + output_file = write_post_data(filename=keyword, post_data=post_data) + tree.add( + f":page_facing_up: Post data written/appended to " + f"[italic][link file://{output_file}]{output_file}[/]" + ) - # Add each piece of extracted data as a branch of the tree + # Add the post's branch to the main tree. + post_branch = tree.add(f":scroll: {post['data']['title']}") + + # Add each piece of extracted data as a branch of the post_branch for post_key, post_value in post_data.items(): - post_tree.add(f"{post_key}: {post_value}") + post_branch.add(f"{post_key}: {post_value}", style="dim") + post_branch.add(post["data"]["selftext"], style="italic") - # Print the tree structure - xprint(post_tree) - - # Print the post's selftext - print(post['data']['selftext'] + "\n") + return tree def get_posts(arguments: argparse): """ Scrapes a given subreddit for posts that contain a specified keyword. - The search is limited by the number of posts and timeframe specified. The results are either - printed to the console or saved to a specified file, based on the 'output' argument. + The search is limited by the number of posts and timeframe specified. :param arguments: Namespace object from argparse. @@ -126,8 +137,6 @@ def get_posts(arguments: argparse): - timeframe: The timeframe from which to scrape posts. This could be 'day', 'week', etc. - limit: The maximum number of posts to scrape. - json: If specified, all found posts will be written to a json file. - - Also logs the number of posts in which the keyword was found. """ keyword = arguments.keyword subreddit = arguments.subreddit @@ -136,30 +145,52 @@ def get_posts(arguments: argparse): limit = arguments.limit json_output = arguments.json + # Create main result tree. + main_tree = Tree(f"[bold]{datetime.now()}[/]", guide_style="bold bright_blue") + # Start a new session session = requests.session() # Set the User-Agent to mimic a Safari browser on a Mac - session.headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, ' - 'like Gecko) Version/14.1.1 Safari/605.1.15'} + session.headers = { + "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, " + "like Gecko) Version/14.1.1 Safari/605.1.15" + } # Send a GET request to the specified subreddit and listing, # limiting the response by the specified limit and timeframe - response = session.get(f'https://reddit.com/r/{subreddit}/{listing}' - f'.json?limit={limit}&t={timeframe}').json() + response = session.get( + f"https://reddit.com/r/{subreddit}/{listing}" + f".json?limit={limit}&t={timeframe}" + ).json() # Initialize a counter for the number of posts found that contain the keyword found_posts = 0 # Loop through each post in the response - for post in response['data']['children']: + for post_index, post in enumerate(response["data"]["children"], start=1): # If the keyword is found in the post's selftext or title, increment the counter and process the post - if keyword.lower() in post['data']['selftext'] or keyword.lower() in post['data']['title']: + if ( + keyword.lower() in post["data"]["selftext"] + or keyword.lower() in post["data"]["title"] + ): + # Create a branch for found post(s) and show post index and post author as the title + found_tree = main_tree.add( + f":bust_in_silhouette: #{post_index} by [bold]@{post['data']['author']}[/]" + ) found_posts += 1 - format_post_data(post=post, keyword=keyword, output=json_output) + create_post_branch( + post=post, + keyword=keyword, + output=json_output, + tree=found_tree, + ) # Log the number of posts in which the keyword was found - log.info(f"Keyword ('{keyword}') was found in {found_posts}/{len(response['data']['children'])} " - f"{listing} posts from r/{subreddit}.") + main_tree.add( + f"Keyword ('{keyword}') was found in {found_posts}/{len(response['data']['children'])} " + f"{listing} posts from r/{subreddit}." + ) + xprint(main_tree) def create_parser(): @@ -169,45 +200,60 @@ def create_parser(): :return: A configured argparse.ArgumentParser object ready to parse the command line arguments. """ parser = argparse.ArgumentParser( - description='RPST: Reddit Post Scraping Tool —by Richard Mwewa | https://about.me/rly0nheart', - epilog='Given a subreddit name and a keyword, ' - 'RPST returns all top (by default) posts that contain the specified keyword.' + description="RPST (Reddit Post Scraping Tool) —by Richard Mwewa | https://about.me/rly0nheart", + epilog="Given a subreddit name and a keyword, " + "RPST returns all top (by default) posts that contain the specified keyword.", ) - parser.add_argument('-k', '--keyword', help='The keyword to search for in the posts.', required=True) - parser.add_argument('-s', '--subreddit', help='The subreddit to scrape.', required=True) parser.add_argument( - '-c', '--limit', - help='The maximum number of posts to scrape (1-100). (default: %(default)s)', + "-k", "--keyword", help="The keyword to search for in the posts.", required=True + ) + parser.add_argument( + "-s", "--subreddit", help="The subreddit to scrape.", required=True + ) + parser.add_argument( + "-c", + "--limit", + help="The maximum number of posts to scrape (1-100). (default: %(default)s)", default=10, type=int, - choices=range(1, 101) # This enforces that the limit must be between 1 and 100 inclusive. + choices=range( + 1, 101 + ), # This enforces that the limit must be between 1 and 100 inclusive. ) parser.add_argument( - '-l', '--listing', - default='top', - const='top', - nargs='?', - choices=['controversial', 'hot', 'best', 'new', 'rising'], - help='The type of posts to scrape (default: %(default)s)' + "-l", + "--listing", + default="top", + const="top", + nargs="?", + choices=["controversial", "hot", "best", "new", "rising"], + help="The type of posts to scrape (default: %(default)s)", ) parser.add_argument( - '-t', '--timeframe', - default='all', - const='all', - nargs='?', - choices=['hour', 'day', 'week', 'month', 'year', 'all'], - help='The timeframe from which to scrape posts (default: %(default)s)' + "-t", + "--timeframe", + default="all", + const="all", + nargs="?", + choices=["hour", "day", "week", "month", "year", "all"], + help="The timeframe from which to scrape posts (default: %(default)s)", ) parser.add_argument( - '-j', '--json', - help='Write all found posts to a json file.', - action='store_true' + "-j", + "--json", + help="Write all found posts to a json file.", + action="store_true", ) return parser -logging.basicConfig(level="NOTSET", format="%(message)s", - handlers=[RichHandler(markup=True, log_time_format='[%H:%M:%S%p]')]) +logging.basicConfig( + level="NOTSET", + format="%(message)s", + handlers=[ + RichHandler(markup=True, log_time_format="[%H:%M:%S%p]", show_level=False) + ], +) log = logging.getLogger("rich")