Hello,
I've been interested in distributed systems for a long time, and my work in DevOps often had me interacting with them (mostly etcd, which uses Raft). However, those interactions were always surface-level, and I've always wanted to explore the field more deeply.
Kademlia is a relatively simple distributed system, which makes it a good first step. In this series of posts, I plan on explaining my simple implementation of Kademlia.
I've set myself a few conditions for this implementation:
- Write it in Rust. This is mostly because I enjoy writing code in Rust.
- No async code.
- Each node must run on a single thread.
In my experience, single-threaded, non-async code is generally much easier to reason about.
Kademlia is a distributed hash table whose defining idea is that each node stores the values whose keys are "close" to its own identifier. It operates on a 160-bit key space, where each key is an opaque 160-bit identifier. Each Kademlia node is also assigned a 160-bit identifier from the same space.
In this post, we will first explore operations on Kademlia keys, followed by a simple implementation of a routing table.
Keys
Closeness in Kademlia is quantified using the XOR metric. The XOR metric is rather simple:
distance = key_1 ^ key_2
So, let's start with defining a key:
const KEYLEN: usize = 20;
type Key = [u8; KEYLEN];
That would make the XOR distance implementation as follows:
pub fn key_distance(result: &mut Key, x: &Key, y: &Key) {
for index in 0..KEYLEN {
result[index] = x[index] ^ y[index];
}
}
pub fn key_distance2(x: &Key, y: &Key) -> Key {
let mut result: Key = [0; KEYLEN];
key_distance(&mut result, x, y);
result
}
Notice that the distance between two keys is also in the key space.
Now, consider a 4-bit key space. There are 16 keys in this space, from 0b0000 to 0b1111. We'll pick 3 arbitrary keys from this space in the sketch below.
Since a distance is itself a fixed-size key, comparing distances is a trivial problem: we just compare the respective bytes from left to right until a differing byte is found or the distances are equal.
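To make this concrete, here is a small sketch of my own (the three keys are arbitrary picks, not taken from the original post), embedding 4-bit values in the last byte of a full-size key:
fn distance_example() {
    let (mut a, mut b, mut c): (Key, Key, Key) = ([0; KEYLEN], [0; KEYLEN], [0; KEYLEN]);
    a[KEYLEN - 1] = 0b0011; // 3
    b[KEYLEN - 1] = 0b0110; // 6
    c[KEYLEN - 1] = 0b1100; // 12

    let d_ab = key_distance2(&a, &b); // last byte: 0b0101 = 5
    let d_ac = key_distance2(&a, &c); // last byte: 0b1111 = 15

    // Byte arrays compare lexicographically, which matches numeric comparison
    // of big-endian keys: b is closer to a than c is.
    assert!(d_ab < d_ac);
}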
We'll pause our discussion on keys, as the interesting bit is how they are used in routing tables.
Routing Tables (Simplified)
While the paper describes storing nodes as leaves in a tree (which is valid), the best insight for implementation comes from the following excerpt:
For each 0 <= i < 160, every node keeps a list of (IP address, UDP port, Node ID) triples for nodes of distance between 2^i and 2^(i+1) from itself. We call these lists k-buckets. Each list is kept sorted by time last seen - least-recently seen node at the head, most-recently seen at the tail. ... the lists can grow up to size k, where k is a system-wide replication parameter.
Let's break this down:
For each 0 <= i < 160, every node keeps a list of (IP address, UDP port, Node ID) triples for nodes of distance between 2^i and 2^(i+1) from itself.
When i = 0, the list can store nodes whose distance from the current node is in the range [2^0, 2^1) = [1, 2), i.e. only nodes at distance exactly 1.
When i = 5, this becomes [2^5, 2^6) = [32, 64).
Since 0 <= i < 160, there will be a maximum of 160 k-buckets. Since each list cannot grow beyond size k, the total number of entries in the whole table cannot grow beyond 160 * k.
This isn't that big given that the paper recommends k be set to 20 (only 3,200 entries), so we can just pre-allocate the whole table.
We can define a simple k-bucket and routing table as follows:
use std::collections::{HashSet, LinkedList};
use std::time;

use log::debug;

// 160 buckets for non-zero distances, plus one extra bucket for distance 0 (the node itself)
const TABLE_SIZE: usize = KEYLEN * 8 + 1;
// RoutingTable stores the K-Bucket tree
#[derive(Debug)]
pub struct RoutingTable<T: Clone + Eq> {
pub k: usize,
pub key: Key,
pub table: Vec<KBucket<T>>,
}
#[derive(Debug, Clone)]
pub struct KBucket<T: Clone + Eq> {
pub last_refresh_time: time::Instant,
pub list: LinkedList<(Key, T)>,
}
impl<T: Clone + Eq> KBucket<T> {
pub fn new() -> Self {
Self {
last_refresh_time: time::Instant::now(),
list: LinkedList::new(),
}
}
}
We use a generic parameter for the actual data stored by the k-bucket. This could be simplified to just a (Key, SocketAddr) tuple, but at this point we don't actually care what routing information is stored, as long as lookups for a key are accurate.
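For instance (an illustrative choice of my own, not something the implementation requires), the routing information could simply be the peer's socket address:
use std::net::SocketAddr;

// One possible instantiation: the only thing we keep per contact is the
// address we can reach that node at.
type AddrRoutingTable = RoutingTable<SocketAddr>;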
Now, for the important question: Given a key, how do we find the appropriate k-bucket for that key?
XOR Metric 2
Consider the following statements:
- For any key w, the distance to itself is dist(w, w) = 0, which would be a 160-bit key with all zeros. This is trivial to prove, as any number XOR'ed with itself is zero.
- Any distance in the range [2^i, 2^(i+1)) will have exactly N - i - 1 zeros as a prefix when written as an N-bit number. For Kademlia, N = 160.
To illustrate the second statement, consider our simplified 4-bit key space (N = 4): any distance in [8, 16) looks like 0b1xxx and has no leading zeros, any distance in [4, 8) looks like 0b01xx and has exactly one leading zero, any distance in [2, 4) looks like 0b001x and has two, and the distance 0b0001 has three.
This translates to the following procedure:
Given a key w, we compute the distance d from the current node u. Then, we count the number of zeros which form a prefix of d. This count gives us the index of the appropriate bucket. The extreme cases are as follows:
- When u == w, the index will be 160. This means that the node will be placed in the last bucket of the table. This bucket can only ever hold 1 entry.
- When d = (1xxx...), u and w differ at the very first bit. This means that w will be placed in the first bucket of the table.
This leads nicely to one of the core properties of Kademlia: a node knows a larger fraction of the nodes close to it than of the nodes farther away. For example, in an 8-bit key space with k = 20, a node can only track 20 of the 128 possible nodes at distances between 128 and 255, but all 16 possible nodes at distances between 16 and 31.
Let us implement this:
// Counts the number of leading zero bits of a key; for a distance, this is the bucket index
pub fn key_zeroes_prefix(key: &Key) -> usize {
let mut index = 0;
while index < KEYLEN && key[index] == 0 {
index += 1;
}
if index == KEYLEN {
return KEYLEN * 8;
}
let b = key[index];
let mask: u8 = 128;
let mut count = 0;
while count < 8 && b & (mask >> count) == 0 {
count += 1;
}
index * 8 + count
}
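// A few illustrative checks of my own (not from the original post) for key_zeroes_prefix:
#[cfg(test)]
mod prefix_tests {
    use super::*;

    #[test]
    fn bucket_indices_from_leading_zeros() {
        let mut d: Key = [0; KEYLEN];
        // distance 0 -> 160 leading zeros -> the "self" bucket
        assert_eq!(key_zeroes_prefix(&d), 160);

        // highest bit set -> no leading zeros -> first bucket
        d[0] = 0b1000_0000;
        assert_eq!(key_zeroes_prefix(&d), 0);

        // 8 zero bits in the first byte + 3 leading zeros in the second byte -> index 11
        d[0] = 0;
        d[1] = 0b0001_0000;
        assert_eq!(key_zeroes_prefix(&d), 11);
    }
}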
impl<T: Clone + Eq> RoutingTable<T> {
pub fn new(id: Key, data: T, k: usize) -> Self {
let mut t = Self {
key: id,
k: k,
table: vec![KBucket::new(); TABLE_SIZE],
};
// insert current node into routing table
let _ = t.insert_contact(&id, data);
t
}
// Finds the index of the closest k-bucket for a given key
fn get_bucket_index_for_key(&self, w: &Key) -> usize {
self.get_bucket_index(&key_distance2(&self.key, w))
}
// Finds the index of the closest k-bucket for a given distance
#[inline]
fn get_bucket_index(&self, distance: &Key) -> usize {
key_zeroes_prefix(distance)
}
pub fn evict_contact(&mut self, w: &Key) {
let bucket = {
let idx = self.get_bucket_index_for_key(w);
&mut self.table[idx]
};
Self::evict_from_bucket(bucket, w)
}
pub fn evict_from_bucket(bucket: &mut KBucket<T>, w: &Key) {
// check if node is in the bucket
let mut idx = None::<usize>;
for (i, (u, _)) in bucket.list.iter().enumerate() {
if u == w {
idx = Some(i);
break;
}
}
// remove node if it exists in the list
if let Some(idx) = idx {
debug!("evicting existing entry");
// remove contact from list
let mut split_list = bucket.list.split_off(idx);
split_list.pop_front();
bucket.list.append(&mut split_list);
}
}
// Attempts to insert a new contact into the routing table
pub fn insert_contact(&mut self, w: &Key, data: T) -> Option<()> {
debug!("inserting new contact: {}", hex::encode(w));
let bucket = {
let idx = self.get_bucket_index_for_key(w);
&mut self.table[idx]
};
// evict key from bucket if it exists
Self::evict_from_bucket(bucket, w);
bucket.last_refresh_time = time::Instant::now();
// if the list is full, drop the least recently seen node
if bucket.list.len() == self.k {
bucket.list.pop_front();
}
// key is not in the list, try to insert
bucket.list.push_back((*w, data));
Some(())
}
}
Please note the following:
- We initialize the routing table by inserting the current node into the table.
- Whenever we try to insert a node into a list, we first evict the node if it already exists, and then insert it at the tail of the list. This ensures the k-bucket list is always sorted in least-recently seen order from head to tail (a quick usage sketch follows below).
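To make that concrete, here is a minimal usage sketch of my own (the function name, addresses, and the choice of T = SocketAddr are illustrative assumptions, not part of the original code):
fn demo_insert() {
    use std::net::SocketAddr;

    let my_id: Key = [0xAA; KEYLEN];
    let my_addr: SocketAddr = "127.0.0.1:4000".parse().unwrap();
    let mut rt: RoutingTable<SocketAddr> = RoutingTable::new(my_id, my_addr, 20);

    let peer_id: Key = [0x55; KEYLEN];
    let peer_addr: SocketAddr = "127.0.0.1:4001".parse().unwrap();

    // First insert appends the peer to the tail of its bucket.
    let _ = rt.insert_contact(&peer_id, peer_addr);
    // Seeing the same peer again evicts the old entry and re-appends it,
    // keeping the bucket ordered from least- to most-recently seen.
    let _ = rt.insert_contact(&peer_id, peer_addr);
}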
We find the nodes closest to a given key as follows:
- Find the appropriate bucket index for the given key using the above methods.
- We then iterate from that index up to 160, adding each bucket's entries to the result, until we have collected more than k entries or we run out of buckets.
- If we still have fewer than k entries, we iterate backwards from the bucket index towards 0, adding entries in the same way.
The reason for the iteration direction is that the range of distances a bucket stores grows exponentially as the index approaches zero, and shrinks exponentially as the index approaches 160. Therefore, we first add all the known nodes at distances between 0 and 2^(160 - i) from us, where i is the starting bucket index; any such node is closer to the target than a node from a lower-index bucket. If those are not enough, we add nodes from the bucket that stores distances between 2^(160 - i) and 2^(161 - i), if it exists, and so on, until we have k candidates. Finally, we sort the collected candidates by their XOR distance to the target key and truncate the result to k.
This is implemented as follows:
impl <T: Clone + Eq> RoutingTable<T> {
...
// Returns k closest known entries to the key w
pub fn find_closest_to_key(&self, key: &Key) -> Option<Vec<(Key, T)>> {
self.find_closest_to_key_ignore(key, None)
}
pub fn find_closest_to_key_ignore(
&self,
key: &Key,
ignore: Option<&HashSet<Key>>,
) -> Option<Vec<(Key, T)>> {
let bucket_idx = self.get_bucket_index_for_key(key);
let mut result = vec![];
for i in bucket_idx..TABLE_SIZE {
let bucket = &self.table[i];
for b in &bucket.list {
if !ignore.as_ref().map_or(false, |s| s.contains(&b.0)) {
result.push(b.clone())
}
}
if result.len() > self.k {
break;
}
}
// not enough results yet: walk towards the buckets that hold larger distances
let mut b_idx = bucket_idx;
while b_idx > 0 && result.len() < self.k {
b_idx -= 1;
let bucket = &self.table[b_idx];
for b in &bucket.list {
if !ignore.as_ref().map_or(false, |s| s.contains(&b.0)) {
result.push(b.clone())
}
}
}
// sort by XOR distance to the target key, closest first
result.sort_by(|(x, _), (y, _)| {
let (u, v) = (key_distance2(x, key), key_distance2(y, key));
u.cmp(&v)
});
// truncate the list
result.truncate(self.k);
Some(result)
}
}
Thus we have a mostly complete routing table.
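As a closing sketch (again my own illustration; demo_lookup and the target key are hypothetical), a lookup asks the table for the contacts it knows that are closest to an arbitrary target:
fn demo_lookup(rt: &RoutingTable<std::net::SocketAddr>) {
    let target: Key = [0x0F; KEYLEN];
    if let Some(closest) = rt.find_closest_to_key(&target) {
        // Entries come back sorted by XOR distance to `target`, at most k of them.
        for (key, addr) in closest {
            println!("{} -> {}", hex::encode(key), addr);
        }
    }
}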
Conclusion
In the next post, we will explore how recursive node lookups can be implemented so that nodes can join (and leave) the network. I'm not very good at writing, so I would greatly value any feedback. I'm also happy to answer any questions.