SSK

How to use PyZMQ / PUB-SUB / PUSH-PULL

Hello world! Let's get into the topic: the best way to use PyZMQ.
A few days ago I started using PyZMQ by following a blog tutorial on dev.to. I also found some code online, optimized for Flask, that I thought I could reuse. I reused it and it worked fine. Later, while coding and testing other functions, I found that the PUB-SUB pattern in PyZMQ was not suitable for my task. I explain the task at the bottom.

This is how a messaging queue works:

There are two things we need to remember:
1. Client
2. Server

The client sends a message to the server.
The server processes the message (DB actions, sending emails, etc.).

Let's code

client.py

# Import ZMQ
import zmq

# Create a context
context = zmq.Context()

# Create a PUSH socket and connect it to the server
socket = context.socket(zmq.PUSH)
socket.connect('tcp://localhost:5555')

# Send data to the server
def send_data(data):
    # We're sending a string to the server for now
    socket.send_string(f'{data}')

    # socket.send_json({"data": [1, 2, 3, 4]})
    # You can send many other types; check the official ZMQ docs

# Close the connection (optional, but tidy)
def close_connection():
    socket.close()
    context.term()

try:
    while True:
        # Get data from the user to send to the server
        send_data(input("Enter data to send: "))
except (KeyboardInterrupt, EOFError):
    # Ctrl+C or Ctrl+D ends the loop so the cleanup below can run
    pass
finally:
    close_connection()
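The commented-out `send_json` line above hints that you can send more than strings. Here is a minimal sketch of a JSON round trip over PUSH-PULL. To keep it runnable as a single script it uses the in-process `inproc://` transport instead of TCP; the endpoint name `json-demo` and the helper `json_roundtrip` are made up for this example.

```python
import zmq


def json_roundtrip(payload):
    """Send one JSON-serializable object from PUSH to PULL and return what arrives."""
    ctx = zmq.Context()
    # Bind the receiving side first, then connect the sender to it.
    pull = ctx.socket(zmq.PULL)
    pull.bind("inproc://json-demo")  # hypothetical endpoint name
    push = ctx.socket(zmq.PUSH)
    push.connect("inproc://json-demo")
    try:
        push.send_json(payload)   # serializes the object to JSON
        return pull.recv_json()   # parses the JSON back into Python objects
    finally:
        push.close(linger=0)
        pull.close(linger=0)
        ctx.term()


if __name__ == "__main__":
    print(json_roundtrip({"data": [1, 2, 3, 4]}))
```

In the real client and server you would keep the `tcp://` addresses and only swap `send_string`/`recv_string` for `send_json`/`recv_json`.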

server.py

import time
# Import ZMQ
import zmq

# Create a context and a PULL socket bound to port 5555
context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.bind('tcp://*:5555')

# Create a poller and register the socket for incoming messages
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)

while True:
    # Block until at least one registered socket has a message
    socks = dict(poller.poll())
    if socket in socks:
        message = socket.recv_string()

        # message = socket.recv_json()
        # You can receive many other types; check the official ZMQ docs
        print("sleeping 20 seconds")
        time.sleep(20)  # or call some other function to process the message
        print(message)

        # If you're using SQLAlchemy
        # session.commit() or db.session.commit()

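The server above calls `poller.poll()` with no arguments, which blocks until a message arrives. If you want the loop to wake up periodically (for housekeeping, say), `poll` also accepts a timeout in milliseconds. A small sketch of that, again over `inproc://` so it runs in one script; `poll_demo` and the endpoint name are made up for illustration.

```python
import zmq


def poll_demo(timeout_ms=100):
    """Poll a PULL socket once while idle, then once after a message was sent."""
    ctx = zmq.Context()
    pull = ctx.socket(zmq.PULL)
    pull.bind("inproc://poll-demo")  # hypothetical endpoint name
    push = ctx.socket(zmq.PUSH)
    push.connect("inproc://poll-demo")

    poller = zmq.Poller()
    poller.register(pull, zmq.POLLIN)
    try:
        # Nothing has been sent yet, so poll returns an empty list after the timeout.
        idle_events = poller.poll(timeout_ms)

        push.send_string("hello")
        ready = dict(poller.poll(timeout_ms))
        # Only call recv when the poller says the socket is readable.
        message = pull.recv_string() if pull in ready else None
        return idle_events, message
    finally:
        push.close(linger=0)
        pull.close(linger=0)
        ctx.term()


if __name__ == "__main__":
    print(poll_demo())
```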


The tutorial ends here. You can stop reading here if you want to.


Why and when I used a messaging queue:-

I'm working on a project that hands out a task to like-minded people. It fetches all the like-minded users using some filters and sends the task to those targeted users.
We decided to do this with ZMQ, and I implemented it. But I soon found that while the receiver was busy with one task, it missed the other tasks that were triggered during that time. I was using the PUB-SUB model in ZMQ.
This is the code for it (correct me if I'm wrong):
sender.py

import time
import zmq

HOST = '127.0.0.1'
PORT = '6666'

_context = zmq.Context()
_publisher = _context.socket(zmq.PUB)
url = 'tcp://{}:{}'.format(HOST, PORT)

# Bind once at startup. Binding inside publish_message() raises
# "Address already in use" from the second call on, and the exception
# skips send_string(), so those messages are never sent.
_publisher.bind(url)

def publish_message(message):
    try:
        _publisher.send_string(message)
    except Exception as e:
        print("error {}".format(e))

while True:
    publish_message('response..')
    # Pause between messages so this test loop does not flood the subscriber
    time.sleep(1)

server.py

import logging
import time

import zmq

HOST = '127.0.0.1'
PORT = '6666'

logging.basicConfig(filename='subscriber.log', level=logging.INFO)


class ZClient(object):

    def __init__(self, host=HOST, port=PORT):
        self.host = host
        self.port = port
        self._context = zmq.Context()
        self._subscriber = self._context.socket(zmq.SUB)

    def receive_message(self):
        """Start receiving messages"""
        self._subscriber.connect('tcp://{}:{}'.format(self.host, self.port))
        # An empty prefix subscribes to every message
        self._subscriber.setsockopt(zmq.SUBSCRIBE, b"")

        while True:
            print("LISTENING:-")
            self.message = self._subscriber.recv_string()
            print(self.message)
            logging.info(
                '{}   - {}'.format(self.message, time.strftime("%Y-%m-%d %H:%M")))
            self.result()

    def result(self):
        # Simulates slow processing of each message
        time.sleep(10)
        print(self.message)


if __name__ == '__main__':
    zc = ZClient()
    zc.receive_message()

Please correct the code if it's wrong.
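One possible explanation for the missed tasks, offered as a sketch and not as a diagnosis of my exact setup: a PUB socket silently discards a message when no subscriber is connected at the moment it is sent, while a PUSH socket queues messages until the PULL side reads them. The snippet below tries to show that difference in a single script using the in-process `inproc://` transport. The endpoint names and the two helper functions are made up for this example.

```python
import zmq


def pub_sub_first_message():
    """Publish 'early' before anyone subscribes, then 'late'; return what SUB sees first."""
    ctx = zmq.Context()
    pub = ctx.socket(zmq.PUB)
    pub.bind("inproc://pubsub-demo")  # hypothetical endpoint name
    pub.send_string("early")          # no subscriber yet, so this is discarded

    sub = ctx.socket(zmq.SUB)
    sub.connect("inproc://pubsub-demo")
    sub.setsockopt(zmq.SUBSCRIBE, b"")  # empty prefix = receive everything

    first_seen = None
    try:
        # The subscription takes a moment to reach the publisher,
        # so keep publishing until one message gets through.
        for _ in range(50):
            pub.send_string("late")
            if sub.poll(100):  # wait up to 100 ms for a message
                first_seen = sub.recv_string()
                break
        return first_seen
    finally:
        pub.close(linger=0)
        sub.close(linger=0)
        ctx.term()


def push_pull_queued(count=5):
    """Send several tasks before the receiver reads anything; return what it gets."""
    ctx = zmq.Context()
    pull = ctx.socket(zmq.PULL)
    pull.bind("inproc://pushpull-demo")  # hypothetical endpoint name
    push = ctx.socket(zmq.PUSH)
    push.connect("inproc://pushpull-demo")
    try:
        for i in range(count):
            push.send_string(f"task {i}")  # queued until the PULL side reads
        return [pull.recv_string() for _ in range(count)]
    finally:
        push.close(linger=0)
        pull.close(linger=0)
        ctx.term()


if __name__ == "__main__":
    print("PUB-SUB first message seen:", pub_sub_first_message())
    print("PUSH-PULL messages received:", push_pull_queued())
```

If your tasks must not be lost while the receiver is busy, this is the behaviour that makes PUSH-PULL a better fit than PUB-SUB.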
