initial commit

This commit is contained in:
Tristan Lee
2023-05-26 05:15:11 -05:00
commit a14b3a8c7a
4 changed files with 402 additions and 0 deletions

146
get_deleted_o9a_articles.py Normal file
View File

@@ -0,0 +1,146 @@
"""Identify all articles from o9a.org that have been captured on Wayback Machine but
have since been deleted from the o9a.org website, and search for articles containing
the term "tempel".
"""
import re
import time
import requests
import pandas as pd
from bs4 import BeautifulSoup
# URL of the Wayback Machine CDX search API, queried page by page for captures
BASE_URL = "http://web.archive.org/cdx/search/cdx"
# If an archived URL contains any of these substrings, ignore it
IGNORE_URL_STRINGS = ["wp-json", "wp-content", "/uploads/", "/page/"]
# Search deleted articles for this string (article content is lower-cased
# before it is stored, so this term should be lower-case too)
SEARCH_TERM = "tempel"
# Save table of information for deleted articles to this file
OUTPUT_CSV = "o9a_deleted_articles.csv"
def get_wayback_url(row):
    """Build the Wayback Machine snapshot URL for one capture record.

    ``row`` is any mapping (e.g. a DataFrame row) that provides the
    ``"timestamp"`` and ``"original"`` entries of a capture.
    """
    capture_time = row["timestamp"]
    source_url = row["original"]
    return f"https://web.archive.org/web/{capture_time}/{source_url}"
def process_archived_url(url):
    """Convert an original archived URL to a standard form.

    The query string is dropped, an explicit ``:80`` port after ``.org`` is
    removed, ``http://`` is upgraded to ``https://`` and leading/trailing
    slashes are stripped.

    Returns:
        The normalized URL, or ``None`` when the URL should be ignored:
        it does not contain ``"/20"``, it contains one of
        ``IGNORE_URL_STRINGS``, it is a ``/YYYY/MM`` index page of
        ``https://www.o9a.org``, or it ends with ``/embed``.
    """
    # Presumably article permalinks carry a /20YY/ date segment; anything
    # without "/20" is skipped.
    if "/20" not in url:
        return None
    normalized = url.split("?")[0]
    normalized = (
        normalized.replace(".org:80/", ".org/")
        .replace("http://", "https://")
        .strip("/")
    )
    if any(ignored in normalized for ignored in IGNORE_URL_STRINGS):
        return None
    # Raw string so that \d is a regex digit class rather than an invalid
    # string escape; dots are escaped so they only match a literal ".".
    if re.match(r"^https://www\.o9a\.org/\d{4}/\d{2}$", normalized):
        return None
    if normalized.endswith("/embed"):
        return None
    return normalized
def _get(url):
    """Fetch ``url`` with a GET request, retrying with exponential backoff.

    Up to five attempts are made. Before attempt ``k`` (counting from 0) the
    function sleeps ``2**k - 1`` seconds, i.e. 0, 1, 3, 7 and 15 seconds.
    An attempt counts as successful only when the status code is 200.

    Returns:
        The ``requests.Response`` of the first attempt that returned 200.

    Raises:
        ValueError: if no attempt succeeded. When the last failure was a
            network-level error, it is attached as the exception's cause.
    """
    last_error = None
    for attempt in range(5):
        time.sleep(2**attempt - 1)
        try:
            response = requests.get(url=url, timeout=15)
        except requests.RequestException as exc:
            # Only request/transport failures are retried; unrelated errors
            # (e.g. programming mistakes) are allowed to propagate.
            last_error = exc
            continue
        if response.status_code == 200:
            return response
    raise ValueError(
        f"Maximum number of retries reached for GET request with url {url}"
    ) from last_error
def process_article(soup):
    """Pull the fields of interest out of a parsed o9a.org article page.

    The text of ``div#content`` is scanned for the ``"Posted: "``,
    ``"| Author: "`` and (optional) ``"| Tags: "`` markers; each field runs
    from its marker up to the next ``" | "`` separator.

    Returns a dict with the keys ``wayback_url`` (href of the canonical link
    element), ``title`` (text of the first ``h1``), ``author``, ``date``,
    ``content`` (lower-cased text), ``links`` (every href inside the content
    div) and ``tags`` (a list, empty when the page has no tags marker).
    """
    body = soup.select_one("div#content")
    text = body.text.strip()

    def field_after(marker):
        # Text between ``marker`` and the following " | " separator
        return text.split(marker)[1].split(" | ")[0]

    tags = field_after("| Tags: ").split(", ") if "| Tags: " in text else []

    return {
        "wayback_url": soup.find("link", {"rel": "canonical"})["href"],
        "title": body.find("h1").text,
        "author": field_after("| Author: "),
        "date": field_after("Posted: "),
        "content": text.lower(),
        "links": [anchor["href"] for anchor in body.find_all("a", href=True)],
        "tags": tags,
    }
if __name__ == "__main__":
    # Get all archived pages from the o9a.org website, store in DataFrame.
    # Results are requested page by page until the API returns nothing.
    capture_list = []
    page = 0
    while True:
        params = {
            "page": page,
            "url": "o9a.org/*",
            "output": "json",
        }
        r = requests.get(url=BASE_URL, params=params, timeout=60)
        # An empty body marks the end of the pages (checked before the status
        # code so that the end-of-results response always stops the loop)
        if r.text.strip() == "":
            break
        r.raise_for_status()
        result = r.json()
        if not result:
            break
        # First row of the JSON payload holds the column names
        capture_list.append(pd.DataFrame(data=result[1:], columns=result[0]))
        page += 1
    if not capture_list:
        raise SystemExit("No captures of o9a.org were returned by the Wayback Machine")
    captures = pd.concat(capture_list)
    # CDX timestamps are 14-digit strings (YYYYMMDDhhmmss)
    captures["datetime"] = pd.to_datetime(
        captures["timestamp"], format="%Y%m%d%H%M%S"
    )
    captures["url"] = captures.apply(get_wayback_url, axis="columns")
    # Keep successful captures only. Sorting by timestamp first means that
    # drop_duplicates(keep="first") below retains the earliest capture per URL.
    captures = (
        captures[captures["statuscode"] == "200"]
        .sort_values("timestamp")
        .reset_index(drop=True)
    )
    captures["processed_url"] = captures["original"].apply(process_archived_url)
    captures = captures.drop_duplicates(subset="processed_url", keep="first")

    # Get all URLs from the o9a.org sitemap, compare with archived URLs to find
    # deleted articles. A failed sitemap request must abort the run: otherwise
    # the sitemap set would be empty and every archived URL would look deleted.
    r = requests.get("https://www.o9a.org/wp-sitemap-posts-post-1.xml", timeout=60)
    r.raise_for_status()
    soup = BeautifulSoup(r.content, features="lxml")
    article_urls_sitemap = {loc.text.strip("/") for loc in soup.select("url loc")}
    article_urls_wayback = set(captures["processed_url"].dropna())
    deleted_urls = article_urls_wayback - article_urls_sitemap
    urls_to_download = captures[captures["processed_url"].isin(deleted_urls)]["url"]

    # Download all deleted pages, process into DataFrame and save to CSV
    article_data = []
    for url in urls_to_download:
        r = _get(url)
        soup = BeautifulSoup(r.content, features="lxml")
        article_data.append(process_article(soup))
    articles = pd.DataFrame(article_data)
    articles["date"] = pd.to_datetime(articles["date"])
    articles = articles.sort_values("date").reset_index(drop=True)
    # quoting=2 is csv.QUOTE_NONNUMERIC
    articles.to_csv(path_or_buf=OUTPUT_CSV, index=False, quoting=2)

    # Search for specified search term in deleted article contents
    # (regex=False: the term is matched literally, not as a pattern)
    relevant_articles = articles[
        articles["content"].str.contains(SEARCH_TERM, na=False, regex=False)
    ][["date", "wayback_url"]]
    for date, url in relevant_articles.values:
        print(date, url)

View File

@@ -0,0 +1,68 @@
"""Estimate the total revenue of a given Ebay seller, and identify their most
frequently reviewed products"""
import requests
from urllib.parse import urlencode, quote
from collections import Counter
from bs4 import BeautifulSoup
import pandas as pd
# URL of eBay's customer feedback API
BASE_URL = "https://feedback.ebay.com/fdbk/update_feedback_profile"
# Username of seller
USERNAME = "commandantcultus"
# Nested dict of parameters in API query: each top-level key becomes one query
# parameter whose value is the URL-encoded inner dict (see the request
# construction in the main section)
PARAMS = {
"url": {
"username": USERNAME,
"filter": "feedback_page:RECEIVED_AS_SELLER",
"limit": "200",
},
"module": {"modules": "FEEDBACK_SUMMARY"},
}
def process_review(review):
    """Extract relevant fields from the raw JSON response for one review.

    Args:
        review: dict for one feedback card. The item summary text is read
            from ``feedbackInfo.item.itemSummary.textSpans[0].text`` and is
            expected to look like ``"<name> (#<item id>)"``; the price is
            read from ``feedbackInfo.item.itemPrice.textSpans[0].text``.

    Returns:
        dict with ``name`` (str), ``id`` (int) and ``price`` (float).

    Raises:
        ValueError: if the summary text has no ``" (#"`` marker, or the id
            or price cannot be parsed as a number.
    """
    item = review["feedbackInfo"]["item"]
    item_text = item["itemSummary"]["textSpans"][0]["text"]
    # Split on the LAST " (#" so item names that contain " (#" themselves
    # do not break the unpacking
    name, item_id = item_text.rsplit(" (#", 1)
    price_text = item["itemPrice"]["textSpans"][0]["text"]
    return {
        "name": name,
        "id": int(item_id.strip(")")),
        # Drop the currency prefix and thousands separators ("US $1,234.56")
        "price": float(price_text.replace("US $", "").replace(",", "")),
    }
if __name__ == "__main__":
    # Fetch data from eBay API, convert into DataFrame.
    # Each top-level PARAMS entry becomes one query parameter whose value is
    # the URL-encoded, then percent-quoted, inner dict.
    params_str = "&".join(f"{k}={quote(urlencode(v))}" for k, v in PARAMS.items())
    r = requests.get(url=BASE_URL, params=params_str, timeout=30)
    r.raise_for_status()
    feedback_view = r.json()["modules"]["FEEDBACK_SUMMARY"]["feedbackView"]
    reviews = pd.DataFrame(
        [process_review(card) for card in feedback_view["feedbackCards"]]
    )

    # Fetch total number of sales (should be 581 as of May 2023)
    # NOTE(review): the count is read from the second stats tile of the seller
    # card and is used below as the number of reviews; confirm which statistic
    # that tile actually holds.
    r = requests.get(f"https://www.ebay.com/usr/{USERNAME}", timeout=30)
    r.raise_for_status()
    soup = BeautifulSoup(r.content, features="lxml")
    stat_tiles = soup.select("div.str-seller-card__stats-content > div[title]")
    # The title starts with the count; strip thousands separators before parsing
    total_reviews = int(stat_tiles[1]["title"].split(" ")[0].replace(",", ""))

    # Estimate seller's total revenue: mean price of the fetched reviews
    # multiplied by the total count
    estimated_revenue = reviews["price"].mean() * total_reviews
    print(f"Estimated revenue of seller: ${estimated_revenue:.2f}")

    # Identify 5 most frequently reviewed items
    print("Most reviewed items:")
    print(Counter(reviews["name"]).most_common(5))

60
get_in_stock_stores.py Normal file
View File

@@ -0,0 +1,60 @@
"""Find all Barnes & Noble stores with a specified product in-stock
"""
from bs4 import BeautifulSoup
import requests
# Base URL for Barnes & Noble product availability API
BASE_URL = "https://www.barnesandnoble.com/xhr/storeList-with-prodAvailability.jsp"
# Stock-keeping unit number for specific book
SKU_ID = 9780692306581
# Zip code to center search on
ZIP_CODE = 75254
# Radius of search (presumably in miles). Barnes & Noble's browser interface
# only allows a maximum of 100 miles; a larger value is sent to the API
# directly here.
SEARCH_RADIUS = 1000
# Fixed user-agent string sent with the API request
HEADERS = {
"User-Agent": "Mozilla/5.0 (Linux; Android 8.0.0; 5099D Build/O00623) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.68 Mobile Safari/537.36"
}
def process_store(store_soup):
    """Summarise one store entry from the availability response.

    Returns a dict with ``store`` (stripped text of the store address
    element) and ``in_stock``: ``True`` / ``False`` for the two recognised
    status strings, otherwise the raw status text unchanged.
    """
    status_text = store_soup.select_one("div.item-in-stock").text.strip()
    # Recognised status strings; anything else is passed through as-is
    known_statuses = {
        "In Stock in Store": True,
        "Not in Stock in Store": False,
    }
    address_element = store_soup.select_one(
        "div.store-details-container > div.store-address"
    )
    return {
        "store": address_element.text.strip(),
        "in_stock": known_statuses.get(status_text, status_text),
    }
if __name__ == "__main__":
    # Initialize query parameters
    params = {
        "action": "fromSearch",
        "radius": SEARCH_RADIUS,
        "searchString": ZIP_CODE,
        "skuId": SKU_ID,
    }

    # Make API query to determine which (if any) stores have the book in-stock.
    # Fail loudly on an HTTP error instead of silently printing nothing.
    r = requests.get(url=BASE_URL, params=params, headers=HEADERS, timeout=30)
    r.raise_for_status()
    soup = BeautifulSoup(r.text, features="lxml")
    stores = soup.select("div.store-list")
    results = [process_store(store) for store in stores]

    # Print addresses of stores with book in-stock. An unrecognised status
    # string is truthy, so such stores are printed as well.
    for result in results:
        if result["in_stock"]:
            print(result["store"])

128
get_ingram_o9a_books.py Normal file
View File

@@ -0,0 +1,128 @@
"""Get information about a list of in-stock O9A books from Ingram's catalog.
This provides the same information as is available using Ingram's Stock Check app
https://www.ingramcontent.com/retailers/independent-bookstores/stock-check-app"""
import requests
import pandas as pd
# Base URL of Ingram API; endpoint paths such as "/register", "/search" and
# "/stockcheck" are appended to it
BASE_URL = "https://ipage.ingramcontent.com/ipage/ws/1/mobile"
# List of EAN (European Article Number) codes for books associated with O9A;
# each code is used as a search keyword
EANS = [
"9780692306581",
"9780997836363",
"9780692575505",
"9781494440954",
"9780692260845",
"9780692548127",
"9780692723920",
"9780999768006",
"9781696821742",
"9781687255624",
"9781689931953",
"9780997836370",
"9780999768044",
"9780999768020",
"9780997836387",
"9780997836356",
"9780997836349",
"9780997836325",
"9780997836301",
"9780997836318",
"9780692667293",
"9780692510711",
"9780692484463",
"9780692432082",
]
# Columns in API response to store; all other keys of a result are dropped
RELEVANT_COLUMNS = [
"primaryContributorName",
"isbn",
"title",
"primaryProductType",
"displayableFormat",
"sortableTitle",
"ean",
"primaryBisacCategory",
"publisher",
"retailPrice",
"totalOnHand",
]
# Write information about each book to this file
OUTPUT_CSV = "o9a_books.csv"
class IngramClient:
    """Class to search Ingram's free (mobile) API.

    A token is requested once on construction and sent with every later
    query.
    """

    # Seconds to wait for any single HTTP response; without a timeout a
    # stalled connection would block the script forever
    TIMEOUT = 30

    def __init__(self):
        self.token = self.get_token()

    def get_token(self):
        """Initialize access token, which is necessary for all API queries.

        Returns:
            The ``"token"`` value of the JSON registration response.

        Raises:
            requests.HTTPError: if the registration request is rejected.
        """
        params = {
            "email_address": "email@address.com",  # email address doesn't seem to matter
            "terms_accepted": True,
        }
        r = requests.post(
            url=BASE_URL + "/register", json=params, timeout=self.TIMEOUT
        )
        r.raise_for_status()
        return r.json()["token"]

    def search(self, keywords):
        """Search Ingram's book catalog for a given keyword or EAN.

        Pages are requested starting at page 1 until a response has no
        (or empty) ``"results"``. Every result dict is updated in place
        with the stock-check response for its ``"ean"``.

        Returns:
            List of result dicts across all pages.
        """
        all_results = []
        page_number = 1
        while True:
            params = {
                "keywords": keywords,
                "token": self.token,
                "page_number": page_number,
            }
            r = requests.get(
                url=BASE_URL + "/search", params=params, timeout=self.TIMEOUT
            )
            if not (results := r.json().get("results")):
                break
            for result in results:
                result.update(self.get_stock(ean=result["ean"]))
            all_results.extend(results)
            page_number += 1
        return all_results

    def get_stock(self, ean):
        """Query how many copies of a given Ingram product are in stock.

        Returns:
            The decoded JSON body of the stock-check response.
        """
        params = {"product_code": ean, "token": self.token}
        r = requests.get(
            url=BASE_URL + "/stockcheck", params=params, timeout=self.TIMEOUT
        )
        return r.json()
def process_book(book):
    """Reduce one enriched search result to the fields that get stored.

    Keeps the entries of ``book`` whose key is listed in
    ``RELEVANT_COLUMNS``, adds ``contributors`` (the ``displayName`` of every
    contributor, joined with ``", "``) and ``totalOnOrder`` (the sum of the
    ``count`` of every value in ``book["onOrder"]``).
    """
    processed = {}
    for key, value in book.items():
        if key in RELEVANT_COLUMNS:
            processed[key] = value

    display_names = [person["displayName"] for person in book["contributors"]]
    processed["contributors"] = ", ".join(display_names)

    on_order_total = 0
    for entry in book["onOrder"].values():
        on_order_total += entry["count"]
    processed["totalOnOrder"] = on_order_total
    return processed
if __name__ == "__main__":
    # Look up every listed EAN through a single client and flatten the hits
    ingram = IngramClient()
    found_books = [book for ean in EANS for book in ingram.search(keywords=ean)]

    # One row per book, reduced to the stored fields
    table = pd.DataFrame(list(map(process_book, found_books)))

    # Persist the table as CSV
    table.to_csv(path_or_buf=OUTPUT_CSV, index=False, quoting=2)