Welcome to my attempt at learning Python library functions at lightspeed! I already know the Python language itself, and I write in it a lot, but there are still many modules out there I've never touched. I have discovered that reading the test cases for the modules themselves, and then sharing the highlights with you all here, helps me retain what I learn.
Today I'm going to take a stab at the multiprocessing Python module. Specifically, I'm going to cover:
- manager
- pool
- connection and the authentication challenges
Now let's begin.
Manager
In a nutshell, manager holds inter-process synchronization primitives. From the Python documentation:
A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
Manager objects give us an arsenal of synchronization primitives (which I will hereby abbreviate as 'syncprims') to play with. We get the usual locks, semaphores and barriers, plus process-safe Queue, Value and Array data structures. These work much like the ordinary single-process types you're used to (and Value is essentially a single shared variable). Manager objects can even be accessed by Python processes running on other machines across a network.
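Before we dive into the test suite, here is a quick sketch of my own showing a manager sharing a Value, a list and a Lock between worker processes (manager.Array('i', [...]) would work similarly for a fixed-type array):

    from multiprocessing import Manager, Process

    def worker(counter, totals, lock):
        # Proxies can be handed to child processes and used like the real objects.
        with lock:
            counter.value += 1
            totals[0] += 1

    if __name__ == '__main__':
        with Manager() as manager:
            counter = manager.Value('i', 0)   # a single shared integer
            totals = manager.list([0])        # a shared list
            lock = manager.Lock()             # one of the usual syncprims
            procs = [Process(target=worker, args=(counter, totals, lock)) for _ in range(4)]
            for p in procs:
                p.start()
            for p in procs:
                p.join()
            print(counter.value, totals[0])   # 4 4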
Here is an example of Queue usage taken directly from the multiprocessing test suite:
@classmethod
def _test_queue(cls, obj):
    assert obj.qsize() == 2
    assert obj.full()
    assert not obj.empty()
    assert obj.get() == 5
    assert not obj.empty()
    assert obj.get() == 6
    assert obj.empty()

def test_queue(self, qname="Queue"):
    o = getattr(self.manager, qname)(2)
    o.put(5)
    o.put(6)
    self.run_worker(self._test_queue, o)
    assert o.empty()
    assert not o.full()
Key takeaways from this code snippet:
- The manager has a method called Queue() which returns a proxy (think of it as an address) pointing to a shared queue managed by the manager object. The presence of Queue() is less obvious here because the method is looked up with getattr, called with the argument qname (which defaults to "Queue"). It is possible to call Queue() multiple times to create multiple shared queues.
- manager.Queue() takes an argument that is the maximum size of the queue, hence the extra (2). (Remember, this snippet lives inside the multiprocessing test suite; in your own code you would get the manager from multiprocessing.Manager().)
- The o.put() calls put values on the queue and the o.get() calls take them off. The values can have different types; I could just as well have written:

    o.put("foo")
    o.put(6)
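To translate the test into something you could run yourself, here is a rough standalone sketch (my own, not taken from the suite) of a manager queue shared with a worker process:

    from multiprocessing import Manager, Process

    def worker(q):
        # The proxy behaves like a regular queue.Queue inside the child.
        print('got', q.get())     # 5
        print('got', q.get())     # 6

    if __name__ == '__main__':
        with Manager() as manager:
            q = manager.Queue(2)   # maxsize of 2, just like in the test
            q.put(5)
            q.put(6)
            p = Process(target=worker, args=(q,))
            p.start()
            p.join()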
Pool
Pools are used for offloading tasks to worker processes. Here is a slightly edited rudimentary example of Pool usage:
def sqr_wait(x, wait=0.0):
    time.sleep(wait)
    return x*x

def mul(x, y):
    return x*y

# ...

@classmethod
def setUpClass(cls):
    super().setUpClass()
    cls.pool = cls.Pool(4)

@classmethod
def tearDownClass(cls):
    cls.pool.terminate()
    cls.pool.join()
    cls.pool = None
    super().tearDownClass()

def test_apply(self):
    papply = self.pool.apply
    self.assertEqual(papply(sqr_wait, (5,)), sqr_wait(5))
    self.assertEqual(papply(sqr_wait, (), {'x':3}), sqr_wait(x=3))

def test_map(self):
    pmap = self.pool.map
    self.assertEqual(pmap(sqr_wait, list(range(10))),
                     list(map(sqr_wait, list(range(10)))))
    self.assertEqual(pmap(sqr_wait, list(range(100)), chunksize=20),
                     list(map(sqr_wait, list(range(100)))))

def test_starmap(self):
    psmap = self.pool.starmap
    tuples = list(zip(range(10), range(9, -1, -1)))
    self.assertEqual(psmap(mul, tuples),
                     list(itertools.starmap(mul, tuples)))
    tuples = list(zip(range(100), range(99, -1, -1)))
    self.assertEqual(psmap(mul, tuples, chunksize=20),
                     list(itertools.starmap(mul, tuples)))
Notice the setUpClass() and tearDownClass() methods above; this is also an excerpt from a test case. Also, to make it clear, the tuples variables in the starmap test are lists of pairs with values [(0, 9), (1, 8), ..., (9, 0)] and [(0, 99), (1, 98), ..., (99, 0)] respectively. The zip() call is what puts the lists into this format.
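If you want to see that shape for yourself, the first zip() call boils down to this:

    list(zip(range(10), range(9, -1, -1)))
    # [(0, 9), (1, 8), (2, 7), (3, 6), (4, 5), (5, 4), (6, 3), (7, 2), (8, 1), (9, 0)]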
The main points here are:
- Pool has three different types of apply methods: apply, map, and starmap. All three also have async (non-blocking) variants; the ones exercised above are the blocking ones. (There is also imap, a lazier version of map that is easier on memory for long inputs, and imap_unordered, which yields results in arbitrary order.)
- apply runs the function on only one worker. The second parameter is a tuple of positional arguments, and the third parameter is a dictionary of keyword arguments.
- map splits up the list, prepares a function call for each value in the list and distributes those calls across all the workers. The called function must take only one argument.
- starmap is like map except that the list contains tuples, and each tuple holds the arguments that will be passed to the function.
You'll probably get away with passing a list of different-sized tuples to starmap as long as the function being called has default values for the arguments that are not supplied. The sketch below tries exactly that.
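Here is a minimal standalone sketch of these calls outside of the test harness (the function names are mine); it also tries the different-sized-tuples trick with starmap and one of the async variants:

    from multiprocessing import Pool

    def square(x, wait=0.0):     # wait has a default, so starmap may omit it
        return x * x

    def mul(x, y):
        return x * y

    if __name__ == '__main__':
        with Pool(4) as pool:
            # apply: one call on one worker; args as a tuple, kwargs as a dict
            print(pool.apply(square, (5,)))                # 25
            print(pool.apply(square, (), {'x': 3}))        # 9

            # map: one argument per call, spread across the workers
            print(pool.map(square, range(10)))             # [0, 1, 4, ..., 81]

            # starmap: each tuple is unpacked into the call
            pairs = list(zip(range(5), range(4, -1, -1)))
            print(pool.starmap(mul, pairs))                # [0, 3, 4, 3, 0]

            # different-sized tuples work as long as defaults cover the gaps
            print(pool.starmap(square, [(2,), (3, 0.0)]))  # [4, 9]

            # async variants return a result object you wait on
            result = pool.map_async(square, range(10))
            print(result.get())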
Pool could prove useful if you have a numpy/scipy computation that needs to run in parallel across multiple processes (and therefore across multiple CPU cores).
Connection
Last, we take a look at the networking multiprocessing
can do for us.
The Python docs have this summary for Connection:
Connection objects allow the sending and receiving of picklable objects or strings. They can be thought of as message oriented connected sockets.
So basically, consider this as a way to cheaply send objects across Python processes.
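Here is a tiny sketch of mine of what that looks like with a Pipe()-created pair of connections:

    from multiprocessing import Pipe, Process

    def child(conn):
        # Anything picklable can travel through the connection.
        conn.send({'msg': 'hello from the child'})
        conn.close()

    if __name__ == '__main__':
        parent_conn, child_conn = Pipe()   # a connected pair of Connection objects
        p = Process(target=child, args=(child_conn,))
        p.start()
        print(parent_conn.recv())          # {'msg': 'hello from the child'}
        p.join()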
Normally, Pipe() is used to create a pair of Connections, as above. However, here we will look at the more powerful connection submodule, which also supports TCP sockets, Unix domain sockets and Windows named pipes, and can therefore talk across the internet.
You make listeners by calling connection.Listener()
, and to connect some other Python process to a listener, call connection.Client()
.
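In standalone form that looks something like this; the address, message and port choice are just placeholders of mine (port 0 asks the OS for a free port):

    from multiprocessing import Process, connection

    def client(address):
        # Connect to the listener and send one picklable object.
        with connection.Client(address) as conn:
            conn.send('hello over the wire')

    if __name__ == '__main__':
        # The address family is inferred from the address; this one is a TCP socket.
        with connection.Listener(('localhost', 0)) as listener:
            p = Process(target=client, args=(listener.address,))
            p.start()
            with listener.accept() as conn:    # blocks until the client connects
                print(conn.recv())             # 'hello over the wire'
            p.join()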
For security, if the data you are sending across the internet is sensitive, you can set up the listener to put a challenge to the client, which the client can only answer if it holds the same secret key (or the other way around, it doesn't matter who challenges first). This is done by calling connection.deliver_challenge(connection, authkey); the challenge is answered by connection.answer_challenge(connection, authkey). The authkey is supposed to be a byte string, as stated in the Python docs, but I'm not sure how to use it in practice, so if you do, please answer the Stack Overflow question I just made here. I sketch my best guess after the test excerpt below.
Surprisingly, there are no test cases for deliver_challenge
or answer_challenge
. TODO: remind me to write some for them. This is the best I could find:
@classmethod
def _listener(cls, conn, families):
    for fam in families:
        l = cls.connection.Listener(family=fam)
        conn.send(l.address)
        new_conn = l.accept()
        conn.send(new_conn)
        new_conn.close()
        l.close()

@classmethod
def _remote(cls, conn):
    for (address, msg) in iter(conn.recv, None):
        client = cls.connection.Client(address)
        client.send(msg.upper())
        client.close()
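As far as I can tell from the docs, you rarely call deliver_challenge and answer_challenge yourself: passing authkey= to Listener() and Client() runs the same HMAC handshake automatically. Here is my untested sketch of that automatic route (and my guess at the manual equivalent is in the trailing comment):

    from multiprocessing import Process, connection

    SECRET = b'not-a-real-key'   # authkey must be a byte string

    def client(address):
        # Passing authkey makes Client() answer the listener's challenge
        # (and challenge it back) automatically.
        with connection.Client(address, authkey=SECRET) as conn:
            conn.send('authenticated hello')

    if __name__ == '__main__':
        with connection.Listener(('localhost', 0), authkey=SECRET) as listener:
            p = Process(target=client, args=(listener.address,))
            p.start()
            # accept() performs the challenge/response for us and raises
            # AuthenticationError if the keys don't match.
            with listener.accept() as conn:
                print(conn.recv())
            p.join()

    # My guess at the manual equivalent of what accept()/Client() do internally:
    #   listening side:  deliver_challenge(conn, SECRET); answer_challenge(conn, SECRET)
    #   connecting side: answer_challenge(conn, SECRET); deliver_challenge(conn, SECRET)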
And we're done
I hope you enjoyed this 5-minute lightspeed tour of Python library functions, and learned something new! I didn't test any of the code I wrote here (I learn passively, not by typing things into a REPL), so if I made a mistake let me know in the comments.