Files
reddit-post-scraping-tool/rpst/__rpst.py
Richard Mwewa ce75d40f76 Update __rpst.py
Changed post ouput format
2023-08-12 03:46:23 +02:00

260 lines
9.5 KiB
Python

import os
import json
import logging
import argparse
from datetime import datetime
import requests
from rich.tree import Tree
from rich import print as xprint
from rich.markdown import Markdown
from rich.logging import RichHandler
def convert_timestamp_to_datetime(timestamp: int) -> str:
"""
Converts a Unix timestamp to a formatted datetime string.
:param timestamp: The Unix timestamp to be converted.
:return: A formatted datetime string in the format "dd MMMM yyyy, hh:mm:ssAM/PM".
"""
utc_from_timestamp = datetime.utcfromtimestamp(timestamp)
datetime_object = utc_from_timestamp.strftime("%d %B %Y, %I:%M:%S%p")
return datetime_object
def write_post_data(post_data: dict, filename: str) -> str:
"""
Writes post data to a specified JSON file.
:param post_data: A dictionary containing post data.
:param filename: The name of the file to which post data will be written.
:returns: A string representation of the file path.
"""
home_directory = os.path.expanduser("~")
file_path = os.path.join(home_directory, f"{filename}.json")
# Write the data to a JSON file
with open(file_path, "a") as file:
file.write(json.dumps(post_data))
file.write("\n") # write a newline to separate posts
return file.name
def check_updates(version_tag: str):
"""
This function checks if there's a new release of a project on GitHub. If there is, it logs an
information message and prints the release notes.
:param version_tag: A string representing the current version of the project.
"""
# Make a GET request to the GitHub API to get the latest release of the project
response = requests.get(
"https://api.github.com/repos/bellingcat/reddit-post-scraping-tool/releases/latest"
).json()
# Check if the latest release's tag matches the current version tag
if response["tag_name"] != version_tag:
# If not, convert the release notes from Markdown to HTML
raw_release_notes = response["body"]
markdown_release_notes = Markdown(raw_release_notes)
# Log an info message about the new release
log.info(
f"A new release of RPST is available ({response['tag_name']}). "
f"Run 'pip install --upgrade reddit-post-scraping-tool' to get the updates."
)
# Print the release notes
xprint(markdown_release_notes)
def create_post_branch(post: dict, keyword: str, output: bool, tree: Tree) -> Tree:
"""
This function extracts relevant data from a Reddit post and adds it in a tree branch structure,
followed by the post's selftext.
:param post: A dictionary containing the data of a Reddit post.
:param keyword: The keyword that is used to find posts, in his case gets uses as the filename.
:param output: If specified, all found posts will be written to a json file.
:param tree: Tree where the post branch will be added.
:returns: The main tree with added post branches.
"""
# Define the data to extract from the post
post_data = {
# 'Author': post['data']['author'],
"ID": post["data"]["id"],
"Subreddit": post["data"]["subreddit_name_prefixed"],
"Visibility": post["data"]["subreddit_type"],
"Thumbnail": post["data"]["thumbnail"],
"NSFW": post["data"]["over_18"],
"Gilded": post["data"]["gilded"],
"Upvotes": post["data"]["ups"],
"Upvote ratio": post["data"]["upvote_ratio"],
"Downvotes": post["data"]["downs"],
"Awards": post["data"]["total_awards_received"],
"Top award": post["data"]["top_awarded_type"],
"Is crosspostable?": post["data"]["is_crosspostable"],
"Score": post["data"]["score"],
"Category": post["data"]["category"],
"Domain": post["data"]["domain"],
"Posted on": convert_timestamp_to_datetime(post["data"]["created"]),
"Approved at": post["data"]["approved_at_utc"],
"Approved by": post["data"]["approved_by"],
}
if output:
output_file = write_post_data(filename=keyword, post_data=post_data)
tree.add(
f":page_facing_up: Post data written/appended to "
f"[italic][link file://{output_file}]{output_file}[/]"
)
# Add the post's branch to the main tree.
post_branch = tree.add(f":scroll: {post['data']['title']}")
# Add each piece of extracted data as a branch of the post_branch
for post_key, post_value in post_data.items():
post_branch.add(f"{post_key}: {post_value}", style="dim")
post_branch.add(post["data"]["selftext"], style="italic")
return tree
def get_posts(arguments: argparse):
"""
Scrapes a given subreddit for posts that contain a specified keyword.
The search is limited by the number of posts and timeframe specified.
:param arguments: Namespace object from argparse.
Expected Object Attributes
--------------------------
- keyword: The keyword to search for in the posts.
- subreddit: The subreddit to scrape.
- listing: The type of posts to scrape. This could be 'hot', 'new', etc.
- timeframe: The timeframe from which to scrape posts. This could be 'day', 'week', etc.
- limit: The maximum number of posts to scrape.
- json: If specified, all found posts will be written to a json file.
"""
keyword = arguments.keyword
subreddit = arguments.subreddit
listing = arguments.listing
timeframe = arguments.timeframe
limit = arguments.limit
json_output = arguments.json
# Create main result tree.
main_tree = Tree(f"[bold]{datetime.now()}[/]", guide_style="bold bright_blue")
# Start a new session
session = requests.session()
# Set the User-Agent to mimic a Safari browser on a Mac
session.headers = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, "
"like Gecko) Version/14.1.1 Safari/605.1.15"
}
# Send a GET request to the specified subreddit and listing,
# limiting the response by the specified limit and timeframe
response = session.get(
f"https://reddit.com/r/{subreddit}/{listing}"
f".json?limit={limit}&t={timeframe}"
).json()
# Initialize a counter for the number of posts found that contain the keyword
found_posts = 0
# Loop through each post in the response
for post_index, post in enumerate(response["data"]["children"], start=1):
# If the keyword is found in the post's selftext or title, increment the counter and process the post
if (
keyword.lower() in post["data"]["selftext"]
or keyword.lower() in post["data"]["title"]
):
# Create a branch for found post(s) and show post index and post author as the title
found_tree = main_tree.add(
f":bust_in_silhouette: #{post_index} by [bold]@{post['data']['author']}[/]"
)
found_posts += 1
create_post_branch(
post=post,
keyword=keyword,
output=json_output,
tree=found_tree,
)
# Log the number of posts in which the keyword was found
main_tree.add(
f"Keyword ('{keyword}') was found in {found_posts}/{len(response['data']['children'])} "
f"{listing} posts from r/{subreddit}."
)
xprint(main_tree)
def create_parser():
"""
Creates and configures an argument parser for the command line arguments.
:return: A configured argparse.ArgumentParser object ready to parse the command line arguments.
"""
parser = argparse.ArgumentParser(
description="RPST (Reddit Post Scraping Tool) —by Richard Mwewa | https://about.me/rly0nheart",
epilog="Given a subreddit name and a keyword, "
"RPST returns all top (by default) posts that contain the specified keyword.",
)
parser.add_argument(
"-k", "--keyword", help="The keyword to search for in the posts.", required=True
)
parser.add_argument(
"-s", "--subreddit", help="The subreddit to scrape.", required=True
)
parser.add_argument(
"-c",
"--limit",
help="The maximum number of posts to scrape (1-100). (default: %(default)s)",
default=10,
type=int,
choices=range(
1, 101
), # This enforces that the limit must be between 1 and 100 inclusive.
)
parser.add_argument(
"-l",
"--listing",
default="top",
const="top",
nargs="?",
choices=["controversial", "hot", "best", "new", "rising"],
help="The type of posts to scrape (default: %(default)s)",
)
parser.add_argument(
"-t",
"--timeframe",
default="all",
const="all",
nargs="?",
choices=["hour", "day", "week", "month", "year", "all"],
help="The timeframe from which to scrape posts (default: %(default)s)",
)
parser.add_argument(
"-j",
"--json",
help="Write all found posts to a json file.",
action="store_true",
)
return parser
logging.basicConfig(
level="NOTSET",
format="%(message)s",
handlers=[
RichHandler(markup=True, log_time_format="[%H:%M:%S%p]", show_level=False)
],
)
log = logging.getLogger("rich")