DEV Community

Paul J. Lucas

Go-Like Channels in C

#c

Introduction

To communicate among threads in a multithreaded program, you typically need to use mutexes to share the memory of variables. While that works, writing such programs is often hard to get right, even for experts. Not holding a lock when you should can lead to subtle and hard-to-reproduce bugs; holding multiple locks simultaneously can lead to deadlocks.

A channel is a data structure used for message passing among threads.

Or processes, but the focus here is on threads. For processes on Unix, there are pipes.

Instead of communicating among threads by sharing memory (requiring explicitly using mutexes and copying), you share memory by communicating (only explicitly copying).

Notes to the reader

This article assumes familiarity with traditional multithreaded programming, specifically the concepts of mutexes and condition variables, and the pthreads API.

Channels in Go

While channels have been around since 1978, they’ve been popularized by having direct support in the Go programming language. For example, in Go:

func sum_array(vals []int, result chan int) {
  sum := 0
  for _, val := range vals {
    sum += val
  }
  result <- sum          // Send partial sum to channel.
}

func main() {
  vals := []int{ 7, 2, 8, -9, 4, 0 }
  c := make(chan int)
  go sum_array(vals[:len(vals)/2], c)
  go sum_array(vals[len(vals)/2:], c)
  sum := <-c + <-c       // Receive sums from channel.
  println(sum)           // Use sum: Go rejects unused variables.
}

That code uses two “goroutines” (light-weight threads, created by the lines starting with go), each calling sum_array to sum half an array of integers. Each partial sum is then sent to the channel c using the <- binary operator. In main, the partial sums are read from the channel using the <- unary operator. Notice that there is no explicit use of mutexes, condition variables, or signaling between threads. The <- operator handles all that behind the curtain for you.

That code uses an unbuffered channel; that is, a sender will block until a receiver is available and vice versa, i.e., the threads have to rendezvous. Go also supports buffered channels of a fixed size, e.g.:

  c := make(chan int, 16) // Buffered channel, 16 capacity.

For buffered channels, senders can send without blocking as long as the channel isn’t full; receivers can receive without blocking as long as the channel isn’t empty.

In addition to the one-at-a-time operations of send and receive, Go also has a select statement that allows you to select from multiple channels simultaneously:

func fibonacci(fibc chan int, quitc chan bool) {
  fib, next_fib := 0, 1
  for {
    select {
      case fibc <- fib:  // If sent number, calc next number.
        fib, next_fib = next_fib, fib + next_fib
      case <-quitc:      // If received quit, return.
        return
    }
  }
}

That code calculates Fibonacci numbers and sends them to the fibc channel. The select waits until either there is a receiver to receive the next result or any value is received on the quitc channel.

The select statement may have an optional default case that’s selected only if no channels are ready. Lastly, channels may be closed to indicate to receivers that no more messages are coming.

Channels in C API: Basics

To create a channel in C, we can do:

struct chan c;
chan_init( &c, 100, sizeof(int) );

where 100 specifies the capacity of the buffer (if 0, the channel will be unbuffered) and sizeof(int) specifies the size of each message.

Since we can’t add an <- operator to C, we’ll have to substitute ordinary function calls:

int chan_send( struct chan *chan, void const *send_buf,
               struct timespec const *duration );

int chan_recv( struct chan *chan, void *recv_buf,
               struct timespec const *duration );

#define CHAN_NO_WAIT  NULL
extern struct timespec const *const CHAN_NO_TIMEOUT;

But this allows us to specify an optional time-out, something that can’t be done directly in Go. CHAN_NO_WAIT as a synonym for NULL means “don’t wait,” CHAN_NO_TIMEOUT means “wait indefinitely,” and any other value means to wait for that amount of time.
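As a rough illustration of the duration argument, here is a sketch of a small helper that builds a relative struct timespec from a count of milliseconds. The name ms_to_duration is an assumption made for this example; it isn't part of the channel API described here.

```c
#include <assert.h>
#include <time.h>

// Hypothetical helper (not part of the channel API): converts a
// non-negative number of milliseconds into a relative duration.
struct timespec ms_to_duration( long ms ) {
  assert( ms >= 0 );
  return (struct timespec){
    .tv_sec  = ms / 1000,               // whole seconds
    .tv_nsec = (ms % 1000) * 1000000L   // remainder as nanoseconds
  };
}
```

The result would be stored in a local variable whose address is then passed as the duration argument of chan_send or chan_recv.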

To implement a time-out in Go, you typically select on the channel(s) of interest, plus an additional channel that receives a value after a time-out has expired.

The functions return a standard int error code:

  • 0: success;
  • EINVAL: invalid argument;
  • EPIPE: channel is closed;
  • EAGAIN: duration is NULL and can’t immediately send or receive;
  • ETIMEDOUT: duration has expired.
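A caller will usually want to branch on those codes. One way that might look is sketched below; chan_rv_str is a hypothetical helper invented for this example, not something the API provides.

```c
#include <assert.h>
#include <errno.h>

// Hypothetical helper (not part of the channel API): maps the return
// value of chan_send or chan_recv to a short description.
char const* chan_rv_str( int rv ) {
  assert( rv >= 0 );                    // error codes are never negative
  switch ( rv ) {
    case 0        : return "success";
    case EINVAL   : return "invalid argument";
    case EPIPE    : return "channel is closed";
    case EAGAIN   : return "would block";
    case ETIMEDOUT: return "timed out";
    default       : return "unexpected error";
  }
}
```

In practice, EAGAIN and ETIMEDOUT are the two codes after which retrying can make sense; EPIPE means the channel will never accept another send.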

There’s also:

void chan_close( struct chan *chan );

void chan_cleanup( struct chan *chan,
                   void (*msg_cleanup_fn)( void* ) );

where chan_close closes the channel and chan_cleanup cleans up its internal resources. The msg_cleanup_fn parameter is used only for buffered channels to clean up each unreceived message, if any. It may be NULL for unbuffered channels or if no clean-up is necessary.

There’s no equivalent of chan_cleanup in Go because Go has garbage collection, so channels and unreceived messages are cleaned up automatically.

The API for chan_select is more complicated, so it will be deferred until later.

Channels in C Implementation: Basics

This part of the implementation covers the data structure for channels, initialization, clean-up, time-outs, and buffered send and receive.

Data Structures

To implement channels, we’ll obviously need a data structure:

struct chan {
  union {
    struct {
      void           *ring_buf;         // Message ring buffer.
      unsigned        ring_len;         // Number of messages.
      unsigned        ring_idx[2];      // Receive/send indices.
    } buf;
    struct {
      void           *recv_buf;         // Where to put a message.
      pthread_cond_t  copy_done[2];     // Receive/send copy done.
      pthread_cond_t  not_busy[2];      // Receive/send no longer busy.
      bool            is_busy[2];       // Is receive/send busy?
      bool            is_copy_done[2];  // Is receive/send copied?
    } unbuf;
  };

  pthread_cond_t      ready[2];         // Receive/send ready.
  unsigned short      wait_cnt[2];      // Waiting to receive/send.

  pthread_mutex_t     mtx;              // Channel mutex.
  size_t              msg_size;         // Message size.
  unsigned            buf_cap;          // 0 = unbuffered.
  bool                is_closed;        // Is channel closed?
};

Approximately half of the structure’s members are within an anonymous union grouped by buf and unbuf structures. For buffered channels, messages will be stored in a ring buffer. For unbuffered channels, recv_buf is a pointer to where a sender should copy a message to. (The remaining members will be explained in due course.)

The two ready pthread condition variables mean “receiver can proceed” because either a sender is available (for an unbuffered channel) or the buffer isn’t empty (for a buffered channel) and “sender can proceed” because either a receiver is available (for an unbuffered channel) or the buffer isn’t full (for a buffered channel), respectively.

The two wait_cnt members count the number of threads waiting to receive and send, respectively. The rest of the members are straightforward.

Internally, we’ll also define:

enum chan_dir {
  CHAN_RECV,                    // Receive direction.
  CHAN_SEND                     // Send direction.
};
typedef enum chan_dir chan_dir;

to be more mnemonic indices into the copy_done, is_busy, is_copy_done, not_busy, ready, ring_idx, and wait_cnt arrays than 0 and 1.

Initialization, Close, & Clean-Up

The implementation of chan_init is fairly straightforward:

int chan_init( struct chan *chan, unsigned buf_cap,
               size_t msg_size ) {
  assert( chan != NULL );

  if ( buf_cap > 0 ) {
    if ( unlikely( msg_size == 0 ) ) {
      errno = EINVAL;
      return EINVAL;
    }
    chan->buf.ring_buf = malloc( buf_cap * msg_size );
    if ( chan->buf.ring_buf == NULL )
      return ENOMEM;  // malloc sets errno to ENOMEM
    chan->buf.ring_idx[ CHAN_SEND ] = 0;
    chan->buf.ring_idx[ CHAN_RECV ] = 0;
    chan->buf.ring_len = 0;
  }
  else {
    chan->unbuf.recv_buf = NULL;
    pthread_cond_init( &chan->unbuf.copy_done[ CHAN_RECV ], NULL );
    pthread_cond_init( &chan->unbuf.copy_done[ CHAN_SEND ], NULL );
    pthread_cond_init( &chan->unbuf.not_busy[ CHAN_RECV ], NULL );
    pthread_cond_init( &chan->unbuf.not_busy[ CHAN_SEND ], NULL );
    chan->unbuf.is_busy[ CHAN_RECV ] = false;
    chan->unbuf.is_busy[ CHAN_SEND ] = false;
    chan->unbuf.is_copy_done[ CHAN_RECV ] = false;
    chan->unbuf.is_copy_done[ CHAN_SEND ] = false;
  }

  chan->buf_cap = buf_cap;
  chan->is_closed = false;
  chan->msg_size = msg_size;
  pthread_mutex_init( &chan->mtx, NULL );
  pthread_cond_init( &chan->ready[ CHAN_RECV ], NULL );
  pthread_cond_init( &chan->ready[ CHAN_SEND ], NULL );
  chan->wait_cnt[ CHAN_RECV ] = 0;
  chan->wait_cnt[ CHAN_SEND ] = 0;

  return 0;
}

For information about likely and unlikely, see here.

Not mentioned previously is that chan_init returns an int where zero indicates success and non-zero is an error code.

Also not mentioned previously is that, for an unbuffered channel, we allow msg_size to be zero. This is marginally useful when you simply want one thread to signal another and the message contents are irrelevant.

In Go, you can use a chan bool as was done in the earlier example with the quitc channel. The fact that it received any value is what matters; whether it was true or false doesn’t matter.

The implementation of chan_close is:

void chan_close( struct chan *chan ) {
  assert( chan != NULL );
  pthread_mutex_lock( &chan->mtx );
  if ( !chan->is_closed ) {
    chan->is_closed = true;
    chan_signal( chan, CHAN_RECV, &pthread_cond_broadcast );
    chan_signal( chan, CHAN_SEND, &pthread_cond_broadcast );
    if ( chan->buf_cap == 0 ) {
      chan->unbuf.is_copy_done[ CHAN_RECV ] = true;
      pthread_cond_broadcast( &chan->unbuf.copy_done[ CHAN_RECV ] );
      chan->unbuf.is_copy_done[ CHAN_SEND ] = true;
      pthread_cond_broadcast( &chan->unbuf.copy_done[ CHAN_SEND ] );
      pthread_cond_broadcast( &chan->unbuf.not_busy[ CHAN_RECV ] );
      pthread_cond_broadcast( &chan->unbuf.not_busy[ CHAN_SEND ] );
    }
  }
  pthread_mutex_unlock( &chan->mtx );
}

If the channel was not already closed, use pthread_cond_broadcast to wake up all threads that may be waiting on condition variables.

The chan_signal function signals the relevant condition variable, but only if there are actually other threads waiting:

static void chan_signal( struct chan *chan, chan_dir dir,
                         int (*pthread_cond_fn)( pthread_cond_t* ) ) {
  if ( chan->wait_cnt[ dir ] > 0 )
    (*pthread_cond_fn)( &chan->ready[ dir ] );
}

The implementation of chan_cleanup is also fairly straightforward:

void chan_cleanup( struct chan *chan,
                   void (*msg_cleanup_fn)( void* ) ) {
  if ( chan == NULL )
    return;

  if ( chan->buf_cap > 0 ) {
    if ( chan->buf.ring_len > 0 && msg_cleanup_fn != NULL ) {
      unsigned recv_idx = chan->buf.ring_idx[ CHAN_RECV ];
      for ( unsigned i = 0; i < chan->buf.ring_len; ++i ) {
        (*msg_cleanup_fn)( chan_buf_at( chan, recv_idx ) );
        recv_idx = (recv_idx + 1) % chan->buf_cap;
      }
    }
    free( chan->buf.ring_buf );
  }
  else {
    pthread_cond_destroy( &chan->unbuf.copy_done[ CHAN_RECV ] );
    pthread_cond_destroy( &chan->unbuf.copy_done[ CHAN_SEND ] );
    pthread_cond_destroy( &chan->unbuf.not_busy[ CHAN_RECV ] );
    pthread_cond_destroy( &chan->unbuf.not_busy[ CHAN_SEND ] );
  }

  pthread_cond_destroy( &chan->ready[ CHAN_RECV ] );
  pthread_cond_destroy( &chan->ready[ CHAN_SEND ] );
  pthread_mutex_destroy( &chan->mtx );
}

where chan_buf_at is a utility function to get the address of the message at index abs_idx in the ring buffer:

static inline void* chan_buf_at( struct chan *chan, unsigned abs_idx ) {
  return (char*)chan->buf.ring_buf + abs_idx * chan->msg_size;
}

Send, Receive, & Time

The implementation of chan_send is:

int chan_send( struct chan *chan, void const *send_buf,
               struct timespec const *duration ) {
  assert( chan != NULL );

  struct timespec abs_ts;
  struct timespec const *const abs_time = ts_rel_to_abs( duration, &abs_ts );

  int const rv = chan->buf_cap > 0 ?
    unlikely( send_buf == NULL ) ? EINVAL :
      chan_buf_send( chan, send_buf, abs_time )
  :
    unlikely( chan->msg_size > 0 && send_buf == NULL ) ? EINVAL :
      chan_unbuf_send( chan, send_buf, abs_time );

  if ( rv > 0 )
    errno = rv;
  return rv;
}

In the channel API, we allow the user to specify a time-out as a duration, i.e., a time relative to now, because it’s simpler. However, in the pthreads API, a time-out must be specified as an absolute time in the future. Hence, we need to convert a relative to an absolute time:

static struct timespec const*
ts_rel_to_abs( struct timespec const *rel_time,
               struct timespec *abs_time ) {
  assert( abs_time != NULL );

  if ( rel_time == CHAN_NO_WAIT || rel_time == CHAN_NO_TIMEOUT )
    return rel_time;

  struct timespec now;
  clock_gettime( CLOCK_REALTIME, &now );

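  *abs_time = (struct timespec){
    .tv_sec  = now.tv_sec  + rel_time->tv_sec,
    .tv_nsec = now.tv_nsec + rel_time->tv_nsec
  };
  if ( abs_time->tv_nsec >= 1000000000L ) {   // carry into seconds
    abs_time->tv_sec  += abs_time->tv_nsec / 1000000000L;
    abs_time->tv_nsec %= 1000000000L;
  }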

  return abs_time;
}

Once that’s done, chan_send calls either chan_buf_send or chan_unbuf_send.

The implementation of chan_recv is similar:

int chan_recv( struct chan *chan, void *recv_buf,
               struct timespec const *duration ) {
  assert( chan != NULL );

  struct timespec abs_ts;
  struct timespec const *const abs_time = ts_rel_to_abs( duration, &abs_ts );

  int const rv = chan->buf_cap > 0 ?
    unlikely( recv_buf == NULL ) ? EINVAL :
      chan_buf_recv( chan, recv_buf, abs_time )
  :
    unlikely( chan->msg_size > 0 && recv_buf == NULL ) ? EINVAL :
      chan_unbuf_recv( chan, recv_buf, abs_time );

  if ( rv > 0 )
    errno = rv;
  return rv;
}

Channels in C Implementation: Buffered Send & Receive

This part of the implementation covers buffered send and receive. They’re a fairly straightforward implementation of the classic producer-consumer problem.

Buffered Send

The implementation of chan_buf_send is:

static int chan_buf_send( struct chan *chan, void const *send_buf,
                          struct timespec const *abs_time ) {
  int rv = 0;
  pthread_mutex_lock( &chan->mtx );

  do {
    if ( chan->is_closed ) {
      rv = EPIPE;
    }
    else if ( chan->buf.ring_len < chan->buf_cap ) {
      memcpy( chan_buf_at( chan, chan->buf.ring_idx[ CHAN_SEND ] ),
              send_buf, chan->msg_size );
      chan->buf.ring_idx[ CHAN_SEND ] =
        (chan->buf.ring_idx[ CHAN_SEND ] + 1) % chan->buf_cap;
      if ( ++chan->buf.ring_len == 1 )
        chan_signal( chan, CHAN_BUF_NOT_EMPTY, &pthread_cond_signal );
      break;
    }
    else {
      rv = chan_wait( chan, CHAN_BUF_NOT_FULL, abs_time );
    }
  } while ( rv == 0 );

  pthread_mutex_unlock( &chan->mtx );
  return rv;
}

There are three cases:

  1. The channel is closed: return EPIPE.
  2. The channel isn’t full: copy the message into the buffer and increment its length. If the new length is 1, it means the channel was empty but isn’t now, so signal any receivers that may be waiting.
  3. Otherwise, wait until abs_time for the channel to become not full.

For mnemonic convenience, we define these macros to specify which condition we either want to notify about or wait for:

#define CHAN_BUF_NOT_EMPTY    CHAN_RECV
#define CHAN_BUF_NOT_FULL     CHAN_SEND

The chan_wait function is used to wait until a channel is “ready” (can proceed):

static int chan_wait( struct chan *chan, chan_dir dir,
                      struct timespec const *abs_time ) {
  assert( chan != NULL );

  if ( chan->is_closed )
    return EPIPE;

  ++chan->wait_cnt[ dir ];
  int const rv =
    pthread_cond_wait_wrapper( &chan->ready[ dir ],
                               &chan->mtx, abs_time );
  --chan->wait_cnt[ dir ];

  return chan_is_hard_closed( chan, dir ) ? EPIPE : rv;
}

If the channel is closed, return EPIPE; otherwise increment the relevant wait_cnt and wait. When the wait returns, decrement the relevant wait_cnt. If the channel was closed while waiting, return EPIPE; otherwise return whatever the wrapper returned.

The pthread_cond_wait_wrapper function handles the special cases for abs_time, specifically if NULL, returns EAGAIN; if CHAN_NO_TIMEOUT, calls pthread_cond_wait; otherwise calls pthread_cond_timedwait:

static int pthread_cond_wait_wrapper( pthread_cond_t *cond,
                                      pthread_mutex_t *mtx,
                                      struct timespec const *abs_time ) {
  assert( cond != NULL );
  assert( mtx != NULL );

  if ( abs_time == CHAN_NO_WAIT )
    return EAGAIN;

  int const pcw_rv = abs_time == CHAN_NO_TIMEOUT ?
    pthread_cond_wait( cond, mtx ) :
    pthread_cond_timedwait( cond, mtx, abs_time );

  switch ( pcw_rv ) {
    case 0:
    case ETIMEDOUT:
      return pcw_rv;
    default:
      errno = pcw_rv;
      perror( PACKAGE );  // defined by Autotools
      exit( 1 );
  }
}

The chan_is_hard_closed function is:

static bool chan_is_hard_closed( struct chan const *chan, chan_dir dir ) {
  return  chan->is_closed &&
          (dir == CHAN_SEND ||
           chan->buf_cap == 0 ||
           chan->buf.ring_len == 0);
}

A non-empty buffered channel can still be received from even if it’s closed, so it shouldn’t be considered “hard closed.”

Buffered Receive

The implementation of chan_buf_recv is:

static int chan_buf_recv( struct chan *chan, void *recv_buf,
                          struct timespec const *abs_time ) {
  int rv = 0;
  pthread_mutex_lock( &chan->mtx );

  do {
    // Since we can still read from a closed, non-empty, buffered
    // channel, there's no check for is_closed first.
    if ( chan->buf.ring_len > 0 ) {
      memcpy( recv_buf,
              chan_buf_at( chan, chan->buf.ring_idx[ CHAN_RECV ] ),
              chan->msg_size );
      chan->buf.ring_idx[ CHAN_RECV ] =
        (chan->buf.ring_idx[ CHAN_RECV ] + 1) % chan->buf_cap;
      if ( chan->buf.ring_len-- == chan->buf_cap )
        chan_signal( chan, CHAN_BUF_NOT_FULL, &pthread_cond_signal );
      break;
    }
    rv = chan_wait( chan, CHAN_BUF_NOT_EMPTY, abs_time );
  } while ( rv == 0 );

  pthread_mutex_unlock( &chan->mtx );
  return rv;
}

There are two cases:

  1. The channel isn’t empty: copy the message from the buffer and decrement its length. If the old length was its capacity, it means the channel was full but isn’t now, so notify any senders that may be waiting.
  2. Otherwise, wait until abs_time for the channel to become not empty.
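To see the index arithmetic on its own, the following is a single-threaded sketch of just the ring buffer bookkeeping. It is not the channel implementation: there is no mutex or condition variable, messages are plain int values, and the names int_ring, int_ring_send, int_ring_recv, and RING_CAP are assumptions made for this example. Where the real functions would wait, the sketch simply returns false.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

enum { RING_CAP = 4 };          // hypothetical fixed capacity

// Hypothetical, single-threaded stand-in for the buffered part of a
// channel: no mutex, no condition variables, int messages only.
struct int_ring {
  int       buf[ RING_CAP ];
  unsigned  len;                // number of messages stored
  unsigned  idx[2];             // [0] = receive index, [1] = send index
};

// Returns false if the ring is full (where chan_buf_send would wait).
bool int_ring_send( struct int_ring *r, int msg ) {
  assert( r != NULL );
  if ( r->len == RING_CAP )
    return false;
  r->buf[ r->idx[1] ] = msg;
  r->idx[1] = (r->idx[1] + 1) % RING_CAP;   // wrap around
  ++r->len;
  return true;
}

// Returns false if the ring is empty (where chan_buf_recv would wait).
bool int_ring_recv( struct int_ring *r, int *msg ) {
  assert( r != NULL && msg != NULL );
  if ( r->len == 0 )
    return false;
  *msg = r->buf[ r->idx[0] ];
  r->idx[0] = (r->idx[0] + 1) % RING_CAP;   // wrap around
  --r->len;
  return true;
}
```

Messages come out in the order they went in, and once the send index reaches the capacity, it wraps back to slot 0 and reuses space freed by earlier receives.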

Channels in C Implementation: Unbuffered Send & Receive

This part of the implementation covers unbuffered send and receive. They’re a bit trickier to implement because:

  1. The sender and receiver have to rendezvous, i.e., meet at the same place (in memory) and time to copy a message.
  2. However, this does have the advantage of enabling the sender to copy a message directly to where the receiver wants it with only a single memcpy rather than the two required for buffered channels.
  3. While either a sender (or receiver) is waiting for a receiver (or sender), additional senders (or receivers) must be excluded since, for a single channel, only a single sender and receiver can rendezvous.
  4. After a message is copied, the sender has to block to allow the receiver to do something with the message, before attempting to send another message (more later).

Unbuffered Send

The implementation of chan_unbuf_send is:

static int chan_unbuf_send( struct chan *chan, void const *send_buf,
                            struct timespec const *abs_time ) {
  pthread_mutex_lock( &chan->mtx );

  int rv = chan_unbuf_acquire( chan, CHAN_SEND, abs_time );
  if ( rv == 0 ) {
    chan_signal( chan, CHAN_RECV, &pthread_cond_signal );

    do {
      if ( chan->unbuf.is_busy[ CHAN_RECV ] ) {
        if ( chan->msg_size > 0 )
          memcpy( chan->unbuf.recv_buf, send_buf, chan->msg_size );
        chan_unbuf_handshake( chan, CHAN_SEND );
        break;
      }
      rv = chan_wait( chan, CHAN_SEND, abs_time );
    } while ( rv == 0 );

    chan_unbuf_release( chan, CHAN_SEND );
  }

  pthread_mutex_unlock( &chan->mtx );
  return rv;
}

The chan_unbuf_acquire function is used to acquire exclusive access to an unbuffered channel for either sending or receiving:

static int chan_unbuf_acquire( struct chan *chan, chan_dir dir,
                               struct timespec const *abs_time ) {
  int rv = 0;
  while ( rv == 0 && chan->unbuf.is_busy[ dir ] ) {
    rv = chan->is_closed ? EPIPE :
      pthread_cond_wait_wrapper( &chan->unbuf.not_busy[ dir ],
                                 &chan->mtx, abs_time );
  } // while
  if ( rv == 0 )
    chan->unbuf.is_busy[ dir ] = true;
  return rv;
}

This is what the is_busy[2] is for: to know whether either the send end, receive end, or both are “busy” with a thread. If busy, the thread waits until either the relevant not_busy condition has been signaled or the time-out expires. There’s also a corresponding release function:

static void chan_unbuf_release( struct chan *chan, chan_dir dir ) {
  chan->unbuf.is_busy[ dir ] = false;
  pthread_cond_signal( &chan->unbuf.not_busy[ dir ] );
}

In chan_unbuf_send, the line:

if ( chan->unbuf.is_busy[ CHAN_RECV ] ) {

checks whether the channel is busy for a receiver: if so, it means there is a receiver already waiting to rendezvous with the sender, so the sender can proceed to copy the message immediately:

if ( chan->msg_size > 0 )
  memcpy( chan->unbuf.recv_buf, send_buf, chan->msg_size );

As a reminder, for an unbuffered channel, we allow msg_size to be zero for only signaling (no message copying) between threads.

The chan_unbuf_handshake function addresses point 4 above:

static void chan_unbuf_handshake( struct chan *chan,
                                  chan_dir dir ) {
  chan->unbuf.is_copy_done[ !dir ] = true;
  pthread_cond_signal( &chan->unbuf.copy_done[ !dir ] );

  chan->unbuf.is_copy_done[ dir ] = false;
  do {  // guard against spurious wake-ups
    pthread_cond_wait( &chan->unbuf.copy_done[ dir ], &chan->mtx );
  } while ( !chan->unbuf.is_copy_done[ dir ] );

  chan->unbuf.is_copy_done[ !dir ] = true;
  pthread_cond_signal( &chan->unbuf.copy_done[ !dir ] );
}

That is, the sender has to block to allow the receiver to do something with the message before attempting to send another message. To understand the problem, consider the following sequence of events between two threads where each thread is in a loop, one sending and the other receiving:

  1. On thread 1, chan_unbuf_recv is called on a channel, but no sender is present, so it waits.
  2. On thread 2, chan_unbuf_send is called, sees a receiver is waiting, copies the message immediately, and returns.
  3. The kernel’s task scheduler arbitrarily decides to schedule thread 2 again immediately. As stated, it’s in a loop, so chan_unbuf_send is called again, sees a receiver is still “waiting” (even though the message was already copied), and immediately copies a new message overwriting the previous message!

Step 3 can happen any number of times overwriting messages before the scheduler could decide to run thread 1. Note that the same thing can happen with the sender and receiver roles reversed, i.e., the receiver could receive the same message multiple times thinking it’s a new message since the sender isn’t given a chance to run. (For simplicity, we’ll stick to the event sequence as originally presented, i.e., the sender is called multiple times.)

What’s needed is a way to force thread 2 to block after sending a message and wait on a condition variable. Since it’s blocked, the scheduler won’t schedule it again and it therefore will schedule thread 1 to allow its chan_unbuf_recv to return and allow its loop to do whatever with the message.

This is what the copy_done condition variable is for. Additionally, both calls to pthread_cond_signal are necessary to implement a handshake between the two threads.

The is_copy_done flag is to guard against spurious wake-ups.

Unbuffered Receive

The implementation of chan_unbuf_recv is pretty much the same as chan_unbuf_send except CHAN_RECV and CHAN_SEND are swapped:

static int chan_unbuf_recv( struct chan *chan, void *recv_buf,
                            struct timespec const *abs_time ) {
  pthread_mutex_lock( &chan->mtx );

  int rv = chan_unbuf_acquire( chan, CHAN_RECV, abs_time );
  if ( rv == 0 ) {
    chan->unbuf.recv_buf = recv_buf;
    chan_signal( chan, CHAN_SEND, &pthread_cond_signal );

    do {
      if ( chan->unbuf.is_busy[ CHAN_SEND ] ) {
        chan_unbuf_handshake( chan, CHAN_RECV );
        break;
      }
      rv = chan_wait( chan, CHAN_RECV, abs_time );
    } while ( rv == 0 );

    chan->unbuf.recv_buf = NULL;
    chan_unbuf_release( chan, CHAN_RECV );
  }

  pthread_mutex_unlock( &chan->mtx );
  return rv;
}

The only other difference is the addition of the lines:

chan->unbuf.recv_buf = recv_buf;
// ...
chan->unbuf.recv_buf = NULL;

that tell the sender where to copy the message to directly and reset the pointer when done.

Channels in C API: Select

Just as we can’t add an <- operator to C, we can't add a select statement either, so, again, we’ll have to substitute an ordinary function call:

int chan_select( unsigned recv_len, struct chan *recv_chan[recv_len],
                 void *recv_buf[recv_len],
                 unsigned send_len, struct chan *send_chan[send_len],
                 void const *send_buf[send_len],
                 struct timespec const *duration );

That is, you pass an optional array of channels to receive from (and the buffers to receive into), another optional array of channels to send to (and the buffers to send from), and a time-out. When a channel is selected, the function returns the index into either recv_chan or send_chan so you know which channel was selected. If no channel was selected, it returns CHAN_NONE:

#define CHAN_NONE  0

If an error occurred, it returns that error code and sets errno to the same value.

You can use a chan_select in a switch statement. For example, the fibonacci function in Go can be rewritten in C as:

void fibonacci( struct chan *fibc, struct chan *quitc ) {
  unsigned fib = 0, prev_fib, next_fib = 1;
  for (;;) {
    switch ( chan_select(
               1, (struct chan*[]){ quitc }, (void      *[]){ NULL },
               1, (struct chan*[]){ fibc  }, (void const*[]){ &fib },
               CHAN_NO_TIMEOUT ) ) {
      case CHAN_SEND(0): // If sent number ...
        prev_fib = fib;  // ... calculate next number.
        fib = next_fib;
        next_fib += prev_fib;
        break;
      case CHAN_RECV(0): // If received quit ...
        return;          // ... quit.
    }
  }
}

CHAN_RECV and CHAN_SEND are macros used to discriminate between the returned index in the recv_chan array from the send_chan array:

#define CHAN_RECV(IDX)  (-1 - (int)(IDX))
#define CHAN_SEND(IDX)  (-1024 - (int)(IDX))

Negative numbers are for selected channels; positive numbers are for error codes (since both the C standard and POSIX guarantee that all Exxxx values are positive).

If you’re wondering how there can be both an enumeration with constants CHAN_RECV and CHAN_SEND and macros having the same names, the preprocessor doesn’t expand a macro with arguments unless the macro’s name is followed by (.
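When a switch isn't convenient, the return value can also be decoded at run-time. The sketch below is one possible way to do that; select_rv_decode is a hypothetical helper, not part of the API, and it assumes at most 1023 receive channels so that the two negative ranges don't overlap. It also shows the enumeration constants and the same-named macros coexisting.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

enum chan_dir { CHAN_RECV, CHAN_SEND };

#define CHAN_NONE       0
#define CHAN_RECV(IDX)  (-1 - (int)(IDX))
#define CHAN_SEND(IDX)  (-1024 - (int)(IDX))

// Hypothetical helper (not part of the API): if rv denotes a selected
// channel, stores its direction and array index and returns true;
// returns false for CHAN_NONE or a (positive) error code.
// Assumes at most 1023 receive channels so the ranges don't overlap.
bool select_rv_decode( int rv, enum chan_dir *dir, unsigned *idx ) {
  assert( dir != NULL && idx != NULL );
  if ( rv >= 0 )
    return false;
  if ( rv <= CHAN_SEND(0) ) {               // macro: followed by '('
    *dir = CHAN_SEND;                       // enum constant: no '('
    *idx = (unsigned)(CHAN_SEND(0) - rv);
  } else {
    *dir = CHAN_RECV;
    *idx = (unsigned)(CHAN_RECV(0) - rv);
  }
  return true;
}
```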

Admittedly, chan_select is nowhere near as elegant as the select statement in Go, but that’s the best you can do in C without direct language support.

Channels in C Implementation: Select

The code for send and receive is relatively straightforward (for multithreaded code). The code to implement select, specifically a blocking select, is quite a bit more complicated. Why? A blocking select waits for any one of a number of channels to become ready and, unfortunately, the pthreads API has no way to pthread_cond_wait for multiple condition variables simultaneously, one for each channel being selected from. But the signaler can signal any number of condition variables. Hence one approach is to have a linked list of “observers” for every channel where an observer is either the channel itself or an active select in some thread.

Data Structures

To implement an observer, we’ll need additional data structures (where the non-obvious members will be explained in due course):

typedef struct chan_impl_link chan_impl_link;
typedef struct chan_impl_obs  chan_impl_obs;

struct chan_impl_link {
  chan_impl_obs  *obs;              // The observer.
  chan_impl_link *next;             // The next link, if any.
};

struct chan_impl_obs {
  struct chan      *chan;           // The channel being observed.
  pthread_mutex_t  *pmtx;           // The mutex to use.
  pthread_cond_t    chan_ready;     // Is chan ready?
};

The chan_impl_link structure is used for a singly linked list of observers. In the chan structure, the ready[2] array will be replaced by:

  chan_impl_link    head_link[2];   // Linked lists of observers.
  chan_impl_obs     self_obs[2];    // Receiver/sender.

where head_link will point to the head of the linked list of observers, the first of which will always be the channel itself, self_obs.

When an observer is embedded inside a chan structure, pmtx will be NULL; when an observer is a chan_select, pmtx will point to a mutex in chan_select’s stack frame (more later).

Given chan_impl_obs, we’ll rename chan_signal to chan_signal_all_obs and expand its implementation to signal all observers:

static void chan_signal_all_obs( struct chan *chan, chan_dir dir,
                                 int (*pthread_cond_fn)( pthread_cond_t* ) ) {
  if ( chan->wait_cnt[ dir ] == 0 )     // Nobody is waiting.
    return;

  for ( chan_impl_link *curr_link = &chan->head_link[ dir ]; curr_link != NULL;
        curr_link = curr_link->next ) {
    chan_impl_obs *const obs = curr_link->obs;
    if ( obs->pmtx != NULL ) {
      pthread_mutex_lock( obs->pmtx );
      obs->chan = chan;
    }
    (*pthread_cond_fn)( &obs->chan_ready );
    if ( obs->pmtx != NULL )
      pthread_mutex_unlock( obs->pmtx );
  } // for
}

The function starts at the channel’s own observer, then traverses the linked list of chan_select observers following the next pointers signaling each in turn.
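The traversal itself can be followed without any threads. In the simplified sketch below, a counter stands in for each observer's condition variable; the names obs, obs_link, and notify_all are assumptions made for this example and are not the article's actual types.

```c
#include <assert.h>
#include <stddef.h>

// Hypothetical stand-ins for chan_impl_obs and chan_impl_link: a counter
// takes the place of the condition variable so the traversal can be
// followed without any threads.
struct obs      { unsigned notified; };
struct obs_link { struct obs *obs; struct obs_link *next; };

// Notifies every observer on the list starting at head and returns how
// many observers were visited.
unsigned notify_all( struct obs_link *head ) {
  unsigned count = 0;
  for ( struct obs_link *curr = head; curr != NULL; curr = curr->next ) {
    assert( curr->obs != NULL );
    ++curr->obs->notified;      // stands in for signaling chan_ready
    ++count;
  }
  return count;
}
```

With a list whose head refers to the channel's own observer followed by two select observers, a single call visits all three in order.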

We’ll also need another data structure to refer to a channel:

struct chan_select_ref {
  struct chan    *chan;         // The channel referred to.
  unsigned short  param_idx;    // Index into recv_chan/send_chan.
  chan_dir        dir;          // Selected channel direction.
  bool            maybe_ready;  // Is channel maybe ready?
};
typedef struct chan_select_ref chan_select_ref;

Channels in C Implementation: Select

The implementation for chan_select starts off as:

int chan_select( unsigned recv_len, struct chan *recv_chan[recv_len],
                 void *recv_buf[recv_len],
                 unsigned send_len, struct chan *send_chan[send_len],
                 void const *send_buf[send_len],
                 struct timespec const *duration ) {
  if ( unlikely( recv_len > 0 && (recv_chan == NULL || recv_buf == NULL) ) ||
       unlikely( send_len > 0 && (send_chan == NULL || send_buf == NULL) ) ) {
    errno = EINVAL;
    return -1;
  }

  chan_select_ref  *ref, stack_ref[16];
  unsigned const    total_channels = recv_len + send_len;

  if ( total_channels <= ARRAY_SIZE( stack_ref ) ) {
    ref = stack_ref;
  }
  else {
    ref = malloc( total_channels * sizeof( chan_select_ref ) );
    if ( unlikely( ref == NULL ) )
      return -1;                        // malloc sets errno
  }

  struct timespec               abs_ts;
  struct timespec const *const  abs_time =
                                  ts_rel_to_abs( duration, &abs_ts );

  chan_select_init_args   csi;
  bool const              is_blocking = duration != CHAN_NO_WAIT;
  unsigned                seed = 0;     // random number seed
  chan_impl_obs           select_obs;   // observer for this select
  pthread_mutex_t         select_mtx;   // mutex for select_obs
  chan_select_ref const  *selected_ref; // ref to selected channel
  int                     rv;

  // ...

The first few lines simply check for invalid arguments. Next, we attempt a small-size optimization: if the total number of channels is ≤ 16, we use a stack-based array of chan_select_ref; otherwise, we malloc it. Then abs_time is calculated just as in other functions, and the remaining local variables are declared.
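
The small-size optimization is a general pattern worth seeing on its own. Here’s a minimal sketch using an unrelated task, finding a median, that needs a scratch copy of its input. The names median, int_cmp, and median_demo are hypothetical, and the ARRAY_SIZE macro is assumed to have the conventional definition shown:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

#define ARRAY_SIZE(A)  (sizeof (A) / sizeof (A)[0])

static int int_cmp( void const *i, void const *j ) {
  int const a = *(int const*)i, b = *(int const*)j;
  return (a > b) - (a < b);
}

// Stores the upper median of vals in *result; returns false only if
// memory could not be allocated.
static bool median( int const vals[], size_t n, int *result ) {
  assert( vals != NULL && result != NULL && n > 0 );

  int *scratch, stack_scratch[16];
  if ( n <= ARRAY_SIZE( stack_scratch ) ) {
    scratch = stack_scratch;        // Small input: no allocation needed.
  }
  else {
    scratch = malloc( n * sizeof( int ) );
    if ( scratch == NULL )
      return false;
  }

  memcpy( scratch, vals, n * sizeof( int ) );
  qsort( scratch, n, sizeof( int ), &int_cmp );
  *result = scratch[ n / 2 ];

  if ( scratch != stack_scratch )   // Free only what was malloc'd.
    free( scratch );
  return true;
}

static bool median_demo( void ) {
  int const small[] = { 5, 1, 3 };  // Fits in the stack array.
  int large[100];                   // Forces the malloc path.
  for ( size_t i = 0; i < ARRAY_SIZE( large ); ++i )
    large[i] = 99 - (int)i;

  int m_small, m_large;
  return median( small, ARRAY_SIZE( small ), &m_small ) && m_small == 3
      && median( large, ARRAY_SIZE( large ), &m_large ) && m_large == 50;
}
```

The key detail is the final comparison against the stack array: the same pointer is used on both paths, so clean-up has to check which one was taken.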

The next part of the implementation is:

  if ( is_blocking ) {
    pthread_mutex_init( &select_mtx, NULL );
    chan_obs_init( &select_obs, &select_mtx );
  }

  // ...

If this is a blocking select, then we need to initialize the mutex we’ll use as well as our observer:

static void chan_obs_init( chan_impl_obs *obs, pthread_mutex_t *pmtx ) {
  assert( obs != NULL );

  obs->chan = NULL;
  pthread_cond_init( &obs->chan_ready, NULL );
  obs->pmtx = pmtx;
}

The next part of the chan_select implementation is:

  do {
    csi = (chan_select_init_args){ 0 };
    rv = 0;
    selected_ref = NULL;

    if ( !chan_select_init( ref, recv_len, recv_chan, CHAN_RECV,
                            is_blocking ? &select_obs : NULL,
                            &csi ) ) {
      break;
    }

    if ( !chan_select_init( ref, send_len, send_chan, CHAN_SEND,
                            is_blocking ? &select_obs : NULL,
                            &csi ) ) {
      if ( is_blocking ) {
        for ( unsigned i = 0; i < recv_len; ++i )
          chan_remove_obs( recv_chan[i], CHAN_RECV, &select_obs );
      }
      break;
    }

    if ( csi.chans_open == 0 ) {
      rv = EPIPE;
      break;
    }

    // ...

It starts a big do ... while loop and determines which of the given channels might be ready by calling chan_select_init once each for the receive and send channels. The csi variable is of a new data structure:

struct chan_select_init_args {
  unsigned  chans_open;         // Number of channels open.
  unsigned  chans_maybe_ready;  // Subset that may be ready.
  unsigned  ref_len;            // Length of ref array.
};
typedef struct chan_select_init_args chan_select_init_args;

It’s used as a convenience to bundle values updated by chan_select_init so a pointer to one argument can be passed instead of three. Upon return, if chans_open is zero, then all channels are closed so we just break and return.

The implementation of chan_select_init is:

static bool chan_select_init( chan_select_ref ref[],
                              unsigned chan_len,
                              struct chan *chan[chan_len],
                              chan_dir dir, chan_impl_obs *add_obs,
                              chan_select_init_args *csi ) {
  assert( ref != NULL );
  assert( csi != NULL );
  if ( chan_len == 0 )
    return true;
  assert( chan != NULL );

  unsigned i = 0;

  for ( ; i < chan_len; ++i ) {
    bool add_failed = false;
    bool is_ready = false;
    pthread_mutex_lock( &chan[i]->mtx );

    bool const is_hard_closed = chan_is_hard_closed( chan[i], dir );
    if ( !is_hard_closed ) {
      is_ready = chan_is_ready( chan[i], dir );
      if ( add_obs != NULL )
        add_failed = !chan_add_obs( chan[i], dir, add_obs );
    }

    pthread_mutex_unlock( &chan[i]->mtx );

    if ( unlikely( add_failed ) )
      goto remove_already_added;
    if ( is_hard_closed )
      continue;
    ++csi->chans_open;
    if ( is_ready || add_obs != NULL ) {
      ref[ csi->ref_len++ ] = (chan_select_ref){
        .chan = chan[i],
        .dir = dir,
        .param_idx = (unsigned short)i,
        .maybe_ready = is_ready
      };
    }
    if ( is_ready )
      ++csi->chans_maybe_ready;
  } // for

  return true;

remove_already_added:
  for ( unsigned j = 0; j < i; ++j )
    chan_remove_obs( chan[j], dir, add_obs );
  return false;
}

It iterates through the array of channels looking for those that are not closed and determining whether each is ready.

The add_obs parameter is not NULL only for blocking selects. For a blocking select, we need to add its observer via chan_add_obs:

static bool chan_add_obs( struct chan *chan, chan_dir dir,
                          chan_impl_obs *add_obs ) {
  assert( chan != NULL );
  assert( add_obs != NULL );

  chan_impl_link *const new_link = malloc( sizeof( chan_impl_link ) );
  if ( unlikely( new_link == NULL ) )
    return false;
  *new_link = (chan_impl_link){
    .obs = add_obs,
    .next = chan->head_link[ dir ].next
  };
  chan->head_link[ dir ].next = new_link;

  ++chan->wait_cnt[ dir ];
  return true;
}

It also increments wait_cnt since the select will be waiting.

In chan_select_init, the lines:

    if ( is_ready || add_obs != NULL ) {
      ref[ csi->ref_len++ ] = (chan_select_ref){
        .chan = chan[i],
        .dir = dir,
        .param_idx = (unsigned short)i,
        .maybe_ready = is_ready
      };
    }

initialize the chan_select_ref for the current channel.

If, during iteration, chan_add_obs fails (because malloc failed, which is unlikely but still possible, so we should check), we have to remove the observer from all the channels to which it’s already been added.

If chan_select_init checks whether channels are ready, why does it return a count of the channels that may be ready? Because even if a channel is ready at the time it’s checked, as soon as its mutex is unlocked, another thread might operate on the channel and make it no longer ready — “maybe” is the best we can do.
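
A minimal sketch of why “maybe” is the best we can do, using a hypothetical one-value slot rather than a real channel (slot, slot_is_ready, slot_try_take, and stale_ready_demo are illustrative names only). The demo simulates another thread winning the race by simply calling slot_try_take between the readiness check and our own attempt:

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <stdbool.h>

typedef struct {
  pthread_mutex_t mtx;
  bool            full;
  int             val;
} slot;

// Answers "is a value available?", but the answer can be stale as soon
// as the mutex is unlocked.
static bool slot_is_ready( slot *s ) {
  pthread_mutex_lock( &s->mtx );
  bool const ready = s->full;
  pthread_mutex_unlock( &s->mtx );
  return ready;
}

// Re-checks readiness under the lock; returns 0 or EAGAIN.
static int slot_try_take( slot *s, int *val ) {
  int rv = EAGAIN;
  pthread_mutex_lock( &s->mtx );
  if ( s->full ) {
    *val = s->val;
    s->full = false;
    rv = 0;
  }
  pthread_mutex_unlock( &s->mtx );
  return rv;
}

// Returns the result of our take attempt after "another thread" got
// there first.
static int stale_ready_demo( void ) {
  slot s = { .full = true, .val = 42 };
  pthread_mutex_init( &s.mtx, NULL );

  int val, rv = 0;
  bool const maybe_ready = slot_is_ready( &s );   // True at this instant.
  (void)slot_try_take( &s, &val );  // Stand-in for another thread taking it.
  if ( maybe_ready )
    rv = slot_try_take( &s, &val ); // The slot is no longer ready.

  pthread_mutex_destroy( &s.mtx );
  return rv;
}
```

The earlier readiness check was correct when it was made, yet the later attempt still has to be prepared for EAGAIN.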

In the next part of the chan_select implementation, we check for the degenerate case of only a single channel being open, in which case a blocking select might as well wait on that channel directly until the time-out:

    struct chan *selected_chan = NULL;
    struct timespec const *select_abs_time = CHAN_NO_WAIT;

    if ( csi.chans_open == 1 ) {        // Degenerate case.
      if ( csi.ref_len > 0 ) {
        selected_ref = ref;
        select_abs_time = abs_time;
      }
    }

    // ...

If multiple channels are open, but none are ready and this is a blocking select, we wait for any one of them to signal our observer:

    else if ( csi.chans_maybe_ready == 0 && is_blocking ) {
      // None of the channels may be ready and we should wait.
      pthread_mutex_lock( &select_mtx );
      select_obs.chan = NULL;
      do {
        if ( pthread_cond_wait_wrapper( &select_obs.chan_ready,
                                        &select_mtx,
                                        abs_time ) == ETIMEDOUT ) {
          rv = ETIMEDOUT;
        }
      } while ( rv == 0 && select_obs.chan == NULL );
      // Copy select_obs.chan to local variable while lock held.
      selected_chan = select_obs.chan;
      pthread_mutex_unlock( &select_mtx );

      if ( rv == 0 ) {  // A channel became ready: find it.
        for ( unsigned i = 0; i < csi.chans_open; ++i ) {
          if ( selected_chan == ref[i].chan ) {
            selected_ref = &ref[i];
            break;
          }
        } // for
        assert( selected_ref != NULL );
      }
    }

    // ...
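
The wait-in-a-loop pattern above can be sketched on its own using pthread_cond_timedwait directly. Here, mini_waiter, mini_wait, and mini_wait_timeout_demo are hypothetical names; this is not the article’s pthread_cond_wait_wrapper, and it assumes the time-out is always an absolute CLOCK_REALTIME time:

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <stddef.h>
#include <time.h>

typedef struct {
  pthread_mutex_t mtx;
  pthread_cond_t  chan_ready;
  void           *chan;             // Set by a signaler; NULL until then.
} mini_waiter;

// Waits until w->chan is set or abs_time passes.  Returns 0 if a channel
// was reported, otherwise the error from pthread_cond_timedwait.
static int mini_wait( mini_waiter *w, struct timespec const *abs_time,
                      void **chan ) {
  assert( w != NULL && abs_time != NULL && chan != NULL );
  int rv = 0;
  pthread_mutex_lock( &w->mtx );
  // Loop: a wake-up alone (possibly spurious) doesn't mean chan was set.
  while ( rv == 0 && w->chan == NULL )
    rv = pthread_cond_timedwait( &w->chan_ready, &w->mtx, abs_time );
  *chan = w->chan;                  // Copy while the lock is held.
  pthread_mutex_unlock( &w->mtx );
  return *chan != NULL ? 0 : rv;
}

// Waits about 10 ms with nobody signaling; returns mini_wait's result.
static int mini_wait_timeout_demo( void ) {
  mini_waiter w = { .chan = NULL };
  pthread_mutex_init( &w.mtx, NULL );
  pthread_cond_init( &w.chan_ready, NULL );

  struct timespec abs_time;
  clock_gettime( CLOCK_REALTIME, &abs_time );
  abs_time.tv_nsec += 10L * 1000 * 1000;
  if ( abs_time.tv_nsec >= 1000000000L ) {  // Carry into seconds.
    ++abs_time.tv_sec;
    abs_time.tv_nsec -= 1000000000L;
  }

  void *chan;
  int const rv = mini_wait( &w, &abs_time, &chan );

  pthread_cond_destroy( &w.chan_ready );
  pthread_mutex_destroy( &w.mtx );
  return rv;
}
```

Since nothing ever sets chan in the demo, the only way out of the loop is the time-out.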

Otherwise, either some or all channels may be ready, so we sort the array to put the maybe-ready ones first, which increases the odds we’ll select a channel that’s ready, and pick one at random:

    else {  // Some or all may be ready: pick one.
      unsigned select_len;
      if ( csi.chans_maybe_ready > 0 && csi.chans_maybe_ready < csi.ref_len ) {
        qsort(
          ref, csi.ref_len, sizeof( chan_select_ref ),
          (qsort_cmp_fn)&chan_select_ref_cmp
        );
        select_len = csi.chans_maybe_ready;
      }
      else {
        // Otherwise, either no or all channels are ready,
        // so there's no need to sort them.
        select_len = csi.chans_open;
      }

      if ( seed == 0 )
        seed = rand_seed( abs_time );
      selected_ref = &ref[ (unsigned)rand_r( &seed ) % select_len ];
    }

where the chan_select_ref_cmp function is:

static int chan_select_ref_cmp( chan_select_ref const *i_csr,
                                chan_select_ref const *j_csr ) {
  // sort maybe_ready (true, aka, 1) before !maybe_ready (false, aka, 0)
  return (int)j_csr->maybe_ready - (int)i_csr->maybe_ready;
}

and the rand_seed function is:

static unsigned rand_seed( struct timespec const *abs_time ) {
  if ( abs_time != CHAN_NO_WAIT && abs_time != CHAN_NO_TIMEOUT )
    return (unsigned)abs_time->tv_nsec;
  struct timespec now;
  CLOCK_GETTIME( CLOCK_REALTIME, &now );
  return (unsigned)now.tv_nsec;
}
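
The sort-then-pick step can be exercised in isolation. In this sketch, mini_ref, mini_ref_cmp, pick, and pick_demo are hypothetical names, and the seed is a fixed number rather than one derived from the clock:

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

typedef struct {
  int  id;
  bool maybe_ready;
} mini_ref;

// Sorts maybe_ready (true) before !maybe_ready (false).
static int mini_ref_cmp( void const *i, void const *j ) {
  mini_ref const *const i_ref = i, *const j_ref = j;
  return (int)j_ref->maybe_ready - (int)i_ref->maybe_ready;
}

// Picks a random ref, restricted to the maybe-ready ones whenever only
// some of the refs may be ready.
static mini_ref const* pick( mini_ref ref[], unsigned ref_len,
                             unsigned n_ready, unsigned *seed ) {
  assert( ref != NULL && seed != NULL );
  assert( ref_len > 0 && n_ready <= ref_len );

  unsigned select_len = ref_len;    // None or all ready: any will do.
  if ( n_ready > 0 && n_ready < ref_len ) {
    qsort( ref, ref_len, sizeof( mini_ref ), &mini_ref_cmp );
    select_len = n_ready;           // Choose only from the ready prefix.
  }
  return &ref[ (unsigned)rand_r( seed ) % select_len ];
}

// Returns how many of 100 picks landed on a maybe-ready ref.
static unsigned pick_demo( void ) {
  mini_ref ref[] = {
    { 1, false }, { 2, true }, { 3, false }, { 4, true }
  };
  unsigned seed = 12345, ready_picks = 0;
  for ( unsigned i = 0; i < 100; ++i ) {
    if ( pick( ref, 4, 2, &seed )->maybe_ready )
      ++ready_picks;
  }
  return ready_picks;
}
```

One design difference: the comparison function here takes void const* parameters, matching the type qsort expects, so no function-pointer cast is needed.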

The next part of the chan_select implementation is:

    if ( selected_ref != NULL )
      rv = chan_select_io( selected_ref, recv_buf, send_buf, select_abs_time );

where chan_select_io is a helper function (to reduce the length of the already long chan_select function):

static int chan_select_io( chan_select_ref const *ref,
                           void *recv_buf[], void const *send_buf[],
                           struct timespec const *abs_time ) {
  return ref->dir == CHAN_RECV ?
    ref->chan->buf_cap > 0 ?
      chan_buf_recv( ref->chan, recv_buf[ ref->param_idx ], abs_time ) :
      chan_unbuf_recv( ref->chan, recv_buf[ ref->param_idx ], abs_time )
  :
    ref->chan->buf_cap > 0 ?
      chan_buf_send( ref->chan, send_buf[ ref->param_idx ], abs_time ) :
      chan_unbuf_send( ref->chan, send_buf[ ref->param_idx ], abs_time );
}

If we’ve selected a channel, then attempt either to receive from or send to the channel:

  1. If the channel is still ready, we won’t wait.
  2. If there’s only a single channel open, wait for it to become ready.
  3. Otherwise don’t wait: we’ll try another channel.

At this point, either we successfully sent or received on the selected channel, it became no longer ready, or the time-out expired. We need to do a bit of clean-up. The next part of the chan_select implementation is:

    if ( is_blocking ) {
      for ( unsigned i = 0; i < recv_len; ++i )
        chan_remove_obs( recv_chan[i], CHAN_RECV, &select_obs );
      for ( unsigned i = 0; i < send_len; ++i )
        chan_remove_obs( send_chan[i], CHAN_SEND, &select_obs );
    }

    // ...

If we’re a blocking select, we need to remove ourselves from each channel’s list of observers:

static void chan_remove_obs( struct chan *chan, chan_dir dir,
                             chan_impl_obs *remove_obs ) {
  assert( chan != NULL );
  assert( remove_obs != NULL );

  // Whether the channel is closed now doesn't matter since
  // it may have been open when the observer was added.

  pthread_mutex_lock( &chan->mtx );

  chan_impl_link *curr_link = &chan->head_link[ dir ];
  do {
    chan_impl_link *const next_link = curr_link->next;
    assert( next_link != NULL );
    if ( next_link->obs == remove_obs ) {
      curr_link->next = next_link->next;
      free( next_link );
      break;
    }
    curr_link = next_link;
  } while ( curr_link != NULL );

  --chan->wait_cnt[ dir ];
  pthread_mutex_unlock( &chan->mtx );
  assert( curr_link != NULL );
}
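
The look-ahead unlinking can be sketched without any channel around it. Here, mini_link, list_add, list_remove, and list_demo are hypothetical names; locking is omitted on the assumption that the caller already holds the relevant mutex; and, unlike chan_remove_obs, list_remove reports “not found” by returning false rather than asserting:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>

typedef struct mini_link mini_link;
struct mini_link {
  void      *obs;                   // The observer.
  mini_link *next;                  // The next link, if any.
};

// Inserts obs right after the (never removed) head link.
static bool list_add( mini_link *head, void *obs ) {
  mini_link *const new_link = malloc( sizeof *new_link );
  if ( new_link == NULL )
    return false;
  *new_link = (mini_link){ .obs = obs, .next = head->next };
  head->next = new_link;
  return true;
}

// Looks one link ahead so the predecessor is at hand for unlinking.
static bool list_remove( mini_link *head, void *obs ) {
  for ( mini_link *curr = head; curr->next != NULL; curr = curr->next ) {
    mini_link *const next_link = curr->next;
    if ( next_link->obs == obs ) {
      curr->next = next_link->next;
      free( next_link );
      return true;
    }
  }
  return false;                     // obs was not in the list.
}

static bool list_demo( void ) {
  int self, a, b;                   // Stand-ins for observers.
  mini_link head = { .obs = &self, .next = NULL };

  if ( !list_add( &head, &a ) )
    return false;
  if ( !list_add( &head, &b ) ) {
    list_remove( &head, &a );
    return false;
  }

  // The most recently added observer comes first after the head.
  bool ok = head.next->obs == &b && head.next->next->obs == &a;
  ok = list_remove( &head, &a ) && ok;
  ok = ok && head.next != NULL && head.next->obs == &b
          && head.next->next == NULL;
  ok = list_remove( &head, &b ) && ok;
  ok = ok && head.next == NULL;
  ok = ok && !list_remove( &head, &a );   // Already removed.
  return ok;
}
```

Because the head link is embedded and never removed, a singly linked list suffices: the link to unlink always has a predecessor.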

The next part of the chan_select implementation ends the big do ... while loop:

  } while ( rv == EAGAIN || (rv == EPIPE && csi.chans_open > 1) );

  // ...

If rv is 0, we succeeded; if ETIMEDOUT, we timed out. In either of those cases we’re done and shouldn’t try again. However, if rv is:

  • EAGAIN, we selected a channel that isn’t ready (even if it had been ready shortly before);
  • EPIPE, we selected a channel that was open when we originally checked, but closed now.

In either of the latter two cases, we should try again, hence the loop.
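
The loop condition can be read as a small predicate. In this sketch, should_retry is a hypothetical helper that isn’t part of C Chan; it only restates the while condition so each case can be checked:

```c
#include <assert.h>
#include <errno.h>
#include <stdbool.h>

// Mirrors the do ... while condition: retry when the selected channel
// wasn't ready, or when it closed and other channels were still open.
static bool should_retry( int rv, unsigned chans_open ) {
  return rv == EAGAIN || (rv == EPIPE && chans_open > 1);
}
```

Note that EPIPE triggers a retry only when more than one channel was open: if the sole open channel closed, there’s nothing left to select.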

The last part of the chan_select implementation is:

  if ( selected_ref == NULL ) {
    if ( rv > 0 )
      errno = rv;
  }
  else {
    rv = selected_ref->dir == CHAN_RECV ?
      CHAN_RECV( selected_ref->param_idx ) :
      CHAN_SEND( selected_ref->param_idx );
  }

  if ( is_blocking ) {
    chan_obs_cleanup( &select_obs );
    pthread_mutex_destroy( &select_mtx );
  }
  if ( ref != stack_ref )
    free( ref );

  return rv;
}

If we didn’t select a channel and rv is positive, set errno; if we did select one, set rv to its index into either the recv_chan or send_chan array. Lastly, a bit more clean-up.

Conclusion

The full source code for this implementation, C Chan, is available. If you ignore the syntactic sugar Go provides for dealing with channels, C Chan is perfectly usable in real-world programs.

Special thanks to Steadman Dillroper for his help with code review and finding bugs.

Epilogue

Weren’t there already other Go-like channel libraries written in C? Yes, but the serious ones I could find are either incomplete (e.g., don’t support blocking select) or buggy due to race conditions.

What about a C++ version? Many C libraries can have trivial C++ wrappers written for them to get full type support (to deal with T* rather than void*) and RAII. Unfortunately, a trivial C++ wrapper can’t be written for C Chan because, in a C++ version, you want messages to be copied as objects of type T using T’s = operator, not memcpy; or you want the option to std::move messages instead. C Chan itself would have to be modified to support alternatives to memcpy.

Top comments (1)

Pierre Gradot

Amazing article! I guess it was a lot of work 😅