diff --git a/config.py b/config.py
deleted file mode 100644
index 8fc6bff2..00000000
--- a/config.py
+++ /dev/null
@@ -1,2 +0,0 @@
-EMAIL = "your-email@domain.com"
-PASSWORD = "your-password"
diff --git a/data/README.md b/data/README.md
deleted file mode 100644
index 27476ca6..00000000
--- a/data/README.md
+++ /dev/null
@@ -1,3 +0,0 @@
-This directory will be used to store `.json` files for each writer
-containing metadata that is used to populate a `.html` file for that
-author.
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index c58926a7..af704d1a 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,7 +1,6 @@
 bs4==0.0.1
 html2text==2020.1.16
 requests==2.31.0
-selenium==4.16.0
+selenium-driverless
 tqdm==4.66.1
-webdriver_manager==4.0.1
 Markdown==3.6
diff --git a/src/substack2markdown/__init__.py b/src/substack2markdown/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/author_template.html b/src/substack2markdown/assets/author_template.html
similarity index 100%
rename from author_template.html
rename to src/substack2markdown/assets/author_template.html
diff --git a/assets/css/essay-styles.css b/src/substack2markdown/assets/css/essay-styles.css
similarity index 100%
rename from assets/css/essay-styles.css
rename to src/substack2markdown/assets/css/essay-styles.css
diff --git a/assets/css/style.css b/src/substack2markdown/assets/css/style.css
similarity index 100%
rename from assets/css/style.css
rename to src/substack2markdown/assets/css/style.css
diff --git a/assets/images/screenshot.png b/src/substack2markdown/assets/images/screenshot.png
similarity index 100%
rename from assets/images/screenshot.png
rename to src/substack2markdown/assets/images/screenshot.png
diff --git a/assets/js/populate-essays.js b/src/substack2markdown/assets/js/populate-essays.js
similarity index 100%
rename from assets/js/populate-essays.js
rename to src/substack2markdown/assets/js/populate-essays.js
diff --git a/src/substack2markdown/substack_scraper.py b/src/substack2markdown/substack_scraper.py
new file mode 100644
index 00000000..d31ca8dd
--- /dev/null
+++ b/src/substack2markdown/substack_scraper.py
@@ -0,0 +1,998 @@
+import argparse
+import json
+import os
+import io
+import re
+import base64
+import hashlib
+import mimetypes
+from pathlib import Path
+from urllib.parse import urlparse, unquote
+from abc import ABC, abstractmethod
+from typing import List, Optional, Tuple
+from time import sleep
+import asyncio
+import atexit
+import signal
+
+import html2text
+import markdown
+import requests
+from bs4 import BeautifulSoup
+from datetime import datetime
+from tqdm import tqdm
+from xml.etree import ElementTree as ET
+
+from selenium_driverless import webdriver
+from selenium_driverless.types.by import By
+
+USE_PREMIUM: bool = True  # Set to True if you want to login to Substack and convert paid for posts
+BASE_SUBSTACK_URL: str = "https://www.thefitzwilliam.com/"  # Substack you want to convert to markdown
+BASE_MD_DIR: str = "substack_md_files"  # Name of the directory we'll save the .md essay files
+BASE_HTML_DIR: str = "substack_html_pages"  # Name of the directory we'll save the .html essay files
+BASE_IMAGE_DIR: str = "substack_images"
+BASE_JSON_DIR: str = "substack_json"
+ASSETS_DIR: str = os.path.dirname(__file__) + "/assets"
+HTML_TEMPLATE: str = "author_template.html"  # HTML template to use for the author page
+JSON_DATA_DIR: str = "data"
+NUM_POSTS_TO_SCRAPE: int = 3  # Set to 0 if you want all posts
+
+
+def count_images_in_markdown(md_content: str) -> int:
+    """Count number of Substack CDN image URLs in markdown content."""
+    # [](https://substackcdn.com/image/fetch/x.png)
+    # regex lookahead: match "...)" but not "...)]" suffix
+    pattern = re.compile(r'\(https://substackcdn\.com/image/fetch/[^\s\)]+\)(?=[^\]]|$)')
+    matches = re.findall(pattern, md_content)
+    return len(matches)
+
+
+def sanitize_image_filename(url: str) -> str:
+    """Create a safe filename from URL or content."""
+    # Extract original filename from CDN URL
+    if "substackcdn.com" in url:
+        # Get the actual image URL after the CDN parameters
+        original_url = unquote(url.split("/https%3A%2F%2F")[1])
+        filename = original_url.split("/")[-1]
+    else:
+        filename = url.split("/")[-1]
+
+    # Remove invalid characters
+    filename = re.sub(r'[<>:"/\\|?*]', '', filename)
+
+    # If filename is too long or empty, create hash-based name
+    if len(filename) > 100 or not filename:
+        hash_object = hashlib.md5(url.encode())
+        ext = mimetypes.guess_extension(requests.head(url).headers.get('content-type', '')) or '.jpg'
+        filename = f"{hash_object.hexdigest()}{ext}"
+
+    return filename
+
+
+def get_post_slug(url: str) -> str:
+    match = re.search(r'/p/([^/]+)', url)
+    return match.group(1) if match else 'unknown_post'
+
+
+def extract_main_part(url: str) -> str:
+    parts = urlparse(url).netloc.split('.')  # Parse the URL to get the netloc, and split on '.'
+    return parts[1] if parts[0] == 'www' else parts[0]  # Return the main part of the domain, while ignoring 'www' if
+    # present
+
+
+def generate_html_file(args, author_name: str) -> None:
+    """
+    Generates a HTML file for the given author.
+    """
+    if not os.path.exists(args.html_directory):
+        os.makedirs(args.html_directory)
+
+    # Read JSON data
+    json_path = os.path.join(JSON_DATA_DIR, f'{author_name}.json')
+    with open(json_path, 'r', encoding='utf-8') as file:
+        essays_data = json.load(file)
+
+    # Convert JSON data to a JSON string for embedding
+    embedded_json_data = json.dumps(essays_data, ensure_ascii=False, indent=4)
+
+    with open(args.author_template, 'r', encoding='utf-8') as file:
+        html_template = file.read()
+
+    # Insert the JSON string into the script tag in the HTML template
+    html_with_data = html_template.replace('', author_name).replace(
+        '',
+        f''
+    )
+    html_with_author = html_with_data.replace('author_name', author_name)
+
+    # Write the modified HTML to a new file
+    html_output_path = os.path.join(args.html_directory, f'{author_name}.html')
+    with open(html_output_path, 'w', encoding='utf-8') as file:
+        file.write(html_with_author)
+
+
+class BaseSubstackScraper(ABC):
+    def __await__(self):
+        return self._async_init().__await__()
+
+    async def __aenter__(self):
+        return await self
+
+    async def __aexit__(self, exc_type, exc, tb):
+        pass
+
+    def __init__(self, args, base_substack_url: str, md_save_dir: str, html_save_dir: str):
+        if not base_substack_url.endswith("/"):
+            base_substack_url += "/"
+        self.args = args
+        self.base_substack_url: str = base_substack_url
+
+        self.writer_name: str = extract_main_part(base_substack_url)
+        md_save_dir: str = f"{md_save_dir}/{self.writer_name}"
+
+        self.md_save_dir: str = md_save_dir
+        self.html_save_dir: str = f"{html_save_dir}/{self.writer_name}"
+
+        self.args.json_directory += f"/{self.writer_name}"
+
+        if not os.path.exists(md_save_dir):
+            os.makedirs(md_save_dir)
+            print(f"Created md directory {md_save_dir}")
+        if not os.path.exists(self.html_save_dir):
+            os.makedirs(self.html_save_dir)
+            print(f"Created html directory {self.html_save_dir}")
+
+        if not self.args.no_images:
+            os.makedirs(self.args.image_directory, exist_ok=True)
+
+        if not self.args.no_json:
+            os.makedirs(self.args.json_directory, exist_ok=True)
+
+        self.keywords: List[str] = ["about", "archive", "podcast"]
+        self.post_urls: List[str] = self.get_all_post_urls()
+
+    async def _async_init(self):
+        self._loop = asyncio.get_running_loop()
+        return self
+
+    def get_all_post_urls(self) -> List[str]:
+        """
+        Attempts to fetch URLs from sitemap.xml, falling back to feed.xml if necessary.
+        """
+        urls = self.fetch_urls_from_sitemap()
+        if not urls:
+            urls = self.fetch_urls_from_feed()
+        return self.filter_urls(urls, self.keywords)
+
+    def fetch_urls_from_sitemap(self) -> List[str]:
+        """
+        Fetches URLs from sitemap.xml.
+        """
+        sitemap_url = f"{self.base_substack_url}sitemap.xml"
+        response = requests.get(sitemap_url)
+
+        if not response.ok:
+            print(f'Error fetching sitemap at {sitemap_url}: {response.status_code}')
+            return []
+
+        root = ET.fromstring(response.content)
+        urls = [element.text for element in root.iter('{http://www.sitemaps.org/schemas/sitemap/0.9}loc')]
+        return urls
+
+    def fetch_urls_from_feed(self) -> List[str]:
+        """
+        Fetches URLs from feed.xml.
+        """
+        print('Falling back to feed.xml. This will only contain up to the 22 most recent posts.')
+        feed_url = f"{self.base_substack_url}feed.xml"
+        response = requests.get(feed_url)
+
+        if not response.ok:
+            print(f'Error fetching feed at {feed_url}: {response.status_code}')
+            return []
+
+        root = ET.fromstring(response.content)
+        urls = []
+        for item in root.findall('.//item'):
+            link = item.find('link')
+            if link is not None and link.text:
+                urls.append(link.text)
+
+        return urls
+
+    @staticmethod
+    def filter_urls(urls: List[str], keywords: List[str]) -> List[str]:
+        """
+        This method filters out URLs that contain certain keywords
+        """
+        return [url for url in urls if all(keyword not in url for keyword in keywords)]
+
+    @staticmethod
+    def html_to_md(html_content: str) -> str:
+        """
+        This method converts HTML to Markdown
+        """
+        if not isinstance(html_content, str):
+            raise ValueError("html_content must be a string")
+        h = html2text.HTML2Text()
+        h.ignore_links = False
+        h.body_width = 0
+        return h.handle(html_content)
+
+    @staticmethod
+    def save_to_file(filepath: str, content: str) -> None:
+        """
+        This method saves content to a file. Can be used to save HTML or Markdown
+        """
+        if not isinstance(filepath, str):
+            raise ValueError("filepath must be a string")
+
+        if not isinstance(content, str):
+            raise ValueError("content must be a string")
+
+        # if os.path.exists(filepath):
+        if False:
+            print(f"File already exists: {filepath}")
+            return
+
+        with open(filepath, 'w', encoding='utf-8') as file:
+            file.write(content)
+
+    @staticmethod
+    def md_to_html(md_content: str) -> str:
+        """
+        This method converts Markdown to HTML
+        """
+        return markdown.markdown(md_content, extensions=['extra'])
+
+
+    def save_to_html_file(self, filepath: str, content: str) -> None:
+        """
+        This method saves HTML content to a file with a link to an external CSS file.
+        """
+        if not isinstance(filepath, str):
+            raise ValueError("filepath must be a string")
+
+        if not isinstance(content, str):
+            raise ValueError("content must be a string")
+
+        # Calculate the relative path from the HTML file to the CSS file
+        html_dir = os.path.dirname(filepath)
+        css_path = os.path.relpath(self.args.assets_dir + "/css/essay-styles.css", html_dir)
+        css_path = css_path.replace("\\", "/")  # Ensure forward slashes for web paths
+
+        html_content = f"""
+
+def render_comment_body(body):
+    body = body.replace("\n", "<br>\n")  # assumption: "<br>" line breaks; the literal markup was lost in extraction
+    # TODO more?
+    return body
+
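+# Render a single comment (author, date, reactions, body) into buf as HTML,
+# then recurse into its child comments.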
+def render_comments_html_inner(comment, buf):
+    assert comment["type"] == "comment", f'unexpected comment type: {comment["type"]!r}'
+    buf.write(f'\n')
+
+    # NOTE user IDs are constant, user handles are variable
+    # when i change my user handle
+    # then other users can use my old user handle
+    if comment["user_id"] is not None:
+        buf.write(f'')
+
+    if comment["name"] is not None:
+        buf.write(comment["name"])  # human-readable username
+    else:
+        # Comment removed
+        buf.write("null")
+
+    if comment["user_id"] is not None:
+        buf.write('\n')
+    else:
+        buf.write('\n')
+
+    other_pub = comment["metadata"].get("author_on_other_pub")
+    if other_pub:
+        # NOTE publication handles are quasi-constant:
+        # when i change my publication handle
+        # then other users cannot use my old publication handle
+        # NOTE "Changing your publication's subdomain
+        # does not automatically set up a redirect from the old subdomain to the new one."
+        buf.write(f'(')
+        buf.write(other_pub["name"])
+        buf.write(')\n')
+
+    buf.write(comment["date"] + '\n')  # "2025-05-17T06:51:39.485Z"
+
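+    # comment["reactions"] maps a reaction emoji to its count, e.g. "❤" -> 123.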
+    for reaction, reaction_count in comment["reactions"].items():
+        if reaction_count == 0: continue
+        buf.write(reaction + str(reaction_count) + '\n')  # "❤123"
+        # buf.write(str(reaction_count) + reaction + '\n')  # "123❤"
+
+    buf.write('\n')
+
+    buf.write('\n')
+    buf.write('\n')
+
+ if comment["body"] is None:
+ # Comment removed
+ status = comment.get("status")
+ if status is None:
+ buf.write('(Comment removed)\n')
+ else:
+ # "moderator_removed", ...
+ buf.write('(status:' + status + ')\n')
+ # TODO comment["bans"]
+ # TODO comment["suppressed"]
+ # TODO comment["user_banned"]
+ # TODO comment["user_banned_for_comment"]
+ else:
+ buf.write(render_comment_body(comment["body"]) + '\n')
+
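+    # Replies: recurse so each child comment is rendered nested under this one.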
+    for child_comment in comment["children"]:
+        buf.write('\n')
+        render_comments_html_inner(child_comment, buf)
+        buf.write('\n')
+
+    buf.write('Comments\n' +
+              '{comments_num} comments\n' +
+              comments_html + '\n' +
+              '