DEV Community

Cover image for LGTM Devlog 21: Deploying Pub/Sub-triggered Python Google Cloud Functions
Yuan Gao
Yuan Gao

Posted on

LGTM Devlog 21: Deploying Pub/Sub-triggered Python Google Cloud Functions

With the quest storage structure adequately defined, it's now time to hook up the game creation endpoint, which is part of a trio of cloud functions involved in handling new users:

  1. GitHub webhook listener - listens for webhook activity on GitHub. This function is specific to GitHub. And triggers a new game if it receives one
  2. Auth flow - a function triggered by the frontend when a user logs in/authenticates, and should attach any games to their account if not already
  3. Create new game - a function that will create new game data and kick off the first quest. This is triggered by the webhook listener

Since we need a function to trigger another function internally, we can use Google Cloud's pub/sub triggers, which are internal. Though we could also use authenticated HTTP functions, we can switch this later if pub/sub turns out to be the wrong choice. The code matching this post can be found at commit c9074d0

Payload validation

As usual, I will make use of pydantic to perform payload validation, with a model defined, but with a twist: pubsub data comes over base64-encoded, so I've sub-classed pydantic's BaseModel to include a couple of class methods that deal with base64 data, and even specifically consuming the event structure that comes to us from the pub/sub trigger runtime

from typing import Optional
from base64 import b64decode
from pydantic import BaseModel, Field  # pylint: disable=no-name-in-module

# pylint: disable=too-few-public-methods,missing-class-docstring
class BaseModelWithPubSub(BaseModel):
    """ Extra decode functions for pubsub data """

    @classmethod
    def from_base64(cls, data: bytes):
        return cls.parse_raw(b64decode(data).decode("utf-8"))

    @classmethod
    def from_event(cls, event: dict):
        return cls.from_base64(event["data"])


class NewGameData(BaseModelWithPubSub):
    """ Create a new game """

    source: str = Field(..., title="User source, e.g. 'github'")
    userId: str = Field(..., title="User ID")
    userUid: Optional[str] = Field(None, title="Auth UID if known")
    forkUrl: str = Field(..., title="URL of the LGTM fork")
Enter fullscreen mode Exit fullscreen mode

Endpoint

So that leaves the endpoint looking like this, it does the following:

  1. Decode the event payload
  2. Create a game structure in Firestore. Right now it only has the user-identification data, we'll add to it as we build game logic
  3. Create the start quest as well, using the fancy new Quest object we created to generate the data for a new game, OR having it load any existing one and save a new version. The anticipation here is if we have to deal with any quest version updates, we can let the quest object deal with it if we need to.
def create_new_game(event: dict, context: Context):
    """ Create a new game """
    logger.info("Got create new game request", payload=event)

    # decode event
    try:
        new_game_data = NewGameData.from_event(event)
    except ValidationError as err:
        logger.error("Validation error", err=err)
        raise err

    logger.info("Resolved data", new_game_data=new_game_data)

    # create game if doesn't exist
    game_id = create_game_id(new_game_data.source, new_game_data.userId)
    game_ref = db.collection("game").document(game_id)
    game = game_ref.get()
    if game.exists:
        logger.info("Game already exists", game_id=game_id)
        game_ref.set(
            {
                **new_game_data.dict(),
            },
            merge=True,
        )
    else:
        logger.info("Creating new game", game_id=game_id)
        game_ref.set(
            {
                **new_game_data.dict(),
                "joined": firestore.SERVER_TIMESTAMP,
            }
        )

    # create starting quest if not exist
    FirstQuest = get_quest_by_name(FIRST_QUEST_NAME)
    quest_obj = FirstQuest()

    quest_id = create_quest_id(game_id, FIRST_QUEST_NAME)
    quest_ref = db.collection("quest").document(quest_id)

    quest = quest_ref.get()
    if quest.exists:
        logger.info("Quest already exists, updating", quest_id=quest_id)

        try:
            quest_obj.load(quest.to_dict())
        except QuestLoadError as err:
            logger.error("Could not load", err=err)
            raise err

    quest_ref.set(quest_obj.get_save_data())
Enter fullscreen mode Exit fullscreen mode

Tests

The text fixture for triggering a pub/sub payload as a little more complex, as it needs to contain a json structure. Interestingly, the functions-framework appears to still use an the test-client with a Post() request, though I don't know exactly what's going on under the hood - it's not well documented and I'm figuring things out from their own unit tests.

@pytest.fixture(scope="package")
def new_game_post():
    """ Test client for newgame"""
    client = create_app("create_new_game", FUNCTION_SOURCE, "event").test_client() 

    return lambda data: client.post(
        "/",
        json={
            "context": {
                "eventId": "some-eventId",
                "timestamp": "some-timestamp",
                "eventType": "some-eventType",
                "resource": "some-resource",
            },
            "data": {"data": b64encode(json.dumps(data).encode()).decode()},
        },
    )
Enter fullscreen mode Exit fullscreen mode

The test (I guess an integration test) looks like this:


@pytest.fixture
def new_game_data(firestore_client):
    uid = "test_user_" + "".join(
        [random.choice(string.ascii_letters) for _ in range(6)]
    )

    # create user data
    yield NewGameData(
        source=SOURCE,
        userId=uid,
        userUid=uid,
        forkUrl="test_url",
    ).dict()

    # cleanup
    game_id = create_game_id(SOURCE, uid)
    quest_id = create_quest_id(game_id, FIRST_QUEST_NAME)
    firestore_client.collection("game").document(game_id).delete()
    firestore_client.collection("quest").document(quest_id).delete()


def test_fail_quest(firestore_client, new_game_post, new_game_data):
    """ Test situation where quest creation fails """

    # check game and quest does not exist
    game_id = create_game_id(SOURCE, new_game_data["userId"])
    quest_id = create_quest_id(game_id, FIRST_QUEST_NAME)

    firestore_client.collection("quest").document(quest_id).set({"_version": "999.9.9"})

    # create!
    res = new_game_post(new_game_data)
    assert res.status_code == 500


def test_game_creation(firestore_client, new_game_post, new_game_data):
    """ Test successful game creation flow """

    # check game and quest does not exist
    game_id = create_game_id(SOURCE, new_game_data["userId"])
    quest_id = create_quest_id(game_id, FIRST_QUEST_NAME)

    game = firestore_client.collection("game").document(game_id).get()
    assert not game.exists
    quest = firestore_client.collection("quest").document(quest_id).get()
    assert not quest.exists

    # create!
    res = new_game_post(new_game_data)
    assert res.status_code == 200

    # check if game actually created, and that it contains data
    game = firestore_client.collection("game").document(game_id).get()
    assert game.exists
    game_dict = game.to_dict()
    assert game_dict.items() >= new_game_data.items()

    # check if quest was created and that it contains data
    quest = firestore_client.collection("quest").document(quest_id).get()
    assert quest.exists

    # try create again, sohuld still work, but be idempotent
    res = new_game_post(new_game_data)
    assert res.status_code == 200

Enter fullscreen mode Exit fullscreen mode

It's a little unclear, I'm beginning to feel that I need to significantly modularize and refactor this codebase at this point, as we now have game logic strewn across endpoints, a lot of duplicate boilercode. I have some ways in mind of how to deal with it, and will leave it for later.

Top comments (0)