Setup
Python Requirements
The Python packages required are the following.
~/requirements.txt
pydantic==1.10.2
redis==4.3.4
redis-om==0.0.27
Environment Variables
To get a free Redis Cloud account, use the links at the bottom of this post.
Once you have created your account and logged in, you can find your credentials on your dashboard.
Together they make up the connection URI required to connect to your Redis instance.
We will use it later in our Python code.
~/.env
PORT=13832
USERNAME=default
PASSWORD=abc123
URL=redis-332123.c321.us-central1-1.gce.cloud.redislabs.com
DATABASE=0
REDIS_OM_URL=redis://{USERNAME}:{PASSWORD}@{URL}:{PORT}/{DATABASE}
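Whether the {NAME} placeholders in REDIS_OM_URL are expanded depends on how the .env file is loaded, so it can be safer to assemble the URI in code. Here is a minimal sketch, assuming the variable names from the file above. build_redis_url is a hypothetical helper, not part of redis or redis-om.

```python
import os
from urllib.parse import quote


def build_redis_url(env=None) -> str:
    """Assemble a redis:// URI from the variables in the .env file above.

    build_redis_url is a hypothetical helper; the variable names are the
    ones used in this post.
    """
    env = os.environ if env is None else env
    user = quote(env["USERNAME"], safe="")
    password = quote(env["PASSWORD"], safe="")  # escape characters such as '@' or ':'
    return f"redis://{user}:{password}@{env['URL']}:{env['PORT']}/{env['DATABASE']}"


# Values copied from the example .env file
example_env = {
    "PORT": "13832",
    "USERNAME": "default",
    "PASSWORD": "abc123",
    "URL": "redis-332123.c321.us-central1-1.gce.cloud.redislabs.com",
    "DATABASE": "0",
}
print(build_redis_url(example_env))
```

The resulting string is the value this post stores in REDIS_OM_URL.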
Enum
Let's write an enumerated class to keep track of all the boards and limit which options our function can take.
We will use this as the argument to the function that calls the 4chan API.
Enum classes in Python have many uses.
In this instance our enum will validate the name of the board we want to get data from.
If we pass the wrong board name, our function will fail immediately by raising a KeyError.
This is the ideal behavior, as we would like to know we've passed the wrong board name before making the GET request to the API.
~/schema.py
from enum import Enum


class Board(str, Enum):
    wg = 'wg'
    v = 'v'
    b = 'b'
    s = 's'
    h = 'h'
    c = 'c'
    e = 'e'
    g = 'g'
    k = 'k'
    o = 'o'
    u = 'u'
    vg = 'vg'
    s4s = 's4s'
    cm = 'cm'
    hm = 'hm'
    lgbt = 'lgbt'
    sci = 'sci'
    wsg = 'wsg'
    adv = 'adv'
    an = 'an'
    out = 'out'
    trv = 'trv'
    sp = 'sp'
    soc = 'soc'
    fit = 'fit'
    biz = 'biz'
    fa = 'fa'
    tg = 'tg'
    w = 'w'
    x = 'x'
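The fail-fast behavior described above can be tried without touching the network. This sketch uses a trimmed copy of the enum, and lookup is a hypothetical helper added only for illustration.

```python
from enum import Enum


class Board(str, Enum):
    # Trimmed copy of the enum above, for illustration only
    wg = 'wg'
    g = 'g'


def lookup(name: str) -> Board:
    """Look a board up by member name; unknown names raise KeyError."""
    return Board[name]


print(lookup('wg').value)

try:
    lookup('not_a_board')
except KeyError as err:
    print('rejected:', err)
```

Note that Board[name] looks a member up by name and raises KeyError, while calling Board(value) looks it up by value and raises ValueError instead.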
JSON Serialization
This is the pydantic model that will parse the JSON data coming from the 4chan API.
"Serialization" is the process of converting in-memory objects into a format that can be stored or transmitted, such as JSON; "deserialization" is the reverse.
In this case we are deserializing JSON data into Python types, and then serializing it back into JSON before it is inserted into Redis.
~/schema.py
from typing import List

from pydantic import BaseModel, Field


class CatalogBase(BaseModel):
    no: int = Field(None, description="always | The numeric post ID | any positive integer")
    resto: int = Field(None, description="always | For replies: this is the ID of the thread being replied to. For OP: this value is zero | `0` or any positive integer")
    sticky: int = Field(None, description="OP only, if thread is currently stickied | If the thread is being pinned to the top of the page | `1` or not set")
    closed: int = Field(None, description="OP only, if thread is currently closed | If the thread is closed to replies | `1` or not set")
    now: str = Field(None, description="always | MM/DD/YY(Day)HH:MM (:SS on some boards), EST/EDT timezone | string")
    time: int = Field(None, description="always | UNIX timestamp the post was created | UNIX timestamp")
    name: str = Field(None, description="always | Name user posted with. Defaults to Anonymous | any string")
    trip: str = Field(None, description="if post has tripcode | The user's tripcode, in format: !tripcode or !!securetripcode | any string")
    id: str = Field(None, description="if post has ID | Poster's ID | any 8 chars")
    capcode: str = Field(None, description="if post has capcode | The capcode identifier for a post | Not set, mod, admin, admin_highlight, manager, developer, founder")
    country: str = Field(None, description="if country flags are enabled | Poster's [ISO 3166-1 alpha-2 country code](https://en.wikipedia.org/wiki/ISO_3166-1_alpha-2) | 2 character string or XX if unknown")
    country_name: str = Field(None, description="if country flags are enabled | Poster's country name | Name of any country")
    sub: str = Field(None, description="OP only, if subject was included | OP Subject text | any string")
    com: str = Field(None, description="if comment was included | Comment (HTML escaped) | any HTML escaped string")
    tim: int = Field(None, description="always if post has attachment | Unix timestamp + microtime that an image was uploaded | integer")
    filename: str = Field(None, description="always if post has attachment | Filename as it appeared on the poster's device | any string")
    ext: str = Field(None, description="always if post has attachment | Filetype | jpg, png, gif, pdf, swf, webm")
    fsize: int = Field(None, description="always if post has attachment | Size of uploaded file in bytes | any integer")
    md5: str = Field(None, description="always if post has attachment | 24 character, packed base64 MD5 hash of file")
    w: int = Field(None, description="always if post has attachment | Image width dimension | any integer")
    h: int = Field(None, description="always if post has attachment | Image height dimension | any integer")
    tn_w: int = Field(None, description="always if post has attachment | Thumbnail image width dimension | any integer")
    tn_h: int = Field(None, description="always if post has attachment | Thumbnail image height dimension | any integer")
    filedeleted: int = Field(None, description="if post had attachment and attachment is deleted | If the file was deleted from the post | `1` or not set")
    spoiler: int = Field(None, description="if post has attachment and attachment is spoilered | If the image was spoilered or not | `1` or not set")
    custom_spoiler: int = Field(None, description="if post has attachment and attachment is spoilered | The custom spoiler ID for a spoilered image | `1-10` or not set")
    omitted_posts: int = Field(None, description="OP only | Number of replies minus the number of previewed replies | any integer")
    omitted_images: int = Field(None, description="OP only | Number of image replies minus the number of previewed image replies | any integer")
    replies: int = Field(None, description="OP only | Total number of replies to a thread | any integer")
    images: int = Field(None, description="OP only | Total number of image replies to a thread | any integer")
    bumplimit: int = Field(None, description="OP only, only if bump limit has been reached | If a thread has reached bumplimit, it will no longer bump | `1` or not set")
    imagelimit: int = Field(None, description="OP only, only if image limit has been reached | If an image has reached image limit, no more image replies can be made | `1` or not set")
    last_modified: int = Field(None, description="OP only | UNIX timestamp marking last time thread was modified (post added/modified/deleted, thread closed/sticky settings modified) | UNIX timestamp")
    tag: str = Field(None, description="OP only, /f/ only | The category of `.swf` upload | `Game`, `Loop`, etc")
    semantic_url: str = Field(None, description="OP only | SEO URL slug for thread | string")
    since4pass: int = Field(None, description="if poster put 'since4pass' in the options field | Year 4chan pass bought | any 4 digit year")
    unique_ips: int = Field(None, description="OP only | Number of unique posters in a thread | any integer")
    m_img: int = Field(None, description="any post that has a mobile-optimized image | Mobile optimized image exists for post | `1` or not set")


class CatalogThread(CatalogBase):
    board: Board
    # catalog OP only | JSON representation of the most recent replies to a thread | array of JSON post objects
    last_replies: List[CatalogBase] = []
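The round trip described above (JSON text to Python object, then back to JSON text) can be sketched with the standard library alone. The Post dataclass below is only a stand-in for the pydantic model, with two of its fields, and the payload is made up.

```python
import json
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class Post:
    # Stand-in for CatalogBase with only two of its fields
    no: int
    com: Optional[str] = None


raw = '{"no": 123, "com": "hello"}'  # made-up payload

post = Post(**json.loads(raw))       # JSON text -> Python object (deserialize)
as_json = json.dumps(asdict(post))   # Python object -> JSON text (serialize)

print(post)
print(as_json)
```

With pydantic the same two steps are the model constructor, CatalogThread(**thread), and the .json() method, both of which appear later in this post.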
Calling our external API
This function calls the 4chan API.
As you can see, we limit its arguments using our enum class via the Board[board] syntax.
For example, if wg were not a member of the Board enum class, Board['wg'] would raise a KeyError before anything is passed to requests.get().
The function then calls our external API, parses the JSON data into Python types and appends each thread to a list.
~/deps.py
import requests
from typing import List

from schema import Board, CatalogThread


def get_catalog(board: Board) -> List[CatalogThread]:
    # Board[board] raises KeyError for an unknown board name
    url = f'https://a.4cdn.org/{Board[board].value}/catalog.json'
    data = requests.get(url).json()
    all_posts = []
    for page in data:
        for thread in page['threads']:
            # attach board to thread
            thread['board'] = board
            all_posts.append(CatalogThread(**thread))
    return all_posts
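The nested loop in get_catalog can be exercised offline. The sketch below assumes only the page/threads layout that the function iterates over. flatten_catalog and the sample data are made up for illustration.

```python
from typing import Any, Dict, List


def flatten_catalog(data: List[Dict[str, Any]], board: str) -> List[Dict[str, Any]]:
    """Collect every thread from every page and tag it with its board."""
    threads = []
    for page in data:
        for thread in page['threads']:
            threads.append({**thread, 'board': board})  # copy, then attach board
    return threads


# Made-up sample in the page/threads layout that get_catalog iterates over
sample = [
    {'page': 1, 'threads': [{'no': 1}, {'no': 2}]},
    {'page': 2, 'threads': [{'no': 3}]},
]
flat = flatten_catalog(sample, 'wg')
print([t['no'] for t in flat])
```

Unlike get_catalog, this version copies each thread before adding the board, so the input data is left unchanged.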
Redis CRUD operations - lpush and lrange
As an introduction, let's first go over how we could perform some CRUD operations without RedisJSON.
Now that we have code to call our external API, we can decide how to insert the results into Redis.
~/db.py
from redis_om import get_redis_connection
from deps import get_catalog
from schema import Board, CatalogThread
import json
import redis
import os
from redis.commands.json.path import Path
from redis.commands.search.query import Query
from redis.commands.search.indexDefinition import IndexDefinition, IndexType
from redis.commands.search.field import (
    NumericField,
    TagField,
    TextField
)
from typing import (
    Tuple,
    Union,
    Optional,
    List
)

r_conn = get_redis_connection()
KEY = Board.wg.value


class Lpush_Lrange(object):

    def create_lpush(self):
        data = get_catalog(KEY)
        for catalog_model in data:
            # lpush returns the length of the list after the push
            r = r_conn.lpush(KEY, catalog_model.json())
            print(r)

    def read_lrange(self):
        posts = []
        # this
        end = r_conn.llen(KEY)
        # or this to get all records
        # end = -1
        for item in r_conn.lrange(KEY, start=0, end=end):
            d = json.loads(item)
            posts.append(CatalogThread(**d))
        print(posts)
        print(len(posts))

    def delete_all_data(self):
        print('count before: ', r_conn.dbsize())
        r_conn.flushdb()
        print('count after: ', r_conn.dbsize())
        assert r_conn.dbsize() == 0


if __name__ == "__main__":
    lpr = Lpush_Lrange()
    lpr.create_lpush()
    lpr.read_lrange()
    lpr.delete_all_data()
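To see what order the items come back in, here is a small in-memory stand-in that mimics LPUSH, LLEN and LRANGE. FakeListStore is hypothetical and exists only so the behavior can be tried without a Redis server. It is not how Redis implements lists.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, List


class FakeListStore:
    """In-memory stand-in that mimics LPUSH, LLEN and LRANGE on Redis lists."""

    def __init__(self) -> None:
        self._lists: Dict[str, Deque[str]] = defaultdict(deque)

    def lpush(self, key: str, value: str) -> int:
        self._lists[key].appendleft(value)  # LPUSH inserts at the head
        return len(self._lists[key])        # and returns the new length

    def llen(self, key: str) -> int:
        return len(self._lists[key])

    def lrange(self, key: str, start: int, end: int) -> List[str]:
        items = list(self._lists[key])
        n = len(items)
        if start < 0:
            start = max(n + start, 0)       # negative indexes count from the tail
        if end < 0:
            end = n + end
        if end < 0:
            return []
        return items[start:end + 1]         # the end index is inclusive


store = FakeListStore()
for value in ('first', 'second', 'third'):
    store.lpush('wg', value)
print(store.lrange('wg', 0, -1))
```

Because LPUSH adds to the head of the list, the most recently pushed item is read back first. Passing the list length as the end index, as read_lrange does, returns the same items as -1 because out-of-range indexes are clamped.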
Create and Update
In this example, create and update are done by the same function.
Because of the way we structure the key name, running the update_db function writes each document at the root JSON path of its key, in this case chan:<post number>.
https://redis.io/docs/stack/json/path/
If the key does not exist yet it is created; if it does, its document is replaced.
This keeps us from having to write extra code to check whether a key exists before updating.
We can have this code execute on a schedule and know the data will be updated in place.
r_conn = get_redis_connection(encoding='utf-8', decode_responses=True)


class JsonSearch(object):
    URL = os.environ.get("REDIS_OM_URL")

    def __init__(self, key: str):
        if not self.URL:
            raise ValueError("Don't forget to set your REDIS_OM_URL environment variable!")
        self.KEY = key
        self.idx_prefix = [f"{self.KEY}:"]
        self.idx_definition = IndexDefinition(prefix=self.idx_prefix, index_type=IndexType.JSON)

    def update_db(self):
        data = get_catalog(Board.wg.value)
        for catalog_model in data:
            # setting the root path creates the key or replaces its document
            r = r_conn.json().set(
                f'{self.KEY}:{catalog_model.no}',
                Path.root_path(),
                {self.KEY: catalog_model.dict()}
            )
            print(r)


if __name__ == "__main__":
    KEY = 'chan'
    js = JsonSearch(KEY)
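The create-or-replace idea can be sketched with a plain dictionary standing in for Redis. upsert is a hypothetical helper and the post data is made up. Only the key format and the document nesting are taken from update_db.

```python
from typing import Any, Dict


def upsert(store: Dict[str, Dict[str, Any]], prefix: str, post: Dict[str, Any]) -> str:
    """Write post under '<prefix>:<no>', replacing any earlier version."""
    key = f"{prefix}:{post['no']}"
    store[key] = {prefix: post}  # same nesting as the document update_db stores
    return key


store: Dict[str, Dict[str, Any]] = {}
upsert(store, 'chan', {'no': 101, 'replies': 3})
upsert(store, 'chan', {'no': 101, 'replies': 7})  # same post, fetched later
upsert(store, 'chan', {'no': 102, 'replies': 0})
print(sorted(store))
print(store['chan:101'])
```

Running the same write twice leaves one document per post number, which is why the update can be scheduled without an existence check.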
RediSearch with RedisJSON
https://redis.com/blog/getting-started-with-redisearch-2-0/
The first requirement for search is to create a RediSearch index.
To create an index, you must define a schema to list the fields and their types to be indexed.
This will be used in your queries.
class JsonSearch(object):
    # __init__ and update_db as defined above

    def create_idx(
        self,
        schema: Tuple[Union[TextField, TagField, NumericField], ...]
    ) -> None:
        try:
            r_conn.ft().create_index(schema, definition=self.idx_definition)
        except redis.ResponseError as err:
            # typically raised when the index already exists:
            # drop it (keeping the documents) and create it again
            print(err)
            r_conn.ft().dropindex(delete_documents=False)
            r_conn.ft().create_index(schema, definition=self.idx_definition)

    def search_any(self, text: str, as_name: str):
        query = Query(text).return_field(as_name).highlight().summarize()
        search_results = r_conn.ft().search(query)
        for i in search_results.docs:
            print('--' * 34)
            print(getattr(i, as_name))
        print('==' * 39)
        print('Summary')
        print('Redis Args: ', query.get_args())
        print('total: ', search_results.total)


if __name__ == "__main__":
    KEY = 'chan'
    schema = (
        TextField(f"$.{KEY}.com", as_name='op_com', weight=5.0),
        TextField(f"$.{KEY}.country_name", as_name='country_name', weight=1.0)
    )
    js = JsonSearch(KEY)
    js.update_db()
    js.create_idx(schema)
    js.search_any('USA', 'country_name')
Let's add some more methods to implement full text search on the comment (com) and country_name fields of our pydantic model.
Creating the Index Schema
schema = (
    TextField(f"$.{KEY}.com", as_name='op_com', weight=5.0),
    TextField(f"$.{KEY}.country_name", as_name='country_name', weight=1.0)
)
A prerequisite for full text search is creating an index over the fields we want to search.
Let's create one on com and country_name.
The as_name argument creates an alias for that specific field, which will be used to reference that field in the search_any() method.
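To make the field paths in the schema concrete, this sketch follows a path such as $.chan.country_name through a document nested the way update_db stores it. resolve is a hypothetical helper that handles only plain dotted paths. It is not a JSONPath implementation, and the document is made up.

```python
from typing import Any, Dict


def resolve(doc: Dict[str, Any], path: str) -> Any:
    """Follow a simple '$.a.b' path through nested dicts.

    Only plain dotted paths are handled; this is not a JSONPath implementation.
    """
    if not path.startswith('$.'):
        raise ValueError(f'unsupported path: {path}')
    value: Any = doc
    for part in path[2:].split('.'):
        value = value[part]
    return value


KEY = 'chan'
stored = {KEY: {'no': 101, 'com': 'hello', 'country_name': 'Canada'}}  # made-up document
print(resolve(stored, f'$.{KEY}.country_name'))
print(resolve(stored, f'$.{KEY}.com'))
```

The extra chan level in the path exists because update_db wraps each post as {KEY: post} before storing it.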
Writing a Query
The final step is writing our query.
There are a few options for writing a query using redis-py.
def search_any(self, text: str, as_name: str):
    query = Query(text).return_field(as_name).highlight().summarize()
    search_results = r_conn.ft().search(query)
    for i in search_results.docs:
        print('--' * 34)
        print(getattr(i, as_name))
    print('==' * 39)
    print('Summary')
    print('Redis Args: ', query.get_args())
    print('total: ', search_results.total)
You will notice a few methods attached to the Query class.
.return_field(as_name)
This method returns only the specified field from each matching document, as opposed to returning all fields in the document.
.highlight()
This is a nice feature if you're implementing a search field in a front-end application.
It adds HTML <b>{search term}</b> bold tags around all the matches in the returned text, making results easier to display.
.summarize()
This allows you to shorten the returned text.
You can control how many words of context are returned around your search term.
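To give a feel for what the last two methods do to the returned text, here is a pure-Python imitation of their effect. It is only an illustration. RediSearch does this work on the server with its own tokenizer and fragmenting rules, and the highlight and summarize functions below are hypothetical.

```python
import re
from typing import List


def highlight(text: str, term: str) -> str:
    """Wrap each case-insensitive whole-word match of term in <b> tags."""
    pattern = re.compile(rf'\b{re.escape(term)}\b', re.IGNORECASE)
    return pattern.sub(lambda m: f'<b>{m.group(0)}</b>', text)


def summarize(text: str, term: str, context: int = 2) -> str:
    """Keep the first match plus `context` words on either side."""
    words: List[str] = text.split()
    for i, word in enumerate(words):
        if word.strip('.,!?').lower() == term.lower():
            return ' '.join(words[max(i - context, 0):i + context + 1])
    return ''


comment = 'I moved to the USA last year and never looked back.'  # made-up comment
print(highlight(comment, 'usa'))
print(summarize(comment, 'usa'))
```

Combining the two, as the query in search_any does, yields a short fragment with the match already marked up for display.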
Conclusion
Redis is a very fast database with many uses. As someone who uses MongoDB on a daily basis, I find that Redis with RedisJSON offers a similar document model with the added benefit of extra speed.
Feel free to use the links below to see for yourself.
References
Watch this video on the benefits of Redis Cloud over other Redis providers
Redis Developer Hub - tools, guides, and tutorials about Redis