TL;DR
The event loop is a thread that runs in an infinite loop, pulling the tasks, which are callback functions, from the queue and executing them one by one. If the queue is empty, it goes into an idle state waiting for a task to be added to the queue to be processed again. The implementation of the task can achieve concurrency by dividing the task into sub-tasks. When a sub-task is finished, put the next sub-task in the queue to let other tasks have a chance to execute. There is no need for thread synchronization like mutex and condition variable since there is only one thread (the event loop thread) that accesses the resource, and the context switching is in the programmer's control, unlike a multi-threaded that each thread gets CPU time depending on the implementation logic of the operating system. The implementation code can be found in GitHub.
Process vs Thread
Before discussing processes and threads, let's understand the operating system. As shown in the picture below, the operating system is divided into three layers. Hardware, Operating System, and Application Programs. When we write a program. For example, "Hello world", it lives in the Application Programs layer. When the program gets executed, it runs on an operating system as a process. There are many processes running on your computer, and it is the job of the operating system to give each process access to the hardware, like the CPU, concurrently by switching processes to run on the CPU.
Each process has its own virtual memory, which is managed by the operating system, as shown in the following picture.
As can be seen, each process has its own virtual memory, so the communication between them can be done by transferring the data (a socket can be used). whereas threads share the same code and heap memory area, so each thread can access all the memory area except the other thread's stack memory area. When a process creates a new thread, it goes and runs on its own, and the operating system treats it like a process. The difference is that each thread that was created by the same process shares the same virtual memory
Concurrency and Parallelism
Parallelism means that your thread gets executed at the same time. While concurrency means that your thread gets executed one after the other by switching the execution between the threads, in the eyes of humans, the two threads seem like happening at the same time because both threads have progress. The illustration is shown as follows.
When you program a multi-threaded program, it doesn't mean that your program will run in parallel. Your program will run in parallel only if there is no overlap, and you have more than one CPU. Overlap means each thread has to operate on the same data structure, and the operation cannot happen in parallel. For example, one thread wants to read the data structure, but another thread wants to write to it, which will make the data structure inconsistent, so we need to prevent this not happening at the same time by using a mutex and a condition variable to ensure that only one thread can access the data structure at a time. Note that if two threads want to read the same data structure, this can happen at the same time.
Multi-thread Programming
To demonstrate multi-thread programming. I will use the C programming language, but the concept applies to any programming language of your choice. The classic problem of multi-thread programming is the consumer-producer problem. The consumer thread pulls data from the data structure, and the producer thread puts data into that data structure. To synchronize between two threads, we need a mutex to lock the access to the data structure and a condition variable to send a signal between threads. The pseudo code is as follows. Let's say that the shared data structure is a queue.
queue
// consumer
lock(queue) (1)
while (queue is empty) { (2)
wait(until queue have data) (3)
}
pull data from queue (4)
unlock(queue) (5)
// producer
lock(queue) (6)
if (queue not full) { (7)
put data into queue (8)
signal(queue have data) (9)
}
unlock(queue) (10)
Let's see how each step works.
Consumer
- Step 1: We lock the queue so that other threads cannot access the queue.
- Step 2: We check the condition with a while loop instead of an if clause to prevent Spurious Wakeup. For example, after getting the signal from Step 3, some other thread might have already pulled the data from the queue, so we need to check the condition again after getting the signal.
- Step 3: If the condition from Step 2 is met, block the thread until it gets the signal to wake up. Behind the scenes, the operating system will unlock the queue to let other threads access the queue. After getting a signal from the producer on step 9, the consumer thread will move to the ready state, meaning that it will not get executed until the producer calls unlock on Step 10. Then, the operating system, behind the scenes, will lock the queue and hand the key to this consumer thread, allowing it to access the queue.
- Step 4: The consumer thread pulls data from the queue.
- Step 5: unlock the queue to let other threads access the queue.
Producer
- Step 6: The producer locks the queue to prevent other threads from accessing the queue.
- Step 7: Check the condition.
- Step 8: Put data into the queue.
- Step 9: Signal to the waiting thread that the queue has data. The signal is not queued, which means if there is no waiting thread, the signal is lost. So there is no need to check if there is a waiting thread.
- Step 10: Unlock the queue to let other threads access the queue.
Now let's see an implementation in C.
#include <stdio.h>
#include <assert.h>
#include <pthread.h>
#include <string.h>
#define QUEUE_SIZE 5
typedef struct queue_ {
int data[QUEUE_SIZE];
int front;
int rear;
int len;
} queue_t;
void queue_init(queue_t *q) {
memset(q, 0, sizeof(*q));
}
void queue_add(queue_t *q, int n) {
assert(q->len < QUEUE_SIZE);
q->data[q->rear] = n;
q->len++;
q->rear++;
if (q->rear >= QUEUE_SIZE) q->rear = 0;
}
int queue_remove(queue_t *q) {
int r;
assert(q->len > 0);
r = q->data[q->front];
q->front++;
q->len--;
if (q->front >= QUEUE_SIZE) q->front = 0;
return r;
}
int queue_is_full(queue_t *q) {
return q->len == QUEUE_SIZE;
}
int queue_is_empty(queue_t *q) {
return q->len == 0;
}
int gen_num() {
static int n = 0;
return n++;
}
queue_t q;
pthread_mutex_t mu;
pthread_cond_t not_empty;
pthread_cond_t not_full;
void *consumer(void *arg) {
char *name;
int n;
name = (char *) arg;
printf("%s: locking mu\n", name);
pthread_mutex_lock(&mu);
printf("%s: locked mu\n", name);
while (queue_is_empty(&q)) {
printf("%s: wait for queue not_empty signal\n", name);
pthread_cond_wait(¬_empty, &mu);
printf("%s: awake\n", name);
}
while (!queue_is_empty(&q)) {
n = queue_remove(&q);
printf("%s: remove num %d from queue\n", name, n);
}
printf("%s: drain all queue and send not_full signal\n", name);
pthread_cond_signal(¬_full);
printf("%s: unlocking mu\n", name);
pthread_mutex_unlock(&mu);
printf("%s: unlocked mu\n", name);
printf("%s: done\n", name);
return NULL;
}
void *producer(void *arg) {
char *name;
int n;
name = (char *) arg;
printf("%s: locking mu\n", name);
pthread_mutex_lock(&mu);
printf("%s: locked mu\n", name);
while (queue_is_full(&q)) {
printf("%s: wait for queue not_full signal\n", name);
pthread_cond_wait(¬_full, &mu);
printf("%s: awake\n", name);
}
while (!queue_is_full(&q)) {
n = gen_num();
queue_add(&q, n);
printf("%s: put number %d to queue\n", name, n);
}
printf("%s: filled all slot in queue and sending not_empty signal\n", name);
pthread_cond_signal(¬_empty);
printf("%s: unlocking mu\n", name);
pthread_mutex_unlock(&mu);
printf("%s: unlocked mu\n", name);
printf("%s: done\n", name);
return NULL;
}
int main(void) {
pthread_t c1, c2;
pthread_t p1, p2;
pthread_attr_t attr;
queue_init(&q);
pthread_mutex_init(&mu, NULL);
pthread_cond_init(¬_empty, NULL);
pthread_cond_init(¬_full, NULL);
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
pthread_create(&c1, &attr, consumer, "c1");
pthread_create(&c2, &attr, consumer, "c2");
pthread_create(&p1, &attr, producer, "p1");
pthread_create(&p2, &attr, producer, "p2");
pthread_join(c1, NULL);
pthread_join(c2, NULL);
pthread_join(p1, NULL);
pthread_join(p2, NULL);
printf("done\n");
return 0;
}
Serialize multi-thread to single-thread
Multithread programming is hard to get right. It might encounter a deadlock or livelock. And another reason is that creating a new thread consumes memory, so you cannot keep creating threads.
Example use case
Let's say that you have an application that has to listen to the network, accept input from the command line, and a timer that has a task to run in an interval.
As above picture, on the left side, each thread executes the function in its own context. If there is an overlap, it requires a mutex and a condition variable to synchronize between each thread. But on the right side, each thread pushes tasks to the event loop thread, which does not require thread synchronization because every task is executed on the event loop thread context.
The implementation of the event loop is shown as follows. glthread is just a doubly linked list data structure.
// event_loop.h
#ifndef __EVENT_LOOP__
#define __EVENT_LOOP__
#include <pthread.h>
#include "glthread.h"
typedef void (*fn_cb)(void *);
typedef struct task_ {
fn_cb cb;
void *arg;
glthread_node_t node;
} task_t;
typedef struct event_loop_ {
pthread_t thread;
pthread_mutex_t mu;
pthread_cond_t have_task;
glthread_t glthread;
task_t *current_task;
} event_loop_t;
void event_loop_init(event_loop_t *el);
void event_loop_run(event_loop_t *el);
task_t *event_loop_add_task(event_loop_t *el, fn_cb cb, void *arg);
void event_loop_cancel_task(event_loop_t *el, task_t *t);
#endif
// event_loop.c
#include "event_loop.h"
#include "glthread.h"
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>
static void *event_loop_fn(void *arg) {
event_loop_t *el;
task_t *task;
el = (event_loop_t *) arg;
for (;;) {
pthread_mutex_lock(&el->mu);
el->current_task = NULL;
while (el->glthread.size == 0) {
pthread_cond_wait(&el->have_task, &el->mu);
}
task = tostruct(task_t, (&el->glthread), glthread_remove(&el->glthread, NULL));
el->current_task = task;
pthread_mutex_unlock(&el->mu);
task->cb(task->arg);
free(task);
task = NULL;
}
return NULL;
}
void event_loop_init(event_loop_t *el) {
el->thread = 0;
pthread_mutex_init(&el->mu, NULL);
pthread_cond_init(&el->have_task, NULL);
glthread_init(&el->glthread, offsetof(task_t, node));
}
void event_loop_run(event_loop_t *el) {
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
pthread_create(&el->thread, &attr, event_loop_fn, el);
}
task_t *event_loop_add_task(event_loop_t *el, fn_cb cb, void *arg) {
task_t *t;
t = malloc(sizeof(*t));
t->cb = cb;
t->arg = arg;
pthread_mutex_lock(&el->mu);
glthread_add(&el->glthread, &t->node);
pthread_cond_signal(&el->have_task);
pthread_mutex_unlock(&el->mu);
return t;
}
void event_loop_cancel_task(event_loop_t *el, task_t *t) {
pthread_mutex_lock(&el->mu);
if (el->current_task == t) return;
glthread_remove(&el->glthread, &t->node);
pthread_mutex_unlock(&el->mu);
free(t);
}
Usage
The following is an example usage of an event loop.
// main.c
#include <stdio.h>
#include <unistd.h>
#include "event_loop.h"
event_loop_t el;
int sum(int *nums, int size) {
int res;
int i;
res = 0;
for (i = 0; i < size; i++) {
res += nums[i];
}
return res;
}
int mul(int *nums, int size) {
int res;
int i;
res = 1;
for (i = 0; i < size; i++) {
res += nums[i];
}
return res;
}
typedef struct arg_obj_ {
int *nums;
int size;
} arg_obj_t;
void sum_fn(void *arg) {
arg_obj_t *arg_obj;
arg_obj = (arg_obj_t *) arg;
printf("sum=%d\n", sum(arg_obj->nums, arg_obj->size));
}
void mul_fn(void *arg) {
arg_obj_t *arg_obj;
arg_obj = (arg_obj_t *) arg;
printf("mul=%d\n", mul(arg_obj->nums, arg_obj->size));
}
int download = 0;
int upload = 0;
void download_fn(void *arg) {
(void) arg;
while (download < 100) {
download += 2;
printf("download: %d\n", download);
if (download % 10 == 0) {
event_loop_add_task(&el, download_fn, NULL);
return;
}
}
printf("download success\n");
}
void upload_fn(void *arg) {
(void) arg;
while (upload < 100) {
upload += 2;
printf("upload: %d\n", download);
if (upload % 10 == 0) {
event_loop_add_task(&el, upload_fn, NULL);
return;
}
}
printf("upload success\n");
}
arg_obj_t arg;
int nums[] = {1, 2, 3, 4, 5};
int main(void) {
arg.nums = nums;
arg.size = sizeof(nums) / sizeof(nums[0]);
event_loop_init(&el);
event_loop_run(&el);
sleep(1);
event_loop_add_task(&el, sum_fn, &arg);
event_loop_add_task(&el, mul_fn, &arg);
sleep(1);
event_loop_add_task(&el, download_fn, NULL);
event_loop_add_task(&el, upload_fn, NULL);
printf("main done\n");
pthread_exit(0);
}
Pitfall
As you can see the event loop approch is not require data syncronization between thread. But are there any pitfalls for the above implementation. There is a premature delete problem. Let's say there are two tasks, one is to print each field of the struct another is to delete that struct.
Let's say that the struct has 3 fields. The printing task was implemented to print one field, then push the rest sub-tasks to the event loop to allow other tasks to execute. And there is a delete struct task after the printing task. The tasks in the queue are as follows.
queue
| Print 1st field | Delete struct | Print 2nd field | Print 3rd field |
When the Delete struct was executed, the Print 2nd field will access the invalid memory area.
To solve this problem, you can modify the implementation of the event loop. Instead of having one queue of tasks, you can have multiple queues e.g., HIGH_PRIORITY, MEDIUM_PRIORITY, and LOW_PRIORITY. Your event loop will take a task from HIGH_PRIORITY first, and if that queue is empty, move on to the lower priority queue. Then push the Delete struct to the lower priority queue.
MEDIUM_PRIORITY
| Print 1st field | Print 2nd field | Print 3rd field |
LOW_PRIORITY
| Delete struct |
Top comments (0)