oleg
betterhacker.news: Aggregate news by topics using ChatGPT and FastAPI

Description

In this article, we will build a website that aggregates today's top stories from Hacker News.
To accomplish this, we will use the Hacker News API to fetch the stories. We will then make an OpenAI API request to group the news articles by topic, storing the results in JSON format. The website will be served using FastAPI and the Jinja template engine

Step 1. Get top stories from Hacker News

To see the full code listing, please check the worker.py file in the GitHub repo

First, let's fetch the story IDs as a list of integers:

import requests


def get_topstories(max_stories=30):
    # Get top stories
    topstories = requests.get("https://hacker-news.firebaseio.com/v0/topstories.json")
    if (code := topstories.status_code) != 200:
        raise ValueError(f"topstories status code: {code}")

    topstories_ids = topstories.json()

    # Keep only the first max_stories ids
    return topstories_ids[:max_stories]  # e.g. [3000, 3004, 3051]

As one may note, we limit the number of stories to be analyzed with the max_stories=30 parameter

The tricky part is performing all 30 requests asynchronously. We will use aiohttp and create a helpers.py file with the functions below:

import aiohttp
import asyncio

BATCH_SIZE = 15


async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.json()


async def process_batch(session, urls):
    tasks = []
    for url in urls:
        task = asyncio.ensure_future(fetch_url(session, url))
        tasks.append(task)
    return await asyncio.gather(*tasks)


async def process_urls(urls, batch_size=BATCH_SIZE):
    async with aiohttp.ClientSession() as session:
        batches = [urls[i : i + batch_size] for i in range(0, len(urls), batch_size)]
        results = []
        for batch in batches:
            batch_results = await process_batch(session, batch)
            results.extend(batch_results)
        return results

Now we can pass a list of URLs into process_urls to process all requests using an asynchronous approach.
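As a quick sanity check, the slicing expression used inside process_urls splits any list into chunks of at most batch_size. Here it is shown on plain integers instead of URLs:

```python
# The same batching expression used in process_urls, applied to integers
urls = list(range(7))
batch_size = 3

batches = [urls[i : i + batch_size] for i in range(0, len(urls), batch_size)]
print(batches)  # [[0, 1, 2], [3, 4, 5], [6]]
```

The last batch simply gets whatever is left over, so no request is dropped.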

Let’s prepare URLs using list_of_items:

import asyncio
from typing import List

from helpers import process_urls


def get_items(list_of_items: List[int], batch_size=12):
    # Prepare API requests to get all items
    URL_ITEM = "https://hacker-news.firebaseio.com/v0/item/{}.json"
    urls = [URL_ITEM.format(t_s) for t_s in list_of_items]
    return asyncio.run(process_urls(urls, batch_size))


list_of_items = get_topstories()
results = get_items(list_of_items)

# `results` is now a list of item dicts:
# [{"title": "...", "url": "...", "score": ..., ...},
#  ...]

Next, we will transform the retrieved results into a format that is easy for ChatGPT to parse. We will retain both the "title" and "url" fields, since the URL can provide valuable hints for classifying articles.

results_parsed = [
    f"{el['title']} URL: {el['url']}"
    for el in results
    if el.get("url") is not None
]

# The result will be:
# ["The Password Game URL: https://neal.fun/password-game/",
#  "FreeBSD Jails Containers URL: https://vermaden.wordpress.com/2023/06/28/freebsd-jails-containers/"
# ...]

Step 2. Make OpenAI API requests and process results

First, let's create a function named get_openai_prompt. It takes the parsed articles joined into a single string as input and returns system_message and user_message (we will use chat-optimized models)

from typing import Tuple


def get_openai_prompt(topics: str) -> Tuple[dict, dict]:
    system_message = {
        "role": "system",
        "content": (
            "You are an assistant that can group news articles from hackernews (news.ycombinator.com) into topics"
        ),
    }

    user_message = {
        "role": "user",
        "content": (
            "Group the following news articles into topics\n\n"
            + topics
            + "\n\nUse the following format:\n"
            + "topic_name_1\n- title\turl\n- title\turl\ntopic_name_2\n\ttitle\turl"
        ),
    }

    return system_message, user_message


The next step is to call the OpenAI API, parse the response, and save it as a .json file

import datetime
import json
import re

import openai

topics = "\n\n".join(results_parsed)
s_m, u_m = get_openai_prompt(topics=topics)  # system & user messages

# Get an API key here: https://platform.openai.com/account/api-keys
openai.api_key = "sk-..."  # Never hardcode a real key; read it from an environment variable

# Get response from the model
response = openai.ChatCompletion.create(
    model="gpt-3.5-turbo",
    messages=[s_m, u_m],
    max_tokens=2200,  # You can increase this number if needed
)

# Get a body of the response
res = response["choices"][0]["message"]["content"].split("\n")

# Parse results
# Sometimes the response may be structured in slightly different ways
current_topic = None
dict_ = {}
titles_returned = {}
for l in res:
    if not l.strip():  # Ignore empty strings
        continue

    if not ("http://" in l.lower() or "https://" in l.lower()):
        # If there is no link in the string it means that the string is a "topic"
        current_topic = l
        continue

    # Otherwise current string is a title that contains a link as well
    if current_topic not in dict_:
        dict_[current_topic] = {}

    pattern = r"- (.+?)\s*URL:"
    pattern2 = r"- (.+?)\s*http"
    match = re.search(pattern, l)
    match2 = re.search(pattern2, l)
    if match:
        substring = str(match.group(1))
        titles_returned[substring] = current_topic
    elif match2:
        substring = str(match2.group(1))
        titles_returned[substring] = current_topic
    else:
        print(l)

data = {}
for r in results:
    if "url" not in r or "score" not in r:
        print("Skip")
        continue
    data[r["title"]] = {"url": r["url"], "score": r["score"]}

for k in data:
    if k in titles_returned:
        data[k]["topic"] = titles_returned[k]
        continue

    data[k]["topic"] = "Other"

prefix = datetime.datetime.now().strftime("%Y-%m-%d")
fname = f"data/{prefix}_articles.json"
json.dump(data, open(fname, "w"))
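To make the regex step concrete, here is how the two patterns above extract a title from a sample response line (the line itself is made up for illustration):

```python
import re

# A hypothetical line in the format we asked the model to produce
line = "- The Password Game\tURL: https://neal.fun/password-game/"

pattern = r"- (.+?)\s*URL:"   # matches lines that keep the "URL:" prefix
pattern2 = r"- (.+?)\s*http"  # fallback for lines with a bare link

match = re.search(pattern, line) or re.search(pattern2, line)
print(match.group(1))  # The Password Game
```

The non-greedy `(.+?)` stops at the first whitespace-plus-URL boundary, so the captured group is the clean title.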

The script will generate a JSON similar to the one below:

{
   "A proto-pizza emerges from a fresco on a Pompeii wall":{
      "url":"http://pompeiisites.org/en/comunicati/pompeii-a-still-life-discovered-by-the-new-excavations-of-regio-ix/",
      "score":93,
      "topic":"news"
   },
   "The hidden cost of air quality monitoring":{
      "url":"https://www.airgradient.com/blog/hidden-costs-of-air-quality-monitoring/",
      "score":395,
      "topic":"news"
   },
   "The Password Game":{
      "url":"https://neal.fun/password-game/",
      "score":929,
      "topic":"lifestyle"
   },
   "FreeBSD Jails Containers":{
      "url":"https://vermaden.wordpress.com/2023/06/28/freebsd-jails-containers/",
      "score":164,
      "topic":"technology"
   },
   "What AMD Learned from Its Big Chiplet Push":{
      "url":"https://spectrum.ieee.org/chiplet",
      "score":38,
      "topic":"technology"
   },
   "In deep space, astronomers spot precursor of carbon based life":{
      "url":"https://www.theregister.com/2023/06/27/jwst_carbon_molecule_discovery/",
      "score":39,
      "topic":"Other"
   }
}

We are now ready to use this JSON in our website

Step 3. Website (FastAPI + Jinja Templates)

To see the full code listing, please check the app/app.py file in the GitHub repo

Let's create an app.py file in the app folder

import json
from collections import defaultdict

import uvicorn
from fastapi import FastAPI, Request
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
import glob

app = FastAPI()

app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")


@app.get("/")
def get_articles(request: Request):
    fname = sorted(glob.glob("data/*_articles.json"), reverse=True)[0]
    with open(fname, "r") as json_file:
        articles = json.load(json_file)

    grouped_articles = {}

    for title, article in articles.items():
        topic = article["topic"]
        if topic in grouped_articles:
            grouped_articles[topic][title] = article
        else:
            grouped_articles[topic] = {title: article}

    # Calculate total score for each topic/group
    topic_scores = defaultdict(int)
    for topic, data in articles.items():
        topic_scores[data["topic"]] += data["score"]

    return templates.TemplateResponse(
        "index.html",
        {
            "request": request,
            "articles": grouped_articles,
            "topic_scores": topic_scores,
        },
    )


if __name__ == "__main__":
    uvicorn.run("app:app", host="127.0.0.1", port=5556, reload=True)
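The grouping and scoring logic in get_articles can be exercised in isolation. The sketch below runs a condensed, defaultdict-based equivalent on a hand-made articles dict shaped like the worker's JSON output:

```python
from collections import defaultdict

# Hand-made sample mimicking the worker's JSON output
articles = {
    "The Password Game": {"url": "https://neal.fun/password-game/", "score": 929, "topic": "lifestyle"},
    "FreeBSD Jails Containers": {"url": "https://vermaden.wordpress.com/2023/06/28/freebsd-jails-containers/", "score": 164, "topic": "technology"},
    "What AMD Learned from Its Big Chiplet Push": {"url": "https://spectrum.ieee.org/chiplet", "score": 38, "topic": "technology"},
}

grouped_articles = defaultdict(dict)
topic_scores = defaultdict(int)
for title, article in articles.items():
    grouped_articles[article["topic"]][title] = article
    topic_scores[article["topic"]] += article["score"]

print(dict(topic_scores))  # {'lifestyle': 929, 'technology': 202}
```

Both dicts end up with the same shape the template expects: topics as keys, with articles and summed scores underneath.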

It would be ideal to avoid reading the .json file from the file system on every request and instead keep it in memory, refreshing it occasionally. However, for the sake of simplicity, we have opted for the most basic code that accomplishes the task: we anticipate the website's load will be minimal, well under one request per second (RPS)
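If the load ever grows, a minimal in-memory cache keyed on the file's modification time would avoid re-reading the JSON on every request. This is only a sketch, not part of the repo; load_articles and the _cache dict are hypothetical names:

```python
import json
import os

# Hypothetical module-level cache: one entry is enough for a single data file
_cache = {"fname": None, "mtime": None, "data": None}


def load_articles(fname):
    """Re-read the JSON only when the file on disk has changed."""
    mtime = os.path.getmtime(fname)
    if _cache["fname"] != fname or _cache["mtime"] != mtime:
        with open(fname) as f:
            _cache.update(fname=fname, mtime=mtime, data=json.load(f))
    return _cache["data"]
```

The handler would call load_articles(fname) instead of opening the file directly; unchanged files cost one os.path.getmtime stat per request.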

Now let’s prepare index.html and styles.css files

index.html

<!DOCTYPE html>
<html>
<head>
    <title>betterhacker.news</title>
    <link rel="icon" type="image/x-icon" href="static/favicon.ico">
    <link rel="stylesheet" href="static/styles.css">
    <meta property="og:title" content="betterhacker.news">
    <meta property="og:description" content="It is like hackernews, but better">
    <meta property="og:type" content="website">
    <meta property="og:url" content="https://betterhacker.news">
</head>
<body>
    <div class="container">
        <div class="main-title">betterhacker.news<div class="main-subtitle">Hackernews Top Stories grouped using modern LLMs (ChatGPT)</div></div>

        {% for topic, data in articles.items() %}
            <div class="column">
                <h2 class="topic">{{ topic }} // {{ topic_scores[topic] }} ❤️‍🔥</h2>
                <ul>
                    {% for title, article in data.items() %}
                        <li>
                            <div class="title">
                                <a href="{{ article.url }}">{{ title }}</a>
                                <span class="score"> {{ article.score }} <span class="emoji">❤️</span></span>
                            </div>
                        </li>
                    {% endfor %}
                </ul>
            </div>
        {% endfor %}
    </div>
    <div class="footer">
        Created by <a href="https://olegkhomenko.me" class="footer-link">Oleg Khomenko</a>
    </div>
</body>
</html>

styles.css

body {
    font-family: Arial, sans-serif;
    margin: 0;
    padding: 20px;
}

.container {
    display: flex; justify-content: space-between;
    flex-wrap: wrap; max-width: 1200px;
    margin: 0 auto; background-color: #fff;
    box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1);
    border-radius: 5px; padding: 30px;
}

.column {
    flex-basis: 48%;
}

.topic {
    font-size: 20px; font-weight: bold; margin-top: 20px;
}

ul {
    list-style-type: none; padding: 0; margin: 0;
}

li {
    margin-bottom: 10px;
}

.title {
    display: flex;
    align-items: center;
}

.emoji {
    margin-right: 5px;
    font-size: 0.8em;
}

.score {
    font-size: 0.8em;
    color: gray;
    margin-left: 5px;
}

.main-title {
    text-align: center; font-size: 32px;
    font-weight: bold; margin-bottom: 40px;
}

.main-subtitle {
    text-align: center; font-size: 18px; 
    color: rgba(128, 128, 128, 0.8);
}

a {
    text-decoration: none;
    color: #007bff;
}

a:hover {
    text-decoration: underline;
}

.footer {
    text-align: center; margin-top: 40px; font-size: 14px; color: rgba(0, 0, 0, 0.6);
}

.footer-link {
    color: #007bff;
}

.footer-link:hover {
    text-decoration: underline;
}

@media (max-width: 600px) {
    .column {
        flex-basis: 100%;
    }
}

Step 4. Run and see the result

To run both scripts simultaneously, app.py for the web server and worker.py for interacting with the external APIs, we can use tmux

tmux


tmux allows multiple terminal sessions to be accessed simultaneously in a single window

To run the server, use the following command

uvicorn app.app:app --port 5556

To run the worker, use the command below

while true; do python3 worker.py; ls data/*; sleep 12h; done

Now you can open your favourite browser and check the result at http://localhost:5556, or visit the production version at https://betterhacker.news

(Screenshot of the resulting page)

Thank you for taking the time to read this.
You can find the complete code on GitHub
