DEV Community

Cover image for Automating Daily arXiv Paper Summaries with Slack Notifications
M Sea Bass
M Sea Bass

Posted on

Automating Daily arXiv Paper Summaries with Slack Notifications

This post is a follow-up to the previous article. It turns out that newly submitted papers only show up in the arXiv API after a slight delay. Because of this delay, a paper that has already been reported can sometimes be picked up again on the next day's run.

To fix this, we’re going to record the publication timestamp of the most recent paper we retrieved and, on each daily run, keep only the papers published after that timestamp.

We’ll store the timestamp of the latest paper in Amazon S3 so we can both update and retrieve it later. For this, you’ll need to install boto3. In the python folder we created previously, run:

pip install boto3 -t python
Enter fullscreen mode Exit fullscreen mode

Next, zip the folder again and upload it as a new version of your Lambda layer:

zip -r ./upload.zip ./python/*
Enter fullscreen mode Exit fullscreen mode

Then, update your Lambda function to use this new layer version.

You’ll also need an S3 bucket ready in advance. In this example, we simply created one with the default settings.

Below is the fully revised code, in English, including the new functions that save the timestamp to S3 and load it back. Note that the bucket name is supplied through the S3_BUCKET_NAME environment variable.

import datetime
import json
import logging
import os
import time

import arxiv
import boto3
import google.generativeai as genai
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError

# List of arXiv categories to search for.
# get_papers() joins these into a single "cat:X OR cat:Y ..." query.
PAPER_TYPE_LIST = ["cs.AI", "cs.CY", "cs.MA"]

# NOTE(review): not referenced by the code visible in this file (the arxiv
# client library is used instead) -- confirm before relying on it.
ARXIV_API_URL = "http://export.arxiv.org/api/query"
# API key passed to genai.configure() in generate_summary().
# Read at import time: a missing variable raises KeyError when the module loads.
GEMINI_API_KEY = os.environ["GEMINI_API_KEY"]
# Name of the generative model used for summarization.
GEMINI_MODEL = "gemini-2.0-flash"

# Slack bot token (xoxb-...)
# NOTE(review): SLACK_TOKEN and SLACK_BOT_TOKEN hold the same value; only
# SLACK_BOT_TOKEN is used by the code visible in this file.
SLACK_TOKEN = os.environ["SLACK_BOT_TOKEN"]
SLACK_BOT_TOKEN = os.environ["SLACK_BOT_TOKEN"]
# ID of the Slack channel to post to (e.g., "C12345...")
SLACK_CHANNEL = os.environ["SLACK_CHANNEL"]

# Name of the S3 bucket to use (stores the timestamp JSON object)
S3_BUCKET_NAME = os.environ["S3_BUCKET_NAME"]

# Number of results to fetch at once (passed to get_papers() by main())
MAX_RESULTS = 50

# If the timestamp file does not exist, fallback to (current UTC - NO_EXIST_DAYS)
NO_EXIST_DAYS = 3


def save_utc_datetime(utc_datetime, filename="utc_date.json"):
    """
    Persist a datetime to S3 as a small JSON document.

    The object is written to the bucket named by S3_BUCKET_NAME under the key
    ``filename`` and has the shape ``{"utc_datetime": "<ISO 8601 string>"}``.
    The datetime that was passed in is returned unchanged.
    """
    target = boto3.resource("s3").Bucket(S3_BUCKET_NAME).Object(filename)
    payload = json.dumps({"utc_datetime": utc_datetime.isoformat()})
    target.put(Body=payload)
    return utc_datetime


def load_datetime_from_file(filename="utc_date.json"):
    """
    Load the saved threshold datetime from an S3 JSON object.

    Reads the object ``filename`` from the bucket named by S3_BUCKET_NAME and
    parses its "utc_datetime" field with ``datetime.fromisoformat``.

    If the object cannot be read or parsed for any reason (it does not exist
    yet, access is denied, the JSON is malformed, ...), the failure is logged
    with its traceback and (current UTC time - NO_EXIST_DAYS) is returned.

    Returns:
        A timezone-aware datetime. A stored value that carries no UTC offset
        is interpreted as UTC.
    """
    try:
        s3 = boto3.resource("s3")
        obj = s3.Bucket(S3_BUCKET_NAME).Object(filename)
        body = obj.get()["Body"].read()
        data = json.loads(body.decode("utf-8"))
        loaded = datetime.datetime.fromisoformat(data["utc_datetime"])
    except Exception:
        # Best effort: a missing file on the first run is expected, so fall
        # back instead of failing. Log the cause so that real problems
        # (permissions, corrupt JSON) are not silently mistaken for it.
        logging.getLogger(__name__).warning(
            "Could not load timestamp from S3 object %r; "
            "falling back to %s day(s) before now.",
            filename,
            NO_EXIST_DAYS,
            exc_info=True,
        )
        return datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=NO_EXIST_DAYS)

    # Comparing a naive datetime with an aware one raises TypeError, and the
    # fallback above is aware, so always hand back an aware value.
    if loaded.tzinfo is None:
        loaded = loaded.replace(tzinfo=datetime.timezone.utc)
    return loaded


def get_papers(max_results: int = 10):
    """
    Retrieve new papers from arXiv for the categories in PAPER_TYPE_LIST.

    The newest ``max_results`` submissions are fetched (sorted by submission
    date, newest first) and only those whose ``published`` time is at or after
    the threshold stored in S3 are kept.

    When at least one new paper is found, the threshold in S3 is advanced to
    one second past the newest ``published`` time among the new papers. When
    nothing new is found the stored threshold is left untouched, so a stale
    API response can never move it backwards and cause papers that were
    already reported to be returned again.

    Args:
        max_results: Maximum number of search results to request from arXiv.

    Returns:
        A list of dicts with the keys "title", "summary", "pdf_url" and
        "published"; empty when there are no new papers.
    """
    query = " OR ".join(f"cat:{paper_type}" for paper_type in PAPER_TYPE_LIST)
    client = arxiv.Client()
    search = arxiv.Search(
        query=query,
        max_results=max_results,
        sort_by=arxiv.SortCriterion.SubmittedDate,
        sort_order=arxiv.SortOrder.Descending,
    )
    result_list = list(client.results(search))

    if not result_list:
        return []

    # Load the saved threshold from S3
    threshold = load_datetime_from_file()

    # Keep only papers published at or after the saved threshold
    new_papers = [paper for paper in result_list if paper.published >= threshold]
    if not new_papers:
        # Nothing new: do not rewrite (and possibly lower) the threshold.
        return []

    # Take the newest timestamp from the papers themselves rather than
    # trusting that the first result is the newest one.
    latest_published = max(paper.published for paper in new_papers)

    # Convert to a simpler data structure
    filtered_results = [
        {
            "title": paper.title,
            "summary": paper.summary,
            "pdf_url": paper.pdf_url,
            "published": paper.published,
        }
        for paper in new_papers
    ]

    # Save the new threshold (slightly advanced by 1 second to avoid duplicates)
    save_utc_datetime(latest_published + datetime.timedelta(seconds=1))

    return filtered_results


def generate_summary(abstract_text):
    """
    Ask the configured Gemini model for a short summary of a paper abstract.

    The abstract is appended to a fixed instruction prompt, the prompt is sent
    to the model named by GEMINI_MODEL, and the response text is returned with
    surrounding whitespace stripped.
    """
    genai.configure(api_key=GEMINI_API_KEY)
    summarizer = genai.GenerativeModel(GEMINI_MODEL)

    # Fixed instructions; the abstract follows after a "---" separator.
    instructions = (
        "Please provide a concise summary (in English, within 50 words) of the following paper abstract "
        "that even beginners can understand. Be sure to include the significance and results of the paper. "
        "Output only the summary text.\n\n"
    )
    request_text = f"{instructions}---\n\n{abstract_text}"

    return summarizer.generate_content(request_text).text.strip()


def send_slack_message(papers):
    """
    Format the summarized papers and post them to Slack as one message.

    Args:
        papers: Iterable of dicts with the keys "title", "summary",
            "published" and the PDF link under either "pdf_url" or
            "pdf_link".

    Each paper becomes a numbered entry (title, summary, PDF link, published
    time); entries are separated by a divider line. If there are no papers,
    "No new papers were found." is posted instead. The message goes to
    SLACK_CHANNEL using SLACK_BOT_TOKEN. A SlackApiError is logged and not
    re-raised.
    """
    logger = logging.getLogger(__name__)

    all_messages = []
    for i, paper in enumerate(papers, 1):
        title = paper["title"]
        summary_text = paper["summary"]
        # Accept the link under either "pdf_url" (the key get_papers produces)
        # or "pdf_link", so that a caller using either name does not trigger
        # a KeyError here.
        link = paper.get("pdf_url", paper.get("pdf_link"))
        published = paper["published"]
        all_messages.append(
            f"{i}: *{title}*\n\n"
            f"{summary_text}\n\n"
            f"PDF: {link}\n"
            f"Published: {published}"
        )

    # Separate entries with a divider; fall back to a notice when empty
    all_message = "\n\n────────────────────────\n\n".join(all_messages)
    if not all_message:
        all_message = "No new papers were found."

    client = WebClient(token=SLACK_BOT_TOKEN)
    try:
        # Post to Slack
        result = client.chat_postMessage(channel=SLACK_CHANNEL, text=all_message)
        logger.info(result)
    except SlackApiError as e:
        logger.error("Error posting message: %s", e)


def main():
    """
    Main entry point:
    1. Get papers from arXiv.
    2. Summarize each paper.
    3. Send summaries to Slack.

    Each dict passed to send_slack_message() keeps the keys produced by
    get_papers() ("title", "summary", "pdf_url", "published"), with the
    abstract under "summary" replaced by the generated summary.
    """
    papers = get_papers(MAX_RESULTS)
    output_papers = []
    for paper in papers:
        summary = generate_summary(paper["summary"])
        output_papers.append({
            "title": paper["title"],
            "summary": summary,
            # Must be "pdf_url": that is the key send_slack_message() reads.
            "pdf_url": paper["pdf_url"],
            "published": paper["published"],
        })
        # Small delay to avoid hitting rate limits
        time.sleep(1)
    send_slack_message(output_papers)


def lambda_handler(event, context):
    """
    AWS Lambda entry point: run the pipeline once and report success.

    ``event`` and ``context`` are accepted but not used. An exception raised
    by main() propagates to the caller.
    """
    main()
    response = dict(
        statusCode=200,
        body="Successfully sent message to Slack!",
    )
    return response
Enter fullscreen mode Exit fullscreen mode

By saving the timestamp in S3, your script won’t process the same paper entries each day, and if no new papers appear, the script will skip generating summaries. This helps reduce unnecessary API usage and costs.

Billboard image

Deploy and scale your apps on AWS and GCP with a world class developer experience

Coherence makes it easy to set up and maintain cloud infrastructure. Harness the extensibility, compliance and cost efficiency of the cloud.

Learn more

Top comments (0)

A Workflow Copilot. Tailored to You.

Pieces.app image

Our desktop app, with its intelligent copilot, streamlines coding by generating snippets, extracting code from screenshots, and accelerating problem-solving.

Read the docs

👋 Kindness is contagious

Please leave a ❤️ or a friendly comment on this post if you found it helpful!

Okay