Right, so last time I walked through how I have programmed and wired some ESP8266 microcontrollers to collect temperature, humidity and soil moisture data and beam that data to a specified address on the network.
In this post, I'll walk through the code to run a server on something like a laptop or a raspberry pi to collect the incoming data and write it to a sqlite3 database for later analysis.
This post will cover topics like threading, more advanced socket management, and interfacing with a database from Python.
The full, versioned code is available on Github. Note that link is fixed and will not update as I improve--for that, go here and check out the Greenhouses project.
I'll be pulling snippets from that code into this post, but to see it all in context you should look at the link.
Quick Rehash
So, remember, my goal for this project is to have a fleet of environmental sensors deployed throughout a greenhouse to sample data and send it to a central location where I'll make control decisions (e.g., turn on the heater, open a vent, water tray A, etc). We need our server architecture to support multiple sensors simultaneously, as well as collating the every-two-second readings into an average over a minute or so.
Step One: Get The Data Off the Network
For a moment, let's pretend we're only expecting a single client sensor to talk to us. We'll get to dealing with multiple sensors later, but then we have to deal with threads and locking and it's a gigantic mess.
So for now, one sensor.
Aside: Server socket programming
When you're a server, you generally have one socket/port open to listen for incoming clients--like a web server, listening on port 80 for new clients. The tricky thing is you don't want to do your primary communication with your clients on that socket, or you won't be able to use it to listen for more incoming clients.
Imagine a system named Bob goes to Google and asks to download some gigantic file, and Google responds with that file on the same socket it uses to accept new client connections. Then you go to Google, but can't get through because the 'line is busy' as Google offloads the file onto Bob, which takes forever--and that entire time, you can't talk to Google.
So, you need one socket on the 'public' port to initialize the connection, and then another socket to actually handle the communication on--kind of like an operator at a business taking incoming calls and then routing them to different lines. Kind of.
Back to the network...
So, we have a bunch of ESP8266s trying to open connections to a fixed IP and port--I'm going to say, arbitrarily, it's IP 192.168.1.5 and port 5555. It doesn't really matter, so long as the server and the microcontrollers agree and the port number is above 1024 (ports below that are reserved for well-known services and usually require elevated privileges to bind) and below 65536.
Your server needs to listen on that address--the IP/port combination--for incoming connections. This is fairly easy:
import socket

serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.bind(("192.168.1.5", 5555))
serversocket.listen()
Walking through this:
First we're creating a socket with some attributes. This is a socket of the AF_INET family, which basically just means it's going to talk over IPv4 (instead of to another process on your system or over some other networking protocol). This mostly just determines how the socket works under the hood, as well as the type of address we're going to give it.

This is also a socket of the SOCK_STREAM type, which means it's a TCP socket: a reliable, ordered stream of bytes, buffered internally so we can read the data at our own rate.
Then, we bind the socket to an address. Note: If you run into difficulty with that line, make sure the IP you have there matches the IP of the system you're using. If they mismatch, you'll get an exception here (since you're trying to bind to an address you don't actually have).
After that, we set the socket to 'listen' mode.
With me so far?
(clientsocket, address) = serversocket.accept()
Alright--so now, we're saying "Hey, socket, listen for a new connection and when you get one, hand me a socket to talk to that new connection and let me know what the address is." This is a blocking call, which means our program will now stop until the new connection comes in. Obviously, this makes it rather complicated to do any sort of data processing while we just sit here and wait--but we'll fix that later.
If you're playing with this alongside a running microcontroller what you should see is the program launch and then hang as it waits for the incoming socket connection--if you're using my ESP8266 code it'll blink steadily as it acquires the wifi connection, then it'll blink more angrily as it makes the socket connection and then it should be solid blue with an intermittent blink as it writes to the socket.
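If you don't have a microcontroller flashed and running, you can exercise the server with a short stand-in client. This is just a sketch: the host, port, and especially the record layout (semicolon-separated "label value" fields, newline-terminated) are my assumptions based on what the parsing code later in this post expects, not something fixed by the ESP8266 firmware itself.

```python
import socket

def send_fake_readings(host="192.168.1.5", port=5555, count=5):
    # Pretend to be an ESP8266: connect to the server and send a few
    # newline-terminated sensor records. The field layout is a guess
    # inferred from the record-parsing code in the DBWriter.
    with socket.create_connection((host, port)) as s:
        for i in range(count):
            record = f"soil: 512; temp: 21.5; hum: 40.2; mac: DE:AD:BE:EF:00:0{i}\n"
            s.sendall(record.encode("utf-8"))
```

Point it at whatever IP and port your server is actually bound to, and you should see the records come out the other end.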
Of course, we aren't actually reading any data from the socket yet! Let's fix that:
buffer = ""
while 1:
    data = clientsocket.recv(128)
    datastr = buffer + data.decode('utf-8')
    split = datastr.split("\n")
    buffer = split[-1]
    print(split[:-1])
So, the workhorse here is the clientsocket.recv(128) call. That asks for up to 128 bytes of data from the socket--this is also a blocking call, meaning execution will halt until at least some data arrives. (You may well get fewer than 128 bytes; the argument is a maximum, not a guarantee.) When we get the data, we decode it from just a bunch of bytes into a string and slap it onto a buffer.
The buffer is necessary because 128 bytes might not be exactly the length of a single 'record' sent from the ESP8266--you may have all of record A and half of record B. In our 'protocol', newlines are used as boundaries between records, which means we have an easy way of telling when we have an entire record.
So, we add the data we've received to the buffer (in case we just got the second half of record B) and then we split the buffer on newlines, to separate the records. We take the last piece from the split, which may be a partial record, and shove it back in the buffer. Then we print the rest of the records (excluding the last one).
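This buffer-and-split logic is worth factoring into a small pure function--not something the post's code does, but it makes the chunk-boundary behavior easy to see and test:

```python
def extract_records(buffer, chunk):
    """Append a chunk of received text to the buffer and split on newlines.

    Returns (complete_records, new_buffer): everything before the last
    newline is a complete record; the trailing piece may be a partial
    record, so it becomes the new buffer.
    """
    pieces = (buffer + chunk).split("\n")
    return pieces[:-1], pieces[-1]
```

If a record arrives split across two recv() calls, the first call yields the complete records plus a partial tail, and the second call stitches the tail back together.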
With this, you should be able to see the incoming data from a single ESP8266 sensor! Woo!
Now we need to work on scaling to multiple connections.
Step Two: Threading the Connections
As I've mentioned a couple times we're making use of some calls that block execution, making our program halt until some outside thing happens. That makes it hard to scale to large numbers of sensors, so we're going to have to introduce some threading.
If you aren't familiar, threading is a way for a single program to do lots of things at the same time--multiple 'threads of execution.' A single process can have multiple threads where the threads all share the same memory, which makes them faster than if you had multiple processes just trying to talk to each other. (It also makes it a lot easier to write.)
The tricky thing about threads is you can have race conditions, where two threads are trying to access the same resource simultaneously. This typically leads to strange--and difficult to reproduce--behavior, so you have to be very, very careful to manage your access to shared resources when using threads to prevent this, or debugging it will be a living hell.
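As a tiny illustration (the function names here are mine, not from the project): several threads bumping a shared counter. The read-modify-write of the increment is exactly the kind of operation that can interleave badly; guarding it with a lock makes the final total deterministic.

```python
import threading

def add_many(counter, lock, n):
    # Guard the read-modify-write so two threads can never interleave
    # in the middle of an increment.
    for _ in range(n):
        with lock:
            counter[0] += 1

def run_counter(num_threads=4, n=10_000):
    counter = [0]  # a one-element list so threads share the same object
    lock = threading.Lock()
    workers = [threading.Thread(target=add_many, args=(counter, lock, n))
               for _ in range(num_threads)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return counter[0]
```

With the lock, run_counter(4, 10_000) always returns 40000; drop the `with lock:` and you may see a smaller (and varying) total.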
The general architecture we're going to build will look like most simple server models. We'll have a server thread that'll listen on port 5555 and accept incoming connections. Each accepted connection gets its own socket, which we hand off to a different thread, so the server thread can go back to waiting for new clients.
Python provides a helpful threading library, thankfully. There's a couple ways you can write threads: you can either make a new threading.Thread object and pass in the function that thread will run as an argument, or you can make a class that derives from threading.Thread and just override the run method. I chose the latter, since I wanted to have some saved state and that seemed a good way to do it. You can do whatever you like.
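Here's what the two styles look like side by side--a minimal sketch, with made-up names:

```python
import threading

# Style 1: pass a target function into Thread.
def greet(name):
    print(f"hello from {name}")

t1 = threading.Thread(target=greet, args=("function-style",))

# Style 2: subclass Thread and override run(); handy when the thread
# needs to carry its own state between calls.
class Greeter(threading.Thread):
    def __init__(self, name):
        super().__init__()  # don't forget this, or start() will blow up
        self.name = name

    def run(self):
        print(f"hello from {self.name}")

t2 = Greeter("subclass-style")

for t in (t1, t2):
    t.start()
for t in (t1, t2):
    t.join()
```

Either way, start() spins up the new thread and join() waits for it to finish.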
Accepting incoming connections
class Overseer(threading.Thread):
    def __init__(self, threads):
        super(Overseer, self).__init__()
        print("Starting overseer thread...")

    def run(self):
        global threads_run
        global threads
        serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        serversocket.bind(("192.168.1.149", 5555))
        serversocket.listen()
        while threads_run:
            (clientsocket, address) = serversocket.accept()
            l = Listener(clientsocket, address)
            threads.append(l)
            l.start()
        serversocket.close()
Most of this should look familiar, really. The important things here are the global threads_run variable and the l = Listener(clientsocket, address) with l.start() at the bottom.
The Listener class is another thread class I made, which we'll get to in a moment.
Walking through:
The init method should be much what you expect. One catch: you must make sure to call the superclass init, or you'll get an exception when you try to .start() the thread.
The while loop is keyed off of a variable shared between all the threads--and only modified in the original main thread. This makes it possible for us to cleanly stop all the threads from the original one and clean up our resources when we're done.
With the Overseer in its own thread we can constantly block and wait for incoming connections without preventing our program from doing other things. This is the main idea and main value of threading.
Now, one flaw in this design is that if we toggle threads_run while we're waiting for an incoming connection, we could just be waiting forever. I've been meaning to write an interrupt for that, but... later.
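One straightforward fix, sketched below (this isn't in the repo--it's just how I'd approach it): put a timeout on the listening socket, so accept() wakes up every second with a socket.timeout exception and the loop gets a chance to re-check the flag.

```python
import socket

def accept_loop(serversocket, should_run, handle):
    # Instead of blocking in accept() forever, wake up once a second
    # and re-check the shared "keep running" flag.
    serversocket.settimeout(1.0)
    while should_run():
        try:
            clientsocket, address = serversocket.accept()
        except socket.timeout:
            continue  # nobody connected this second; re-check the flag
        handle(clientsocket, address)
    serversocket.close()
```

Here should_run would be a callable checking threads_run, and handle would be whatever spawns a Listener. The worst-case shutdown lag becomes one timeout period.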
Also note how we're appending the new Listener thread to a global threads list--we'll come to that.
Listening to those incoming connections
class Listener(threading.Thread):
    def __init__(self, clientsocket: socket.socket, address) -> None:
        super(Listener, self).__init__()
        print("Starting listener thread...")
        self.socket = clientsocket
        self.address = address
        self.buffer = ""

    def run(self):
        global lock
        global shared_list
        global threads_run
        while threads_run:
            data = self.socket.recv(128)
            if not data:
                break  # an empty read means the client closed the connection
            datastr = self.buffer + data.decode('utf-8')
            split = datastr.split("\n")
            self.buffer = split[-1]
            split = split[:-1]
            with lock:
                shared_list.extend(split)
                lock.notify()
        self.socket.close()
This is where things start to get a little more complicated, but it's still pretty close to what we had before. Remember that the Listener threads get built and started in the Overseer thread, where they get their socket and their address.
Within the listener thread, while the global threads_run is true, we constantly block and receive data from the socket. Again, since this is occurring in its own thread, the blocking here does not prevent us from executing in other threads. (One subtlety: if recv returns an empty bytes object, the client has hung up, so we break out rather than spin forever on a dead socket.)
We do the same processing on the received data that we did before... and then we have this weird with lock thing.
So, remember how I was talking about how you have to be super careful when accessing shared resources when you're working with threading? This is one of those instances. Here, shared_list is shared between all the running threads, and it's where they write the data they obtain from the network. To avoid collisions and weird behavior we use this lock thing, which is a threading.Condition object. In essence, a lock is a way for threads to ensure only one of them gets to touch a thing at a time: a thread has to acquire the lock, which only one thread can hold at a time, then it can do its work, and then it has to release the lock--other threads block when they try to acquire a lock that's already held.

This is handled nicely with the with syntax in Python: when you enter a with block on a lock, Python handles the acquiring and releasing for you. The lock.notify() call is a Condition feature--it wakes up any thread that's sleeping in lock.wait() on the same condition, which is how you'd signal a consumer that new data has arrived. (The release of the lock itself happens automatically when the with block ends; notify doesn't do that.) Note also that I only acquire the lock when I'm about to interact with the shared data, and I release it immediately after I'm done messing with it--there's no point in making other threads wait while I process strings they don't share, they could be doing something useful in that time, so I try to hold the lock for as short a time as possible.
Now, this is a super high level view of locks. Locks can get very complicated, especially if you end up with circular waits, better known as deadlock (where thread A is waiting on thread B, but thread B is waiting on thread A, so neither ever proceeds). Threading is a super complicated topic that I'm not going to go into in this post, but rest assured there's plenty of other reading on it if you're so interested.
Anyway, once you have the lock, you add stuff to the list and then release the lock, ensuring nothing weird happens with shared access. Cool? Cool.
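For what it's worth, notify() only pays off if some other thread is wait()ing on the same Condition--the producer/consumer pattern it's designed for. The DBWriter below polls on a timer instead of waiting, so the notify there is effectively a no-op; here's a sketch of the pattern where it actually matters (names are mine):

```python
import threading

cond = threading.Condition()
shared = []

def producer(items):
    for item in items:
        with cond:
            shared.append(item)
            cond.notify()  # wake one thread sleeping in cond.wait()

def consume_one():
    with cond:
        while not shared:   # loop guards against spurious wakeups
            cond.wait()     # releases the lock while sleeping, reacquires on wake
        return shared.pop(0)
```

A consumer thread blocked in consume_one() wakes as soon as the producer adds an item, with no polling interval at all.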
Interacting with a Database
If you haven't used them before, databases are systems designed for the easy storage and retrieval of--you guessed it--data. They're used in virtually every aspect of software, from managing the users of blogs to the content of twitter to the access logs of sensitive physical areas. They're fairly complex structures that often require a server in their own right to run on.
For our purposes, we can make do with sqlite, a super lightweight database engine that just stores all the data in a file near your program. It's trivially easy to set up and interface with in Python.
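How trivially easy? The whole round trip--open (or create) a database file, make a table, insert a row, read it back--fits in a few lines. The ":memory:" filename here is a sqlite3 feature that keeps the database entirely in RAM, handy for trying things out; the schema is just a cut-down version of the one used later in this post.

```python
import sqlite3

# Open an in-memory database (use a filename like "greenhouse.db" to persist).
connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE IF NOT EXISTS readings (source text, airtemp real);")
# '?' placeholders let sqlite3 handle quoting for us.
connection.execute("INSERT INTO readings VALUES (?, ?);", ("aa:bb:cc", 21.5))
connection.commit()
rows = connection.execute("SELECT source, airtemp FROM readings;").fetchall()
connection.close()
```

After this runs, rows holds [("aa:bb:cc", 21.5)].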
Now, I'll explain what I'm doing with the SQL commands I use here, but this isn't really a database usage tutorial so much as it is an interfacing-with-them tutorial.
Cool? Cool. Onwards!
class DBWriter(threading.Thread):
    def __init__(self):
        super(DBWriter, self).__init__()
        print("Starting DB writer thread...")

    def run(self):
        global threads_run
        global lock
        global shared_list
        connection = sqlite3.connect("greenhouse.db")
        connection.execute("CREATE TABLE if not exists readings "
                           "(source text, "
                           "airtemp real, "
                           "humidity real, "
                           "soil_moisture int, "
                           "timestamp text);")
        while threads_run:
            time.sleep(60)
            to_load_into_database = []
            with lock:
                if len(shared_list) > 0:
                    to_load_into_database = list(shared_list)
                    shared_list.clear()
            print(f"Condensing {len(to_load_into_database)} records...")
            by_source = defaultdict(list)
            record = namedtuple('SensorRecord', ['air_temp', 'air_hum', 'soil_moisture'])
            for line in to_load_into_database:
                line = line.split(';')
                mac = line[3].strip().split(" ")[1]
                by_source[mac].append(
                    record(
                        float(line[1].strip().split(" ")[1]),
                        float(line[2].strip().split(" ")[1]),
                        int(line[0].strip().split(" ")[1]),
                    )
                )
            for key in by_source:
                list_len = float(len(by_source[key]))
                avg_temp = 0
                avg_hum = 0
                avg_soil = 0
                for record in by_source[key]:
                    avg_temp += record.air_temp
                    avg_hum += record.air_hum
                    avg_soil += record.soil_moisture
                avg_temp /= list_len
                avg_hum /= list_len
                avg_soil /= list_len
                timestamp = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
                values = f" ('{key}', {avg_temp}, {avg_hum}, {avg_soil}, '{timestamp}')"
                print(f"Writing {values} to DB")
                connection.execute(f"INSERT INTO readings VALUES {values};")
            connection.commit()
            print("Sleeping...")
        print("DB writer thread exiting.")
        connection.close()
"Aaah! That's a lot of code! What the hell, Jake, it was going so smoothly?!"
It's fine! I promise! Really!
We'll break it down into pieces. Let's look at the setup:
class DBWriter(threading.Thread):
    def __init__(self):
        super(DBWriter, self).__init__()
        print("Starting DB writer thread...")

    def run(self):
        global threads_run
        global lock
        global shared_list
        connection = sqlite3.connect("greenhouse.db")
        connection.execute("CREATE TABLE if not exists readings "
                           "(source text, "
                           "airtemp real, "
                           "humidity real, "
                           "soil_moisture int, "
                           "timestamp text);")
        while threads_run:
            time.sleep(60)
So, the __init__ does... pretty much nothing useful.
At the start of run, we global some things that we share, and we open a connection to the database, giving it the filename we want the database to live in.
"Wait, Jake," you say. "Why wouldn't you move that connection up into __init__? Isn't it... You know... Initialization?"
You would be very correct! And I tried that. Turns out, sqlite3 objects can only be used in the thread they're created in, and, technically speaking, the __init__ function is executed from a different thread than the run function is, since __init__ is called when the thread object is built, but run is only called after you .start() the thread. So, you have to make the object after the threads have diverged.
Once we have the connection to the database we make sure the table we're going to use exists. If we don't have the if not exists phrase there, we'll get a SQL error when we try to create an already-existing table. We then provide the name and type pairs of all the columns we want in the database:
- source, text: this will be the MAC address of the sending sensor. I'll use this later to figure out which sensor is where in the greenhouse so I can actually understand the incoming data, and not have to guess where it's coming from.
- airtemp, real: temperature is a continuous value, not an integer--especially once we average it over the period a sensor has been sending. And since the temps will be in Celsius, an integer value would be far too coarse and imprecise compared to a real one.
- humidity, real: See previous
- soil_moisture, int: So this is a bit strange, since it's a percentage. Technically, the average could also be a real, but I got lazy and made this an int. Sue me.
- timestamp, text: We're going to timestamp all the data, otherwise we won't know when a reading was taken, but there isn't really a consensus on how timestamps should be stored in databases. I'm doing text since it's the most flexible format.
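One nice property of the "%Y-%m-%d %H:%M:%S" text format used later in this post: it sorts lexicographically in chronological order, so SQL ORDER BY works on it directly. You do have to parse it back into a datetime for any real analysis, though--a small sketch (parse_timestamp is my name, not the project's):

```python
import datetime

TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"

def parse_timestamp(text):
    # Turn the stored text column back into a datetime object.
    return datetime.datetime.strptime(text, TIMESTAMP_FORMAT)
```

So parse_timestamp("2021-06-01 12:30:00") gives you back a datetime you can subtract, bucket, or plot.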
With all that set up, we can actually enter our while loop. This thread naps for a while, since we need to wait for the data we're going to aggregate to come in.
Now for the meat of the aggregation:
to_load_into_database = []
with lock:
    if len(shared_list) > 0:
        to_load_into_database = list(shared_list)
        shared_list.clear()
print(f"Condensing {len(to_load_into_database)} records...")
by_source = defaultdict(list)
record = namedtuple('SensorRecord', ['air_temp', 'air_hum', 'soil_moisture'])
for line in to_load_into_database:
    line = line.split(';')
    mac = line[3].strip().split(" ")[1]
    by_source[mac].append(
        record(
            float(line[1].strip().split(" ")[1]),
            float(line[2].strip().split(" ")[1]),
            int(line[0].strip().split(" ")[1]),
        )
    )
So we make an empty list that we'll copy the shared data into, then we grab the lock and empty the shared list into our local one. Then we release the lock so the other threads can write to the shared list.
Once that's done, we start pulling the data out of the string and loading it into a namedtuple, which is really just a lightweight, immutable record: a tuple whose fields you can access by name. We also group the data by the sensor that sent it.
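A quick aside on namedtuples, using the same field names as the code above:

```python
from collections import namedtuple

SensorRecord = namedtuple('SensorRecord', ['air_temp', 'air_hum', 'soil_moisture'])

r = SensorRecord(21.5, 40.2, 512)
# Fields are readable by name or by position...
temp_by_name = r.air_temp
soil_by_index = r[2]
```

...but they can't be reassigned--attempting r.air_temp = 0 raises an AttributeError, which is exactly the kind of accidental-mutation protection you want for records passing between threads.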
Now we have data, grouped by sending sensor, that we need to aggregate.
for key in by_source:
    list_len = float(len(by_source[key]))
    avg_temp = 0
    avg_hum = 0
    avg_soil = 0
    for record in by_source[key]:
        avg_temp += record.air_temp
        avg_hum += record.air_hum
        avg_soil += record.soil_moisture
    avg_temp /= list_len
    avg_hum /= list_len
    avg_soil /= list_len
    timestamp = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
    values = f" ('{key}', {avg_temp}, {avg_hum}, {avg_soil}, '{timestamp}')"
    print(f"Writing {values} to DB")
    connection.execute(f"INSERT INTO readings VALUES {values};")
connection.commit()
This is pretty much just math. We aggregate all the readings by the sensor that sent them and build a timestamp in what's essentially ISO format, but without milliseconds.
Once that's done, I build the string that becomes the 'values' component of a SQL insert, using Python's wonderful format string syntax (in Python 3.6 and above, yay!). Then we execute the insert statement and commit the change to the database.
Make sure, once the while loop in the run of the DBWriter exits, that you close the connection! Weird things can happen if you don't.
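If you want the close guaranteed even when the loop dies to an exception, wrap the connection in contextlib.closing (or a try/finally)--a small sketch, not how the repo does it:

```python
import sqlite3
from contextlib import closing

# closing() calls connection.close() on exit, even if the body raises.
with closing(sqlite3.connect(":memory:")) as connection:
    connection.execute("CREATE TABLE IF NOT EXISTS readings (source text);")
    connection.commit()
```

After the with block, the connection is closed and any further use of it raises sqlite3.ProgrammingError.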
That's great, but what does the main look like?
def main():
    global threads_run
    global shared_list
    global threads

    threads = []
    threads_run = True
    threads.append(Overseer(threads_run))
    threads.append(DBWriter())
    threads[0].start()
    threads[1].start()

    while threads_run:
        instr = ""
        while instr != "stop":
            instr = input("enter 'stop' to stop: ")
        threads_run = False

    print("Waiting for threads...")
    for thread in threads:
        thread.join()
We start the Overseer and the DBWriter, and we make sure those are appended to the threads list. Then we start both of them. The Overseer starts accepting connections and spawning Listeners, and the DBWriter thread starts aggregating data.
Then we start a loop where we wait for input, so the user can gracefully stop the program. When they do, we wait for all of the threads to halt with thread.join() and then we exit our program.
There you go! That's the meat of my server! Keep in mind that this isn't by any means the "best" way to do this, and it is absolutely flawed. Keep an eye on the git repo to get updates!