DEV Community

Alex


JSON, Search and Pydantic for Data Serialization with Redis Cloud

Setup

Python Requirements

The following Python packages are required.

~/requirements.txt

pydantic==1.10.2
redis==4.3.4
redis-om==0.0.27

Environment Variables

To get your free Redis Cloud account, use the links at the bottom of this post.

Once you have created your account and logged in, you can find your credentials on your dashboard.

Together, these values make up the connection URI required to connect to your Redis instance.

We will use it later in our Python code.

~/.env

PORT=13832
USERNAME=default
PASSWORD=abc123
URL=redis-332123.c321.us-central1-1.gce.cloud.redislabs.com
DATABASE=0

REDIS_OM_URL=redis://{USERNAME}:{PASSWORD}@{URL}:{PORT}/{DATABASE}
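The `{USERNAME}`-style placeholders in `REDIS_OM_URL` may not be expanded automatically, depending on how the file is loaded, so one option is to assemble the URL in Python. The sketch below assumes the five settings above are available as a mapping (by default `os.environ`); `build_redis_url` is a hypothetical helper written for this post, not part of redis-py or redis-om.

```python
import os
from urllib.parse import quote


def build_redis_url(env=None):
    """Assemble a redis:// URL from the individual settings in ~/.env."""
    env = os.environ if env is None else env
    # Percent-encode credentials so characters like '@' or ':' stay valid in a URL.
    user = quote(env["USERNAME"], safe="")
    password = quote(env["PASSWORD"], safe="")
    return f"redis://{user}:{password}@{env['URL']}:{env['PORT']}/{env['DATABASE']}"


# Example values copied from the .env file above.
example_env = {
    "PORT": "13832",
    "USERNAME": "default",
    "PASSWORD": "abc123",
    "URL": "redis-332123.c321.us-central1-1.gce.cloud.redislabs.com",
    "DATABASE": "0",
}
redis_url = build_redis_url(example_env)
print(redis_url)
```

The resulting string is the value that `REDIS_OM_URL` should hold when the connection is created.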

Enum

Let's write an enum class to keep track of the boards and limit which options our function can take.
We will use it as the argument to the function that calls the 4chan API.

Enum classes in Python have many uses.
In this instance, our enum validates the name of the board we want to get data from.
If we pass a board name that is not a member, the Board[name] lookup raises a KeyError and our function fails immediately.
This is ideal, because we want to know that the board name is wrong before making the GET request to the API.

~/schema.py

from enum import Enum


class Board(str, Enum):
    wg = 'wg'
    v = 'v'
    b = 'b'
    s = 's'
    h = 'h'
    c = 'c'
    e = 'e'
    g = 'g'
    k = 'k'
    o = 'o'
    u = 'u'
    vg = 'vg'
    s4s = 's4s'
    cm = 'cm'
    hm = 'hm'
    lgbt = 'lgbt'
    sci = 'sci'
    wsg = 'wsg'
    adv = 'adv'
    an = 'an'
    out = 'out'
    trv = 'trv'
    sp = 'sp'
    soc = 'soc'
    fit = 'fit'
    biz = 'biz'
    fa = 'fa'
    tg = 'tg'
    w = 'w'
    x = 'x'
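To make the failure mode concrete, here is a minimal sketch using a two-member subset of the enum. It shows that lookup by name (square brackets) raises a KeyError, while lookup by value (call syntax) raises a ValueError. The `is_board` helper is hypothetical and only added for illustration.

```python
from enum import Enum


class Board(str, Enum):
    # A two-member subset of the full enum above, to keep the example short.
    wg = 'wg'
    g = 'g'


def is_board(name: str) -> bool:
    """Return True if `name` is a member name of Board."""
    return name in Board.__members__


# Lookup by member name uses square brackets and raises KeyError on a miss.
print(Board['wg'].value)   # wg

try:
    Board['not_a_board']
except KeyError as err:
    print('KeyError:', err)

# Lookup by value uses call syntax and raises ValueError instead.
try:
    Board('not_a_board')
except ValueError as err:
    print('ValueError:', err)
```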

JSON Serialization

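This is the pydantic model that will validate and convert the JSON data coming from the 4chan API.

"Serialization" is the process of converting an in-memory object into a format that can be stored or transmitted, such as JSON. The reverse process, turning that format back into native objects, is called deserialization.

In this case we deserialize JSON data from the API into Python types, and then serialize it back into JSON before inserting it into Redis.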

~/schema.py

from typing import List

from pydantic import BaseModel, Field


class CatalogBase(BaseModel):
    no: int = Field(None, description="always | The numeric post ID | any positive integer")
    resto: int = Field(None, description="always | For replies: this is the ID of the thread being replied to. For OP: this value is zero | `0` or any positive integer")
    sticky: int = Field(None, description="OP only, if thread is currently stickied | If the thread is being pinned to the top of the page| `1` or not set")
    closed: int = Field(None, description="OP only, if thread is currently closed | If the thread is closed to replies | `1` or not set")
    now: str = Field(None, description="always | MM/DD/YY(Day)HH:MM (:SS on some boards), EST/EDT timezone | string")
    time: int = Field(None, description="always | UNIX timestamp the post was created | UNIX timestamp")
    name: str = Field(None, description="always | Name user posted with. Defaults to Anonymous | any string")
    trip: str = Field(None, description="if post has tripcode | The users tripcode, in format: !tripcode or !!securetripcode| any string")
    id: str = Field(None, description="if post has ID | posters ID | any 8 chars")

    capcode: str = Field(None, description="if post has capcode | The capcode identifier for a post | Not set, mod, admin, admin_highlight, manager, developer, founder")

    country: str = Field(None, description="if country flags are enabled | Poster's ISO 3166-1 alpha-2 country code (https://en.wikipedia.org/wiki/ISO_3166-1_alpha-2) | 2 character string or XX if unknown")
    country_name: str = Field(None, description="if country flags are enabled | Posters country name | Name of any country")

    sub: str = Field(None, description="OP only, if subject was included| OP Subject text | any string")

    com: str = Field(None, description="if comment was included | Comment (HTML escaped) | any HTML escaped string")

    tim: int = Field(None, description="always if post has attachment | Unix timestamp + microtime that an image was uploaded | integer")
    filename: str = Field(None, description="always if post has attachment | Filename as it appeared on the poster's device | any string")
    ext: str = Field(None, description="always if post has attachment | Filetype | jpg, png, gif, pdf, swf, webm")
    fsize: int = Field(None, description="always if post has attachment | Size of uploaded file in bytes | any integer")
    md5: str = Field(None, description="always if post has attachment | 24 character, packed base64 MD5 hash of file")
    w: int = Field(None, description="always if post has attachment | Image width dimension | any integer")
    h: int = Field(None, description="always if post has attachment | Image height dimension | any integer")
    tn_w: int = Field(None, description="always if post has attachment | Thumbnail image width dimension | any integer")
    tn_h: int = Field(None, description="always if post has attachment | Thumbnail image height dimension | any integer")
    filedeleted: int = Field(None, description="if post had attachment and attachment is deleted | If the file was deleted from the post | `1` or not set")
    spoiler: int = Field(None, description="if post has attachment and attachment is spoilered | If the image was spoilered or not | `1` or not set")
    custom_spoiler: int = Field(None, description="if post has attachment and attachment is spoilered | The custom spoiler ID for a spoilered image | `1-10` or not set |")
    omitted_posts: int = Field(None, description="OP only| Number of replies minus the number of previewed replies | `any integer` |")
    omitted_images: int = Field(None, description="OP only| Number of image replies minus the number of previewed image replies | `any integer` |")
    replies: int = Field(None, description="OP only | Total number of replies to a thread | any integer")
    images: int = Field(None, description="OP only | Total number of image replies to a thread | any integer")
    bumplimit: int = Field(None, description="OP only, only if bump limit has been reached | If a thread has reached bumplimit, it will no longer bump | `1` or not set |")
    imagelimit: int = Field(None, description="OP only, only if image limit has been reached | If an image has reached image limit, no more image replies can be made  | `1` or not set |")
    last_modified: int = Field(None, description="OP only | UNIX timestamp marking last time thread was modified (post added/modified/deleted, thread closed/sticky settings modified) | UNIX timestamp")
    tag: str = Field(None, description="OP only, /f/ only | The category of `.swf` upload |`Game`, `Loop`, etc")
    semantic_url: str = Field(None, description="OP only | SEO URL slug for thread | `string` |")
    since4pass: int = Field(None, description="if poster put 'since4pass' in the options field | Year 4chan pass bought | any 4 digit year")
    unique_ips: int = Field(None, description="OP only | Number of unique posters in a thread | any integer")
    m_img: int = Field(None, description="any post that has a mobile-optimized image | Mobile optimized image exists for post | 1 or not set")

class CatalogThread(CatalogBase):
    board: Board
    last_replies: List[CatalogBase] = Field(default_factory=list, description="catalog OP only | JSON representation of the most recent replies to a thread | array of JSON post objects")
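The two directions of the round trip can be sketched with nothing but the standard library. The example below uses plain `json` rather than pydantic, so it skips the validation the model adds, and the payload is a made-up, trimmed-down thread rather than real API output.

```python
import json

# A made-up, trimmed-down thread payload in the shape the model above expects.
raw = '{"no": 123, "resto": 0, "sub": "Desktop thread", "replies": 42}'

# Deserialize: JSON text -> Python dict with native types.
thread = json.loads(raw)
thread["board"] = "wg"

# Serialize: Python dict -> JSON text, ready to be stored as a string.
payload = json.dumps(thread)
print(type(thread["no"]).__name__, payload)
```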

Calling our external API

This function calls the 4chan API.

As you can see, we limit its argument using our enum class and the Board[board] lookup syntax.

For example, if wg were not a member of the Board enum, the lookup would raise a KeyError before anything is passed to requests.get().

The function then calls the external API, converts the JSON data to Python types, and appends each thread to a list.

~/deps.py

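import requests
from typing import List

from schema import Board, CatalogThread


def get_catalog(board: Board) -> List[CatalogThread]:
    # Board[board] raises KeyError for an unknown board; .value is the plain string
    url = f'https://a.4cdn.org/{Board[board].value}/catalog.json'
    data = requests.get(url).json()
    all_posts = []
    for page in data:
        for thread in page['threads']:
            # attach board to thread
            thread['board'] = board
            all_posts.append(CatalogThread(**thread))
    return all_posts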
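The nested loop in get_catalog can be tried without a network call. The sketch below applies the same page-then-thread traversal to a made-up sample; `flatten_catalog` is a hypothetical helper that returns plain dicts instead of CatalogThread models.

```python
from typing import Any, Dict, List


def flatten_catalog(pages: List[Dict[str, Any]], board: str) -> List[Dict[str, Any]]:
    """Collect every thread from every page and tag it with its board."""
    threads = []
    for page in pages:
        for thread in page.get("threads", []):
            # Copy so the caller's data is not mutated.
            threads.append({**thread, "board": board})
    return threads


# Made-up sample in the nested page -> threads shape that get_catalog iterates over.
sample = [
    {"page": 1, "threads": [{"no": 1, "sub": "first"}, {"no": 2}]},
    {"page": 2, "threads": [{"no": 3}]},
]
flat = flatten_catalog(sample, "wg")
print([t["no"] for t in flat])   # [1, 2, 3]
```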

Redis CRUD operations - lpush and lrange

As an introduction, let's first go over how we could perform some CRUD operations without RedisJSON.

Now that we have code to call our external API, we can decide how to insert the data into Redis.

~/db.py

from redis_om import get_redis_connection
from deps import get_catalog
from schema import Board, CatalogThread
import json
import redis
import os
from redis.commands.json.path import Path
from redis.commands.search.query import Query
from redis.commands.search.indexDefinition import IndexDefinition, IndexType
from redis.commands.search.field import (
    NumericField,
    TagField,
    TextField
)
from typing import (
    Tuple, 
    Union, 
    Optional, 
    List
)

r_conn = get_redis_connection()
KEY = Board.wg.value

class Lpush_Lrange(object):

    def create_lpush(self):
        data = get_catalog(KEY)
        for catalog_model in data:
            r = r_conn.lpush(KEY, catalog_model.json())
            print(r)

    def read_lrange(self):
        posts = []
        # LRANGE indexes are inclusive; an end of -1 returns every record
        for item in r_conn.lrange(KEY, 0, -1):
            d = json.loads(item)
            posts.append(CatalogThread(**d))
        print(posts)
        print(len(posts))

    def delete_all_data(self):
        print('count before: ', r_conn.dbsize())
        r_conn.flushdb()
        print('count after: ', r_conn.dbsize())
        assert r_conn.dbsize() == 0

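if __name__ == "__main__":
    lpr = Lpush_Lrange()
    lpr.create_lpush()      # populate the list before reading it back
    lpr.read_lrange()
    lpr.delete_all_data()   # flushdb() removes every key in the selected database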
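Two details of these commands are easy to miss: LPUSH adds to the head of the list, so the newest item comes back first, and LRANGE treats its end index as inclusive, with -1 meaning the last element. The sketch below models that behaviour with an in-memory stand-in; `FakeList` is hypothetical and only imitates the semantics, it does not talk to Redis.

```python
from collections import deque


class FakeList:
    """In-memory stand-in for a single Redis list (illustration only)."""

    def __init__(self):
        self._items = deque()

    def lpush(self, value):
        # LPUSH inserts at the head and returns the new length of the list.
        self._items.appendleft(value)
        return len(self._items)

    def lrange(self, start, end):
        # LRANGE treats `end` as inclusive; negative indexes count from the tail.
        items = list(self._items)
        n = len(items)
        if start < 0:
            start = max(n + start, 0)
        if end < 0:
            end = n + end
        if end < 0:
            return []
        return items[start:end + 1]


posts = FakeList()
for post_id in ("a", "b", "c"):
    posts.lpush(post_id)

print(posts.lrange(0, -1))   # ['c', 'b', 'a']
print(posts.lrange(0, 1))    # ['c', 'b']
```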

Create and Update

In this example, create and update are handled by the same function.

Each document is stored under a key of the form chan:<post number>. When we run the update_db function, the document is written at the root JSON path of that key, replacing whatever was stored there before.

https://redis.io/docs/stack/json/path/

This keeps us from having to write extra code to check whether a key exists before updating.
We can run this code on a schedule and know that each document will be updated in place.

r_conn = get_redis_connection(encoding='utf-8', decode_responses=True)

class JsonSearch(object):

    URL = os.environ.get("REDIS_OM_URL")

    def __init__(self, key: str):
        if not self.URL:
            raise ValueError('Dont forget to set your REDIS_OM_URL environment variable!!')

        self.KEY = key
        self.idx_prefix = [f"{self.KEY}:"]
        self.idx_definition = IndexDefinition(prefix=self.idx_prefix, index_type=IndexType.JSON)

    def update_db(self):
        data = get_catalog(Board.wg.value)  # any board defined in the Board enum
        for catalog_model in data:
            r = r_conn.json().set(
                f'{self.KEY}:{catalog_model.no}', 
                Path.root_path(), 
                {self.KEY: catalog_model.dict()}
            )
            print(r)

if __name__ == "__main__":
    KEY = 'chan'
    js = JsonSearch(KEY)
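The create-or-update behaviour comes from the key naming alone. As a rough sketch, a plain dict can stand in for the Redis keyspace: writing the same post number twice leaves one entry holding the latest data. The `store` dict and `upsert` function are hypothetical stand-ins, not redis-py calls.

```python
from typing import Any, Dict

store: Dict[str, Dict[str, Any]] = {}   # stand-in for the Redis keyspace


def upsert(prefix: str, post: Dict[str, Any]) -> str:
    """Write a post under '<prefix>:<post number>', replacing any earlier copy."""
    key = f"{prefix}:{post['no']}"
    # Wrapping the post under the prefix mirrors {self.KEY: catalog_model.dict()}.
    store[key] = {prefix: post}
    return key


upsert("chan", {"no": 123, "replies": 1})
upsert("chan", {"no": 123, "replies": 5})   # same key, so the document is replaced
print(len(store), store["chan:123"]["chan"]["replies"])   # 1 5
```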

RediSearch with RedisJSON

https://redis.com/blog/getting-started-with-redisearch-2-0/

The first requirement for search is to create a RediSearch index.

To create an index, you define a schema that lists the fields to be indexed and their types.

Your queries will then reference these fields.

    def create_idx(
        self, 
        schema: Tuple[Union[TextField, TagField, NumericField], ...]
        ) -> None:
        try:
            r_conn.ft().create_index(schema, definition=self.idx_definition)
        except redis.ResponseError as err:
            print(err)
            r_conn.ft().dropindex(delete_documents=False)
            r_conn.ft().create_index(schema, definition=self.idx_definition)

    def search_any(self, text: str, as_name: str):
        query = Query(text).return_field(as_name).highlight().summarize()
        search_results = r_conn.ft().search(query)
        for i in search_results.docs:
            print('--'*34)
            print(getattr(i, as_name))

        print('=='* 39)
        print('Summary')
        print('Redis Args: ', query.get_args())
        print('total: ', search_results.total)

if __name__ == "__main__":
    KEY = 'chan'
    schema = (
        TextField(f"$.{KEY}.com", as_name='op_com', weight=5.0),
        TextField(f"$.{KEY}.country_name", as_name='country_name', weight=1.0)
    )

    js = JsonSearch(KEY)
    js.update_db()
    js.create_idx(schema)

    js.search_any('USA', 'country_name')

Let's walk through the pieces that implement full-text search on the com (comment) and country_name fields of our pydantic model.

Creating the Index Schema

schema = (
    TextField(f"$.{KEY}.com", as_name='op_com', weight=5.0),
    TextField(f"$.{KEY}.country_name", as_name='country_name', weight=1.0)
)

A prerequisite for full-text search is an index that covers the fields we want to search.

Let's create one on com and country_name.

The as_name argument creates an alias for the field, which is then used to reference that field in the search_any() method.
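Each field path in the schema, such as `$.chan.com`, is a JSONPath that points into the stored document. The sketch below follows such a path through a made-up document shaped like the one update_db writes. The `resolve` helper is hypothetical and only handles simple `$.a.b` paths, not full JSONPath.

```python
from typing import Any, Dict


def resolve(path: str, document: Dict[str, Any]) -> Any:
    """Follow a simple '$.a.b' style path through nested dicts."""
    if not path.startswith("$."):
        raise ValueError("only simple '$.' paths are supported in this sketch")
    value: Any = document
    for part in path[2:].split("."):
        value = value[part]
    return value


KEY = "chan"
# Made-up document in the shape written by update_db: the post nested under KEY.
doc = {KEY: {"no": 123, "com": "nice wallpaper", "country_name": "Canada"}}

print(resolve(f"$.{KEY}.com", doc))            # nice wallpaper
print(resolve(f"$.{KEY}.country_name", doc))   # Canada
```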

Writing a Query

The final step is writing our query.

There are a few options for writing a query using redis-py.

    def search_any(self, text: str, as_name: str):
        query = Query(text).return_field(as_name).highlight().summarize()
        search_results = r_conn.ft().search(query)
        for i in search_results.docs:
            print('--'*34)
            print(getattr(i, as_name))

        print('=='* 39)
        print('Summary')
        print('Redis Args: ', query.get_args())
        print('total: ', search_results.total)

You will notice a few methods attached to the Query class.

.return_field(as_name)

This method returns only the specified field from each matching document, as opposed to returning all fields in the document.

.highlight()

This is a nice feature if you're implementing a search field in a front-end application.

It wraps every match in the returned text in HTML bold tags, <b>{search term}</b>, which makes the results easier to display.
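To picture the effect, here is a rough pure-Python approximation of highlighting. It is not how RediSearch implements it (the server also handles things like stemming), and the `highlight` function below is a hypothetical stand-in written for illustration.

```python
import re


def highlight(text: str, term: str, tags=("<b>", "</b>")) -> str:
    """Wrap each whole-word, case-insensitive match of `term` in the given tags."""
    pattern = re.compile(rf"\b{re.escape(term)}\b", re.IGNORECASE)
    return pattern.sub(lambda m: f"{tags[0]}{m.group(0)}{tags[1]}", text)


print(highlight("Greetings from the USA", "usa"))
# Greetings from the <b>USA</b>
```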

.summarize()

This allows you to shorten the returned text to fragments around the matches.

You can set the number of context words returned around your search term (context_len) and the number of fragments to return (num_frags).
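As a simplified picture of summarization, the sketch below returns the first match with a few words of context on each side. This is an approximation for illustration only: the `summarize` function and its `words_each_side` parameter are hypothetical, and RediSearch's own fragment selection works differently.

```python
def summarize(text: str, term: str, words_each_side: int = 3) -> str:
    """Return the first match of `term` with some context words on each side."""
    words = text.split()
    for i, word in enumerate(words):
        if word.lower().strip(".,!?") == term.lower():
            start = max(i - words_each_side, 0)
            fragment = words[start:i + words_each_side + 1]
            return "... " + " ".join(fragment) + " ..."
    return ""


comment = "I moved to the USA last year and the weather here is great"
print(summarize(comment, "USA", words_each_side=2))
# ... to the USA last year ...
```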

Conclusion

Redis is a very fast database with many uses. As someone who uses MongoDB on a daily basis, I found working with JSON documents in Redis familiar, with the added benefit of extra speed.

Feel free to use the links below to see for yourself.

References

Try Redis Cloud for free

Watch this video on the benefits of Redis Cloud over other Redis providers

Redis Developer Hub - tools, guides, and tutorials about Redis

RedisInsight Desktop GUI
