(Cover image (C) Tai Kedzierski)
Recently I took to re-writing something I did in a previous job, as I wanted to retain the knowledge I had gained.
In that job, we had a hardware system with several data streams from which we could not afford to lose any data, as well as several pieces of hardware control we needed to actuate simultaneously. We'd receive stream input on one stream, send commands through another, change the hardware state, and check back on the streams to see evidence that the intended effect had taken place.
The part I feel I learned the most from was the implementation of the stream control: it had to be always reading from at least a serial line, whilst also being able to follow live logs accessed over an SSH connection.
And so the Threaded Streams component was born.
I will not be going into the nitty-gritty of how that was implemented - you can see the source code on GitHub.
What I wanted to do in this post is call out a few things that I learned along the way.
Scopes of Concern
The first thing I want to touch on is the importance of separating out scopes of concern: multi-threading is enough to manage on its own, without having to burden your calls - and your mental model - with additional baggage.
The design of the threaded streams was to provide a thread-safe mechanism for accessing an IO stream, whilst being able to support different types of transport layers.
One could think "OK, put your generic stream stuff in one class, and put your TCP/serial/SSH stuff in subclasses." That's a pretty fundamental separation which happened early on - but there's another layer. When you think about "generic stream stuff" you're actually missing out on the thread-safety part. If you lump both the thread safety aspect and the stream reading API into one, you're burdening a single class with two scopes of responsibility: being thread safe and being pleasant to use. That's not to say that thread safety doesn't need to be easy to use (it is a must!), but it should be a given. The niceness relates to conveniences like "read a line" or "peek at the buffer" or "read until condition X is found".
So the inheritance tree is actually like this:
Thread
|
+- ThreadedStream
   |
   +- IOStream
      |
      +- TCP implementor
      +- Serial implementor
      +- SSH implementor
IOStream implements the nicey-nicey read_line() and read_until() API that makes working with streams easy; the ThreadedStream provides the thread-safety on the base read() and write() operations. The latter are backed by the implementors' _raw_read() and _raw_write() methods which, beyond their primary and obvious functionalities, have an extra requirement: they MUST be non-blocking.
For serial, this was easy - the library is naturally non-blocking. For TCP, there is a mode that can be set, and it needs managing. For SSH, some exceptions need managing in not-ready states. If you inspect the three implementors' files, you'll see that, once comments are stripped out, each is extremely simple. They are concerned with the busy-work of moving bytes around - not with being nice (IOStream's job) or knowing about threading (ThreadedStream's purview). Similarly, IOStream has an easy time of implementing a single read_until method, on which the read_line methods can rely to provide the concept of line separators and suchlike; and ThreadedStream can focus on the semi-complex task of managing the underlying buffer in a safe manner.
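To make the "MUST be non-blocking" requirement concrete for the TCP case, here is a minimal sketch. It is not the library's actual implementor: the class name TcpTransportSketch and the max_bytes parameter are made up for illustration, and only the _raw_read() / _raw_write() method names come from the description above. It assumes the "mode that can be set" is the standard socket non-blocking mode.

```python
import socket


class TcpTransportSketch:
    """Hypothetical implementor: it only moves bytes and never waits."""

    def __init__(self, sock: socket.socket):
        self._sock = sock
        # In non-blocking mode, recv()/send() raise BlockingIOError
        # instead of waiting when they cannot proceed immediately.
        self._sock.setblocking(False)

    def _raw_read(self, max_bytes: int = 4096) -> bytes:
        try:
            # Note: recv() also returns b"" when the peer has closed the
            # connection; a real implementor would need to tell the two apart.
            return self._sock.recv(max_bytes)
        except BlockingIOError:
            # Nothing to read right now: report "no data" and return at once.
            return b""

    def _raw_write(self, data: bytes) -> int:
        try:
            return self._sock.send(data)
        except BlockingIOError:
            # Send buffer is full: nothing written, caller may retry later.
            return 0
```

With a pair of connected sockets from socket.socketpair(), calling _raw_read() before anything has been sent returns an empty bytes object straight away rather than hanging, which is exactly what the polling thread needs.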
Limiting the scope of concern of your classes is paramount to staying sane - and to extending the implementation in the future.
A quick note on testability
It also pays in terms of providing testability. The actual stream implementors are easy enough to write and to read - but verifying that the logic of the threading and the higher-level reading is working is much more important as it underpins the entire project. Being able to move as much as possible out of the transport implementation means I was able to implement a rudimentary "faux" stream, which provided its own little inner buffer to mock out a network line.
So whilst mocking out endpoints for the actual transport implementations is an exercise in itself, the faux transport implementation requires no additional setup, and allows verifying the implementation quickly and easily.
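As an illustration of the idea, a faux transport can be as small as the sketch below. The names FauxTransport and drain are hypothetical and the real faux stream in the project may look quite different; the only assumption carried over from the article is the non-blocking _raw_read() / _raw_write() pair.

```python
import threading


class FauxTransport:
    """Hypothetical in-memory stand-in for a serial/TCP/SSH implementor."""

    def __init__(self):
        self._wire = bytearray()          # bytes currently "on the line"
        self._wire_lock = threading.Lock()

    def _raw_write(self, data: bytes) -> int:
        # Loopback: whatever is written becomes readable again.
        with self._wire_lock:
            self._wire += data
        return len(data)

    def _raw_read(self, max_bytes: int = 4096) -> bytes:
        # Non-blocking by construction: returns b"" when the wire is empty.
        with self._wire_lock:
            chunk = bytes(self._wire[:max_bytes])
            del self._wire[:max_bytes]
        return chunk


def drain(transport, attempts: int = 3) -> bytes:
    """Stand-in for the polling loop: collect whatever is readable."""
    collected = b""
    for _ in range(attempts):
        collected += transport._raw_read()
    return collected


def test_loopback_needs_no_endpoint():
    faux = FauxTransport()
    faux._raw_write(b"status?\n")
    assert drain(faux) == b"status?\n"
    assert faux._raw_read() == b""
```

Because nothing here touches a socket or a serial port, a test like this can run anywhere pytest runs, and any failure points at the buffering logic rather than at the test environment.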
Slowing down speeds you up
When running the basic tests through pytest, I found that pytest itself seemed to slow down the execution of the library significantly - at one point, I even had trouble halting the test, as pressing Ctrl+C resulted in the stack trace being printed not in lines per second, but in seconds per line. Surely pytest is not so much of a hog? I had a minor epiphany about using a unit testing framework as the basis for an integration testing utility, and questioned that for a while, wondering whether a deeply-introspecting framework would have a detrimental effect on our runtime...
But really, the problem was the execution loop. My execution loop - pure and simple. It was neurotic. What I figured out only after a while is that when you are looping through an operation indefinitely, it pays not to tie up the processor with pointless, obsessive checks on the sockets at every available clock cycle. The lower-level socket stack itself maintains a buffer, large enough for our process to query it only seldom - and by "seldom" I mean "every thousand clock cycles or so" which, at 1.7 GHz (for an entry-level processor), is still well over a million times per second.
This is where the tick parameter comes in. It is set to a sluggish 10e6, which internally becomes a wait time of 1/10e6 between each execution of the ThreadedStream main loop. And this, dear reader, is what frees up the CPU. Slowing down the loop makes the entire system work much faster. I like it as an analogy for being efficient: pause to breathe and relax every so often, and you'll operate much more efficiently.
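The shape of such a loop can be sketched as follows. This is an assumption-laden illustration, not the ThreadedStream code: the class name PollingLoopSketch, the poll callback and the iterations counter are invented here, and the tick value used is far slower than the one quoted above so that the effect is easy to observe. Note also that the real granularity of time.sleep() depends on the operating system.

```python
import threading
import time


class PollingLoopSketch(threading.Thread):
    """Hypothetical polling thread that yields the CPU between checks."""

    def __init__(self, poll, tick: float = 1000.0):
        super().__init__(daemon=True)
        self._poll = poll                  # callable that checks the source
        self._tick_seconds = 1.0 / tick    # tick is a rate, wait is 1/tick
        self._halt = threading.Event()
        self.iterations = 0

    def run(self):
        while not self._halt.is_set():
            self._poll()
            self.iterations += 1
            # Without this sleep the loop spins as fast as the CPU allows
            # and starves every other thread in the process.
            time.sleep(self._tick_seconds)

    def stop(self):
        self._halt.set()
        self.join()
```

Running this with tick=100 for a fifth of a second gives roughly twenty polls; removing the sleep gives orders of magnitude more, almost all of which find nothing new to read.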
For all the hooks pytest seems to add to the code, they became undetectable from a runtime performance point of view. Now that the processor wasn't at 200% load, everything ticked over fine.
Use Context-Managed Re-entrant Locks
The next item I want to touch on concerns thread safety basics. Multi-threading is easy when you have a simple use case. You can easily code yourself into an unmanageable situation if you don't carefully think about what functions get called by which thread of execution. It gets especially hairy if you don't manage your locks properly.
For the latter problem, Python's context management mechanism is particularly helpful. If you place any code that accesses a thread-sensitive resource under a with self.lock: block, you are guaranteed that the lock will be released once you're done with it (just be very careful with locks in loops). You may even have a higher-abstraction function that wants to hold a lock on the resource which lower-level functions also need to lock - this happens notably in IoStream.read_until(...). It wants to perform a unified action that comprises several operations, each of which acquires the lock for its own safety. Normally this would be a problem, since the read_until() function already holds the lock, and its child would wait forever to acquire it in turn. Re-entrant locks allow the same thread to re-acquire a lock it already holds, which avoids the deadlock (pun not intended), with the caveat that every acquisition must be matched by a release, which context management helps ensure.
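A minimal sketch of that pattern, using only the standard library's threading.RLock, might look like this. The class and method names (BufferSketch, peek, consume, take_through) are hypothetical and stand in for whatever the real classes call their operations.

```python
import threading


class BufferSketch:
    """Hypothetical buffer whose every operation takes the same RLock."""

    def __init__(self):
        self.lock = threading.RLock()
        self._data = b""

    def append(self, data: bytes) -> None:
        with self.lock:
            self._data += data

    def peek(self) -> bytes:
        with self.lock:
            return self._data

    def consume(self, count: int) -> bytes:
        with self.lock:
            chunk, self._data = self._data[:count], self._data[count:]
            return chunk

    def take_through(self, marker: bytes):
        """Compound operation: peek and consume as one atomic step."""
        with self.lock:                          # outer acquisition
            position = self.peek().find(marker)  # same thread re-acquires
            if position < 0:
                return None
            # Re-acquires again; no other thread can slip in between
            # the peek above and this consume.
            return self.consume(position + len(marker))
```

If self.lock were a plain threading.Lock, the call to peek() inside take_through() would block forever, because a non-re-entrant lock cannot be acquired a second time even by the thread that holds it.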
But remember, we must not allow our reading loop to itself be neurotic. Instead of implementing another wait tick for this function (which I initially did - oopsie), we piggy-back on the fact that we're not interested in re-reading the buffer until it has at least had a chance to be updated. We use an Event object for this, with the main read() in ThreadedStream signalling to any waiting thread that, yes you lot, I might have added something, come get it. Our read_until() operation, having Event.wait()-ed, proceeds, thus also piggy-backing on the reader thread's tick.
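The signalling can be sketched roughly as below. This is one possible arrangement under my own assumptions, not the project's code: EventedBufferSketch, feed() and the timeout handling are invented for the example, with feed() standing in for whatever the reader thread does after each successful raw read.

```python
import threading
import time


class EventedBufferSketch:
    """Hypothetical buffer that wakes waiting readers when data arrives."""

    def __init__(self):
        self._lock = threading.RLock()
        self._data = b""
        self._updated = threading.Event()

    def feed(self, data: bytes) -> None:
        """Called from the reader thread after it has read something."""
        with self._lock:
            self._data += data
        self._updated.set()   # "I might have added something, come get it"

    def read_until(self, marker: bytes, timeout: float = 1.0) -> bytes:
        deadline = time.monotonic() + timeout
        while True:
            with self._lock:
                # Clear before checking: a feed() that lands after this
                # check sets the event again, so no wake-up is lost.
                self._updated.clear()
                position = self._data.find(marker)
                if position >= 0:
                    end = position + len(marker)
                    chunk, self._data = self._data[:end], self._data[end:]
                    return chunk
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise TimeoutError(f"{marker!r} not seen within {timeout}s")
            # Sleep until the reader thread signals, not on our own tick.
            self._updated.wait(remaining)
```

The waiting thread consumes no CPU while blocked in wait(), and it re-checks the buffer only as often as the reader thread actually delivers data.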
Working Around Data Loss
One thing that I wanted to do this time around, which I hadn't been able to puzzle out in my original work, was dealing with errors. Sometimes you get a read error, some exception... maybe your attempt to read_until() failed because the target item was not found. But maybe the target was not really necessary... and you still want to know what the accumulated data was up until the error.
Well. This feels like a shim, and I am not sure how happy or ashamed I am of this. But it is definitely a use-case I have had need for. My predecessor's technique was to expose the buffer for direct access. My technique is... to attach a copy of the buffer data to the exception :grimace:
Both the IoStream
and the ThreadedStream
perform an action on their read and write operations which, upon encountering an error, capture it, attach copies of the input and output buffers to it (without consuming them), then re-raise it. This allows the calling context to catch the error, and decide whether to process the received data before bailing (the IoStream
error is non-fatal and the stream can continue to be used - the ThreadedStream
error is fatal and will have killed the thread).
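In outline, the capture-attach-re-raise step could look like the sketch below. Everything named here is an assumption made for illustration: the class BufferedReaderSketch, the attribute names input_data and output_data, and the use of LookupError for a missing target are mine, not the library's.

```python
class BufferedReaderSketch:
    """Hypothetical reader that attaches buffer snapshots to its errors."""

    def __init__(self):
        self._in_buffer = bytearray()
        self._out_buffer = bytearray()

    def feed(self, data: bytes) -> None:
        self._in_buffer += data

    def read_until(self, marker: bytes) -> bytes:
        try:
            position = self._in_buffer.find(marker)
            if position < 0:
                raise LookupError(f"{marker!r} not found")
            end = position + len(marker)
            chunk = bytes(self._in_buffer[:end])
            del self._in_buffer[:end]
            return chunk
        except Exception as error:
            # Attach copies, not the live buffers, and do not consume
            # anything: the stream stays usable after the error.
            error.input_data = bytes(self._in_buffer)
            error.output_data = bytes(self._out_buffer)
            raise   # re-raise the same exception, traceback intact


reader = BufferedReaderSketch()
reader.feed(b"partial log")
try:
    reader.read_until(b"\n")
except LookupError as error:
    salvaged = error.input_data   # what had accumulated before the failure
```

The caller keeps normal exception control flow, and the data that had arrived before the failure travels with the exception instead of requiring direct access to the buffer.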
In this way, error control flow is retained, whilst also retaining as much of the data as possible.
Multi-threading is easy
No it isn't. Not necessarily at least. But separating out your concerns, ensuring your abstractions are testable, and managing your locks and events properly, goes some way to de-complexifying the problem. I think.
I'll get back to you once I finish the game loop on the other project.