From bc10b3020ecf8e682cd2f32f76269c84aad1aa02 Mon Sep 17 00:00:00 2001
From: Richard Mwewa <74001397+rly0nheart@users.noreply.github.com>
Date: Sat, 5 Aug 2023 23:47:01 +0200
Subject: [PATCH] Update and rename reddit_post_scraping_tool.py to __rpst_.py

Refactored and added doc strings to code
---
 .../reddit_post_scraping_tool.py              |  91 ----------
 rpst/__rpst_.py                               | 165 ++++++++++++++++++
 2 files changed, 165 insertions(+), 91 deletions(-)
 delete mode 100644 reddit_post_scraping_tool/reddit_post_scraping_tool.py
 create mode 100644 rpst/__rpst_.py

diff --git a/reddit_post_scraping_tool/reddit_post_scraping_tool.py b/reddit_post_scraping_tool/reddit_post_scraping_tool.py
deleted file mode 100644
index 9b656e9..0000000
--- a/reddit_post_scraping_tool/reddit_post_scraping_tool.py
+++ /dev/null
@@ -1,91 +0,0 @@
-import logging
-import argparse
-import requests
-from rich.tree import Tree
-from datetime import datetime
-from rich import print as xprint
-from rich.markdown import Markdown
-from rich.logging import RichHandler
-
-
-start_time = datetime.now()
-logging.basicConfig(level="NOTSET", format="%(message)s", handlers=[RichHandler(markup=True, log_time_format='[%H:%M:%S%p]')])
-log = logging.getLogger("rich")
-
-
-# Check if the remote tag_name from the latest release matches the one in the program
-# if it does, it means the program is up-to-date.
-# If it doesn't match, notify the user about a new release
-def check_updates(version_tag):
-    response = requests.get("https://api.github.com/repos/bellingcat/reddit-post-scraping-tool/releases/latest").json()
-    if response['tag_name'] == version_tag:
-        pass
-    else:
-        raw_release_notes = response['body']
-        markdown_release_notes = Markdown(raw_release_notes)
-        log.info(f"A new release of reddit-post-scraping-tool is available ({response['tag_name']}). Run 'pip install --upgrade reddit-post-scraping-tool' to get the updates.")
-        xprint(markdown_release_notes)
-
-
-# Getting posts
-def get_posts(post):
-    post_data = {'Author': post['data']['author'],
-                 'ID': post['data']['id'],
-                 'Subreddit': post["data"]["subreddit_name_prefixed"],
-                 'Visibility': post['data']['subreddit_type'],
-                 # 'Author': post["data"]["author_fullname"],
-                 'Thumbnail': post["data"]["thumbnail"],
-                 # 'Flair': post["data"]["link_flair_text"],
-                 'NSFW': post['data']['over_18'],
-                 'Gilded': post['data']['gilded'],
-                 'Upvotes': post["data"]["ups"],
-                 'Upvote ratio': post["data"]["upvote_ratio"],
-                 'Downvotes': post["data"]["downs"],
-                 'Awards': post["data"]["total_awards_received"],
-                 'Top award': post['data']['top_awarded_type'],
-                 'Is crosspostable?': post['data']['is_crosspostable'],
-                 'Score': post["data"]["score"],
-                 'Category': post['data']['category'],
-                 'Domain': post["data"]["domain"],
-                 'Created': post['data']['created'],
-                 'Approved at': post['data']['approved_at_utc'],
-                 'Approved by': post['data']['approved_by'], }
-
-    post_tree = Tree("\n" + post['data']['title'])
-    for post_key, post_value in post_data.items():
-        post_tree.add(f"{post_key}: {post_value}")
-    xprint(post_tree)
-    print(post['data']['selftext'] + "\n")
-
-
-def reddit_post_scraper():
-    session = requests.session()
-    session.headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15'}
-    response = session.get(f'https://reddit.com/r/{args.subreddit}/{args.listing}.json?limit={args.limit}&t={args.timeframe}').json()
-    found_posts = 0
-    for post in response['data']['children']:
-        if args.keyword.lower() in post['data']['selftext'] or args.keyword.lower() in post['data']['title']:
-            found_posts += 1
-            get_posts(post)
-
-    log.info(f"Keyword ('{args.keyword}') was found in {found_posts}/{len(response['data']['children'])} {args.listing} posts from r/{args.subreddit}.")
-
-
-def create_parser():
-    parser = argparse.ArgumentParser(
-        description=f'reddit-post-scraping-tool — by Richard Mwewa | https://about.me/rly0nheart',
-        epilog=f'Given a subreddit name and a keyword, this program returns all top (by default) posts that contain the specified word. ')
-    parser.add_argument('-k', '--keyword', help='kewyword', required=True)
-    parser.add_argument('-s', '--subreddit', help='subreddit', required=True)
-    parser.add_argument('-c', '--limit', help='results limit (1-100) (default: %(default)s)', default=10, type=int)
-    parser.add_argument('-l', '--listing', default='top', const='top', nargs='?',
-                        choices=['controversial', 'hot', 'best', 'new', 'rising'],
-                        help='listings: controversial, hot, best, new, rising (default: %(default)s)')
-    parser.add_argument('-t', '--timeframe', default='all', const='all', nargs='?',
-                        choices=['hour', 'day', 'week', 'month', 'year'],
-                        help='timeframe: hour, day, week, month, year (default: %(default)s)')
-    return parser
-
-
-_parser = create_parser()
-args = _parser.parse_args()
diff --git a/rpst/__rpst_.py b/rpst/__rpst_.py
new file mode 100644
index 0000000..5e2bef1
--- /dev/null
+++ b/rpst/__rpst_.py
@@ -0,0 +1,165 @@
+import logging
+import argparse
+import requests
+from rich.tree import Tree
+from rich import print as xprint
+from rich.markdown import Markdown
+from rich.logging import RichHandler
+
+
+def check_updates(version_tag: str):
+    """
+    Checks if there's a new release of the project on GitHub. If the latest release tag
+    differs from `version_tag`, it logs an info message and prints the release notes.
+
+    :param version_tag: A string representing the current version of the project.
+    """
+
+    # Make a GET request to the GitHub API to get the latest release of the project
+    response = requests.get("https://api.github.com/repos/bellingcat/reddit-post-scraping-tool/releases/latest").json()
+
+    # Check if the latest release's tag matches the current version tag
+    if response['tag_name'] != version_tag:
+
+        # If not, wrap the raw release notes in a rich Markdown renderable for terminal output
+        raw_release_notes = response['body']
+        markdown_release_notes = Markdown(raw_release_notes)
+
+        # Log an info message about the new release
+        log.info(
+            f"A new release of RPST is available ({response['tag_name']}). "
+            f"Run 'pip install --upgrade reddit-post-scraping-tool' to get the updates."
+        )
+
+        # Print the release notes
+        xprint(markdown_release_notes)
+
+
+def get_post_data(post: dict):
+    """
+    Extracts relevant data from a Reddit post and displays it in a tree structure,
+    followed by the post's selftext.
+
+    :param post: A dictionary containing the data of a Reddit post (fields are read from post['data']).
+    """
+    # Define the data to extract from the post
+    post_data = {
+        'Author': post['data']['author'],
+        'ID': post['data']['id'],
+        'Subreddit': post["data"]["subreddit_name_prefixed"],
+        'Visibility': post['data']['subreddit_type'],
+        'Thumbnail': post["data"]["thumbnail"],
+        'NSFW': post['data']['over_18'],
+        'Gilded': post['data']['gilded'],
+        'Upvotes': post["data"]["ups"],
+        'Upvote ratio': post["data"]["upvote_ratio"],
+        'Downvotes': post["data"]["downs"],
+        'Awards': post["data"]["total_awards_received"],
+        'Top award': post['data']['top_awarded_type'],
+        'Is crosspostable?': post['data']['is_crosspostable'],
+        'Score': post["data"]["score"],
+        'Category': post['data']['category'],
+        'Domain': post["data"]["domain"],
+        'Created': post['data']['created'],
+        'Approved at': post['data']['approved_at_utc'],
+        'Approved by': post['data']['approved_by'],
+    }
+
+    # Create a tree structure with the post's title as the root
+    post_tree = Tree("\n" + post['data']['title'])
+
+    # Add each piece of extracted data as a branch of the tree
+    for post_key, post_value in post_data.items():
+        post_tree.add(f"{post_key}: {post_value}")
+
+    # Print the tree structure
+    xprint(post_tree)
+
+    # Print the post's selftext
+    print(post['data']['selftext'] + "\n")
+
+
+def start_scraper(keyword: str, subreddit: str, listing: str, timeframe: str, limit: int):
+    """
+    Scrapes a given subreddit for posts that contain a specified keyword (case-insensitive).
+    The search is limited by the number of posts and timeframe specified.
+
+    :param keyword: The keyword to search for in the posts.
+    :param subreddit: The subreddit to scrape.
+    :param listing: The type of posts to scrape. This could be 'hot', 'new', etc.
+    :param timeframe: The timeframe from which to scrape posts. This could be 'day', 'week', etc.
+    :param limit: The maximum number of posts to scrape.
+
+    This function logs the number of posts in which the keyword was found.
+    """
+
+    # Start a new session
+    session = requests.session()
+    # Set the User-Agent to mimic a Safari browser on a Mac
+    session.headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, '
+                                     'like Gecko) Version/14.1.1 Safari/605.1.15'}
+
+    # Send a GET request to the specified subreddit and listing,
+    # limiting the response by the specified limit and timeframe
+    response = session.get(f'https://reddit.com/r/{subreddit}/{listing}.json?limit={limit}&t={timeframe}').json()
+
+    # Initialize a counter for the number of posts found that contain the keyword
+    found_posts = 0
+
+    # Loop through each post in the response
+    for post in response['data']['children']:
+        # Lower-case both sides so the match is case-insensitive; on a hit, count and display the post
+        if keyword.lower() in post['data']['selftext'].lower() or keyword.lower() in post['data']['title'].lower():
+            found_posts += 1
+            get_post_data(post=post)
+
+    # Log the number of posts in which the keyword was found
+    log.info(f"Keyword ('{keyword}') was found in {found_posts}/{len(response['data']['children'])} "
+             f"{listing} posts from r/{subreddit}.")
+
+
+def create_parser():
+    """
+    Creates and configures an argument parser for the command line arguments.
+
+    :return: A configured argparse.ArgumentParser object ready to parse the command line arguments.
+    """
+    parser = argparse.ArgumentParser(
+        description='RPST: Reddit Post Scraping Tool —by Richard Mwewa | https://about.me/rly0nheart',
+        epilog='Given a subreddit name and a keyword, '
+               'RPST returns all top (by default) posts that contain the specified keyword.'
+    )
+
+    parser.add_argument('-k', '--keyword', help='The keyword to search for in the posts.', required=True)
+    parser.add_argument('-s', '--subreddit', help='The subreddit to scrape.', required=True)
+    parser.add_argument(
+        '-c', '--limit',
+        help='The maximum number of posts to scrape (1-100). (default: %(default)s)',
+        default=10,
+        metavar='N',  # Without a metavar, usage/help would list all 100 choices.
+        type=int,
+        choices=range(1, 101)  # This enforces that the limit must be between 1 and 100 inclusive.
+    )
+    parser.add_argument(
+        '-l', '--listing',
+        default='top',
+        const='top',
+        nargs='?',
+        choices=['controversial', 'hot', 'best', 'new', 'rising', 'top'],  # 'top' must be listed: const is validated
+        help='The type of posts to scrape (default: %(default)s)'
+    )
+    parser.add_argument(
+        '-t', '--timeframe',
+        default='all',
+        const='all',
+        nargs='?',
+        choices=['hour', 'day', 'week', 'month', 'year', 'all'],
+        help='The timeframe from which to scrape posts (default: %(default)s)'
+    )
+
+    return parser
+
+
+logging.basicConfig(level="NOTSET", format="%(message)s",
+                    handlers=[RichHandler(markup=True, log_time_format='[%H:%M:%S%p]')])
+log = logging.getLogger("rich")