DEV Community

Saurav Shah
Saurav Shah

Posted on

RCU (Read, Copy, Update) in the Linux Kernel

RCU is a mechanism to synchronize between readers and writers and it is very efficient when there are many readers or the data to read is large. It is a lock free synchronization mechanism.
RCU Illustration

Topics Covered

Why do we need a synchronization mechanism?

Let's consider this simple example:

#include <linux/module.h>
#include <linux/list.h>
#include <linux/slab.h>
#include <linux/delay.h>
#include <linux/debugfs.h>
#include <linux/mm.h>


#define MODULE_NAME "kernel_rcu_example"
#define FILENAME    "val"

struct data {
    int a;
    int b;
};

struct data *ptr = NULL;
static struct dentry *gparent = NULL;

void update_data(struct data *old, int a, int b, u32 ms)
{
    if(!old)
        return;
    old->a = a;
    mdelay(ms);
    old->b = b;
    pr_info(MODULE_NAME ": Data updated\n");
}

ssize_t dbg_write(struct file *file, const char __user *buffer, size_t count, loff_t *off)
{
    char kbuf[100];
    int a, b;
    u32 ms;
    (void)off; //Ignore offset for simplification
    if(count >= sizeof(kbuf))
        return -EINVAL;

    if (copy_from_user(kbuf, buffer, count))
        return -EFAULT;

    kbuf[count] = '\0';  // Null-terminate
    if(sscanf(kbuf, "%d:%d:%u", &a, &b, &ms) != 3)
        return -EINVAL;
    update_data(ptr, a, b, ms);
    return count;
}

ssize_t dbg_read(struct file *file, char __user *buffer, size_t count, loff_t *off)
{
    char kbuf[100];
    size_t len;

    if (*off > 0)
        return 0;

    if(!ptr)
        return -EINVAL;
    if(sizeof(kbuf) >= count)
        return -EINVAL;
    len = scnprintf(kbuf, sizeof(kbuf), "a=%d,b=%d\n", ptr->a, ptr->b);
    if(len > count)
        return -EINVAL;
    if (copy_to_user(buffer, kbuf, len))
        return -EFAULT;
    *off += len;
    return len;
}

static const struct file_operations dbg_fops = {
    .write = dbg_write,
    .read = dbg_read,
};

static int debug_init(void)
{
    if (!IS_ENABLED(CONFIG_DEBUG_FS)) {
        pr_warn(MODULE_NAME": debugfs unsupported! Aborting ...\n");
        return -EINVAL;
    }
    gparent = debugfs_create_dir(MODULE_NAME, NULL);
    if (!gparent) {
        pr_err(MODULE_NAME": debugfs_create_dir failed, aborting...\n");
        return -EINVAL;
    }

    if(!debugfs_create_file(FILENAME, 0660, gparent, NULL, &dbg_fops))
    {
        pr_err(MODULE_NAME": debugfs_create_file failed, aborting...\n");
        goto out_fail;
    }
    return 0;
out_fail:
    debugfs_remove_recursive(gparent);
    return -EINVAL;
}

static int __init kernelrcu_init_module(void)
{
    pr_info(MODULE_NAME ": %s\n",__func__);
    debug_init();
    ptr = (struct data*)kzalloc(sizeof(struct data), GFP_KERNEL);
    update_data(ptr, 1, 2, 0);
    return 0;
}

static void __exit kernelrcu_cleanup_module(void)
{
    pr_info(MODULE_NAME ": %s\n",__func__);
    if(ptr)
        kfree(ptr);
    if(gparent)
        debugfs_remove_recursive(gparent);
}
module_init(kernelrcu_init_module);
module_exit(kernelrcu_cleanup_module);
MODULE_LICENSE("GPL");
Enter fullscreen mode Exit fullscreen mode

The code example has a pointer ptr of type struct data * which has two members a and b. update_data updates this structure and it takes an argument ms, which is the time in milliseconds to sleep before updating the field b. We also create a debugfs entry /sys/kernel/debug/kernel_rcu_example/val which can be read to get the current value of ptr. You can also write to this file in the format a:b:ms to update the value and sleep ms before updating field b.

Now let's deploy this module.

$ insmod kernel_rcu_example.ko
$ dmesg
[1398227.954799] kernel_rcu_example: kernelrcu_init_module
[1398227.954804] kernel_rcu_example: Data updated
Enter fullscreen mode Exit fullscreen mode

Let's print the value of ptr.

$ cat /sys/kernel/debug/kernel_rcu_example/val
a=1,b=2
Enter fullscreen mode Exit fullscreen mode

Let's update the value now. Add a sleep of 5 seconds before updating the field b.

$ echo "5:6:5000" > /sys/kernel/debug/kernel_rcu_example/val
Enter fullscreen mode Exit fullscreen mode

Before this completes, open another terminal and print the value.

$ cat /sys/kernel/debug/kernel_rcu_example/val
a=5,b=2
Enter fullscreen mode Exit fullscreen mode

You see! The value of ptr is not updated completely, only a is updated. This is a typical race condition. We have used mdelay here to simulate the behaviour, but this usually occurs when two threads update and read the same data simultaneously. Since the update operation is not atomic, the writer thread might be preempted before the update completes, and the reader thread sees a partial update.

Mutex to synchronize

The classical approach to solve this issue is to use a mutex. Take the mutex before updating the value and release it once update is done. Similarly, take it before reading the value.

DEFINE_MUTEX(lock);
...
void update_data(struct data *old, int a, int b, u32 ms)
{
    if(!old)
        return;
    mutex_lock(&lock);
    old->a = a;
    mdelay(ms);
    old->b = b;
    mutex_unlock(&lock);
    pr_info(MODULE_NAME ": Data updated\n");
}
...
ssize_t dbg_read(struct file *file, char __user *buffer, size_t count, loff_t *off)
{
    ...
    mutex_lock(&lock);
    len = scnprintf(kbuf, sizeof(kbuf), "a=%d,b=%d\n", ptr->a, ptr->b);
    mutex_unlock(&lock);
    ...
}
Enter fullscreen mode Exit fullscreen mode

Now, if you compile and deploy this module and run the same experiment, you will see that in the second terminal, while you try to read val, it is blocked (for 5 seconds) until the update is done and the proper value is displayed.

$ cat /sys/kernel/debug/kernel_rcu_example/val #Blocks for sometime
a=5,b=2
Enter fullscreen mode Exit fullscreen mode

Reader‑mostly workloads

Consider this: you have readers in the order of 100s or 1000s but few writers, in the order of 1s or 10s. If you use a mutex (or spinlock), the reader needs to wait until the other reader is done. Similarly, the writer may need to wait for a long time before it gets the mutex to update the data. This is very inefficient and not scalable for a large number of readers. What if we allow multiple readers to read the data simultaneously, while still ensuring the data read is consistent (not a partial update)? This is where reader–writer locks or RCU come into the picture. They allow multiple readers to read simultaneously. I will not go into reader–writer locks in this post, but the general idea is: readers take the read lock (multiple readers can take it) which can only be taken if the writer lock is not taken (no writer), and the writer takes the writer lock, which can only be taken when the reader lock count is 0 (no readers).

Introducing RCU

RCU allows multiple readers to read simultaneously, and a single writer can update while readers are reading. The writer need not wait for the readers to finish reading. If a writer tries to write while readers are reading, the writer writes the value to a new location, while the old location is still valid. Once all the readers are done reading, the old location is discarded and the new location is kept.

The idea is, when a writer want to write a data:

  1. Read the current data (RCU)
  2. Copy the current data to a new location (R*C*U)
  3. Update data in the new location (RC*U*)
  4. Publish the new data (update the current location to point to the new location)
  5. Defer the freeing of the old location until all the reader are done

Let's modify the code to use RCU technique.

#include <linux/module.h>
#include <linux/list.h>
#include <linux/slab.h>
#include <linux/delay.h>
#include <linux/debugfs.h>
#include <linux/mm.h>


#define MODULE_NAME "kernel_rcu_example"
#define FILENAME    "val"

struct data {
    int a;
    int b;
};

struct data __rcu *ptr = NULL;
static struct dentry *gparent = NULL;

DEFINE_MUTEX(writer_lock);

void update_data(int a, int b, u32 ms)
{
    struct data *newloc = NULL;
    struct data *curr = NULL;
    if(!ptr)
        return;

    //Create a new location to store the updated data
    newloc = (struct data*)kzalloc(sizeof(struct data), GFP_KERNEL);
    if(!newloc)
        return;


    mutex_lock(&writer_lock);
    //Read the current data
    curr = rcu_dereference(ptr);
    //Copy the data to the new location
    memcpy(newloc, curr, sizeof(struct data));

    //Update the data in the new location
    newloc->a = a;
    mdelay(ms);
    newloc->b = b;

    //Publish the new data
    rcu_assign_pointer(ptr, newloc);
    mutex_unlock(&writer_lock);

    //Free the previous location once all the readers are done
    synchronize_rcu();
    kfree(curr);
    pr_info(MODULE_NAME ": Data updated\n");
}

ssize_t dbg_write(struct file *file, const char __user *buffer, size_t count, loff_t *off)
{
    char kbuf[100];
    int a, b;
    u32 ms;
    (void)off; //Ignore offset for simplification
    if(count >= sizeof(kbuf))
        return -EINVAL;

    if (copy_from_user(kbuf, buffer, count))
        return -EFAULT;

    kbuf[count] = '\0';  // Null-terminate
    if(sscanf(kbuf, "%d:%d:%u", &a, &b, &ms) != 3)
        return -EINVAL;
    update_data(a, b, ms);
    return count;
}

ssize_t dbg_read(struct file *file, char __user *buffer, size_t count, loff_t *off)
{
    char kbuf[100];
    size_t len;
    struct data *curr = NULL;

    if (*off > 0)
        return 0;

    if(!ptr)
        return -EINVAL;
    if(sizeof(kbuf) >= count)
        return -EINVAL;

    rcu_read_lock();
    curr = rcu_dereference(ptr);
    len = scnprintf(kbuf, sizeof(kbuf), "a=%d,b=%d\n", curr->a, curr->b);
    rcu_read_unlock();
    if(len > count)
        return -EINVAL;
    if (copy_to_user(buffer, kbuf, len))
        return -EFAULT;
    *off += len;
    return len;
}

static const struct file_operations dbg_fops = {
    .write = dbg_write,
    .read = dbg_read,
};

static int debug_init(void)
{
    if (!IS_ENABLED(CONFIG_DEBUG_FS)) {
        pr_warn(MODULE_NAME": debugfs unsupported! Aborting ...\n");
        return -EINVAL;
    }
    gparent = debugfs_create_dir(MODULE_NAME, NULL);
    if (!gparent) {
        pr_err(MODULE_NAME": debugfs_create_dir failed, aborting...\n");
        return -EINVAL;
    }

    if(!debugfs_create_file(FILENAME, 0660, gparent, NULL, &dbg_fops))
    {
        pr_err(MODULE_NAME": debugfs_create_file failed, aborting...\n");
        goto out_fail;
    }
    return 0;
out_fail:
    debugfs_remove_recursive(gparent);
    return -EINVAL;
}

static int __init kernelrcu_init_module(void)
{
    pr_info(MODULE_NAME ": %s\n",__func__);
    debug_init();
    ptr = (struct data*)kzalloc(sizeof(struct data), GFP_KERNEL);
    update_data(1, 2, 0);
    return 0;
}

static void __exit kernelrcu_cleanup_module(void)
{
    pr_info(MODULE_NAME ": %s\n",__func__);
    if(ptr)
        kfree(ptr);
    if(gparent)
        debugfs_remove_recursive(gparent);
}
module_init(kernelrcu_init_module);
module_exit(kernelrcu_cleanup_module);
MODULE_LICENSE("GPL");
Enter fullscreen mode Exit fullscreen mode

Now if you deploy this module and run the previous experiment, you will see the old data until the new data is published and becomes visible to readers.

$ echo "5:6:5000" > /sys/kernel/debug/kernel_rcu_example/val
# Blocks
# in another terminal
$ cat val
a=1,b=2
$ cat val
a=1,b=2
$ cat val
a=1,b=2
$ cat val
a=1,b=2
$ cat val  #Finally the updated value
a=5,b=6
Enter fullscreen mode Exit fullscreen mode

First we declared ptr as RCU-protected, struct data __rcu *ptr. Whenever we want to dereference the RCU pointer, we use rcu_dereference(ptr); this ensures memory ordering and correctness under concurrency. Similarly, we use rcu_assign_pointer(old, new) to assign a new location to the existing pointer.

Let's look at the readers’ logic in dbg_read. We use rcu_read_lock() before reading and rcu_read_unlock() to indicate that the reader is done. On the writer side, in update_data:

  1. newloc is allocated a new location.
  2. A lock, writer_lock, is used to synchronize writers. Note how writer_lock is different from RCU and not implicitly embedded in the RCU paradigm. If you are sure that the number of writers is small and the write time is deterministic, it's often better to use a spinlock rather than a mutex.
  3. The writer takes writer_lock, dereferences the current pointer using rcu_dereference, and copies the data to the new location using memcpy.
  4. The writer then updates the data in the new location. Note that the pointer ptr is still pointing to the old data, and the writer has only modified the new location.
  5. The writer then assigns the new location to the existing ptr using rcu_assign_pointer. The old pointer is still valid; if any reader is still referring to that pointer, it will still see the old data.
  6. The writer uses synchronize_rcu() to wait for all pre-existing readers to finish. Once the readers are done, this call returns.
  7. The writer can then safely free the memory pointed to by the old pointer. Any new reader coming in at this point will be referring to the new pointer.

synchronize_rcu is a blocking call and it blocks untill all the readers are done. You might not want to block just to free the old ptr. You can use call_rcu, which is async API.

RCU with Linked List

There are several kernel data structures designed with built-in support for RCU (Read-Copy-Update) to allow safe concurrent reads without locking.

In this example, we use kernel linked list to use of rcu. If you are not familiar with the kernel's linked list implementation, have a look at Linked List in the Linux Kernel.

#include <linux/module.h>
#include <linux/list.h>
#include <linux/slab.h>
#include <linux/delay.h>
#include <linux/debugfs.h>
#include <linux/mm.h>
#include <linux/spinlock.h>


#define MODULE_NAME "kernel_rcu_ll_example"

struct data {
    int seqno;
    int val;
    struct list_head node;
    struct rcu_head rcu;
};

static LIST_HEAD(data_head);
static spinlock_t writer_lock;

static void free_cb(struct rcu_head *rcu) {
    struct data *node = container_of(rcu, struct data, rcu);
    kfree(node);
}

static int add_data(int val) {
    struct data *ptr;

    ptr = kzalloc(sizeof(struct data), GFP_KERNEL);
    if(!ptr)
        return -ENOMEM;

    ptr->val = val;

    spin_lock(&writer_lock);
    if(list_empty(&data_head))
        ptr->seqno = 1;
    else
        ptr->seqno = container_of(data_head.prev, struct data, node)->seqno + 1; //tail seqno + 1
    list_add_tail_rcu(&ptr->node, &data_head);
    spin_unlock(&writer_lock);
    return 0;
}

static int update_node(int seqno, int val) {
    struct data *curr = NULL;
    struct data *old = NULL;
    struct data *new = NULL;
    rcu_read_lock();
    list_for_each_entry_rcu(curr, &data_head, node) {
        if(curr->seqno == seqno){
            old = curr;
            break;
        }
    }
    if(!old) {
        rcu_read_unlock();
        return -EINVAL;
    }
    new = kzalloc(sizeof(struct data), GFP_ATOMIC);
    if(!new) 
    {
        rcu_read_unlock();
        return -ENOMEM;
    }
    memcpy(new, old, sizeof(struct data));
    new->val = val;

    spin_lock(&writer_lock);
    list_replace_rcu(&old->node, &new->node);
    spin_unlock(&writer_lock);
    rcu_read_unlock();

    //Async callback to free the old node
    call_rcu(&old->rcu, free_cb);
    return 0;
}

static void delete_node(int seqno) {
    struct data *curr = NULL;
    spin_lock(&writer_lock);
    list_for_each_entry_rcu(curr, &data_head, node) {
        if(curr->seqno == seqno){
            list_del_rcu(&curr->node);
            spin_unlock(&writer_lock);
            call_rcu(&curr->rcu, free_cb);
            return;
        }
    }
    spin_unlock(&writer_lock);
    return;
}

static void print_ll(void)
{
    struct data *curr = NULL;
    rcu_read_lock();
    list_for_each_entry_rcu(curr, &data_head, node) {
        pr_info("\tseqno:%d , val:%d\n", curr->seqno, curr->val);
    }
    rcu_read_unlock();
}


static int __init kernelrcull_init_module(void)
{
    pr_info(MODULE_NAME ": %s\n",__func__);
    add_data(23);
    add_data(24);
    add_data(25);
    pr_info(MODULE_NAME ": After adding 23, 24, 25 :\n");
    print_ll();
    update_node(3, 26);
    pr_info(MODULE_NAME ": After updating 25 to 26 :\n");
    print_ll();
    delete_node(2);
    pr_info(MODULE_NAME ": After removing 24 :\n");
    print_ll();
    return -EINVAL; //Don't deploy module, only for demo
}

module_init(kernelrcull_init_module);
MODULE_LICENSE("GPL");
Enter fullscreen mode Exit fullscreen mode

This is the dmesg output

[1426406.079680] kernel_rcu_ll_example: kernelrcull_init_module
[1426406.079682] kernel_rcu_ll_example: After adding 23, 24, 25 :
[1426406.079683]        seqno:1 , val:23
[1426406.079683]        seqno:2 , val:24
[1426406.079683]        seqno:3 , val:25
[1426406.079684] kernel_rcu_ll_example: After updating 25 to 26 :
[1426406.079684]        seqno:1 , val:23
[1426406.079684]        seqno:2 , val:24
[1426406.079685]        seqno:3 , val:26
[1426406.079685] kernel_rcu_ll_example: After removing 24 :
[1426406.079685]        seqno:1 , val:23
[1426406.079686]        seqno:3 , val:26
Enter fullscreen mode Exit fullscreen mode

This example is only to demonstrate how to use rcu with linked list. I have called the reader and writer call from the same context (which would ideally not require any synchronization mechanism at all), but the same will work no matter how many concerent readers or writers are there.

Let's have a look at the code example.
First, notice we have a struct rcu_head rcu field in the struct data. I will explain the use of this in sometime. writer_lock is a spin locked used to synchronize the writers.
Let's look at the add_data function. Given a val, it adds a node to the tail of the linked list. the seqno is basically seqno of the tail + 1. Note we need to take the writer_lock before calculating the sequence number, as well as adding the node the linked list. list_add_tail_rcu is used to add the rcu node to the tail of the linked list.
Now, let's look at the update_node function. We take rcu_read_lock before iterating through the list to search for the node to update (based on seqno). If the node is not found, we unlock the rcu reader's lock and return. If node is found, we create a new pointer new and copy the data from the old pointer. We then take the writer_lock and replace the current node with the update node using list_replace_rcu.
I have use a new API call_rcu here. In the previous example, we used synchronize_rcu to wait for the readers to finish, before we free the old ptr. But this would block the caller. We dont want that. call_rcu registers an async callback that will be called later when all the readers are done and doesn't block the caller. Remember the struct rcu_head rcu we added, it is for this usecase. call_rcu takes in pointer to the struct rcu_head reference (which can be used to get the actual container using container_of macro) and the callback function to call. free_cb is called when the readers are done.

delete_node is also similar. Take the writer_lock, iterate through the list and when the node is found use list_del_rcu to remove from the list.

One thing to node is, once you take rcu_read_lock, you shouldn't be calling any blocking function inside the critical section. Notice how I used GPF_ATOMIC in new = kzalloc(sizeof(struct data), GFP_ATOMIC); of the update_node function as GFP_KENRL might block (voluntary preemption).

A little about how RCU works

Now that we know what rcu are, how to use them the the best practices, let use dive a little dep into the implementation of RCU.
You might have noticed there is no reference to a traditional “lock” when calling rcu_read_lock() and rcu_read_unlock(). So what does it lock exactly?
It doesn’t acquire a spinlock or mutex. Instead, it marks the beginning of an RCU read-side critical section. The purpose is to prevent the current CPU from being preempted in a way that would break RCU’s guarantee.

  • On a non-preemptible kernel, rcu_read_lock() and rcu_read_unlock() generate almost no code! Why? Because if the kernel is non-preemptible and we know we cannot call any blocking function inside an RCU read-side section, the current thread will not be preempted until the section completes. If another reader runs on another CPU, the same applies - no preemption until the read is done
  • On a preemptible kernel, rcu_read_lock() disables preemption or increments a per-CPU counter so the kernel knows this thread is inside an RCU read-side section. This allows the RCU subsystem to track active readers.

When you call call_rcu() or synchronize_rcu(), the kernel waits for a grace period - the time it takes for all CPUs to exit their RCU read-side critical sections. In other words, the per-CPU counters for all CPUs must drop to zero before old data can be safely freed.

General guidelines to using RCU

  • Use RCU when there are readers in the order of hundreds or thousands compared to writers. RCU shines in read-mostly workloads because readers are extremely fast and non-blocking.
  • Never use RCU for frequent updates. Writers pay the cost of waiting for grace periods, so if updates dominate, traditional locking may be better.
  • Readers must not block. Inside an rcu_read_lock() section
    • Do not sleep, call schedule(), or perform blocking I/O.
    • Keep the critical section short
  • Always pair rcu_read_lock() with rcu_read_unlock(). Nesting is allowed but must be balanced.
  • Use proper update primitives
    • list_add_rcu(), list_del_rcu(), list_replace_rcu() for linked lists.
    • Use call_rcu() or kfree_rcu() to free old data after a grace period
  • Synchronize writers correctly
    • Use synchronize_rcu() if you need to wait until all readers finish before proceeding.
    • Or use call_rcu() for deferred freeing without blocking the writer.
  • Protect writers with locks if needed. RCU does not serialize writers; you still need spinlocks or mutexes for concurrent updates.
  • Check context: RCU read-side sections are safe in interrupt context, but updates often require sleeping, so they must run in process context.
  • Avoid misuse: RCU is not a general-purpose lock replacement. It’s for scenarios where
    • Reads are frequent and must scale.
    • Updates are rare and can tolerate deferred cleanup.

Cheers!!

Top comments (0)