Sunday, November 30, 2014

The Freedom from Locks

I'm currently working on a project where I need to (i) cope with very high data rates over a shared memory buffer and (ii) squeeze as much processing power out of the (pretty low power) CPU as I possibly can. Oh, it it's going to have to be multi-threaded.

The system is on a POSIX platform and I experimented with Queues, Shared Memory but non of them gave the performance I needed. One of the big issues is making the application thread safe, and that usually involves locks. Locks are expensive and are, on most compilers and CPU targets, pretty slow. There is an interesting article here comparing performance. Locks are costly to acquire and release, and there's always the potentially for deadlock and contention scenarios.

So, I looked to adopting a Lock Free Ring / Circular Buffer approach. The challenge is to create a data structure and algorithm that allows multiple threads pass data between them concurrently without invoking any locks.

The diagram below shows the structure of a Ring Buffer.


This structure is often used in the classic multi threaded Producer / Consumer problem, but there are potential concurrency problems with this structure:
  • The Producer must find free space in the queue
  • The Consumer must find the next item
  • The Producer must be able to tell the queue is full
  • The Consumer must be able to tell if the queue is empty
All these operations could incur a contention / lock issue. So how to you get around this?

Firstly you need to define a static fixed global data structure with free running counters:
static volatile sig_atomic_t tail=0;
static volatile sig_atomic_t head=0;
static char buffer[CAPACITY];
static int mask=CAPACITY-1;
The Volatile keyword in C/C++ tells the compiler that the value held by the variable can be modified by another thread, process, or even external device. In fact the Volatile keyboard is often used to detect data from an external piece of hardware that uses memory mapped i/o.

sig_atomic_t is an Integer data type that tells the compiler to ensure that the variable is not partially written or partially read in the presence of asynchronous interrupts. Essentially used for signal handling in multi threaded / process contexts.

The combination of a volatile sig_atomic_t integer is that it creates a inter process thread safe signal handling variable that ensures any operation completes in a single CPU cycle that cannot be interrupted.

So, we've declared tail and head as our read and write positions in our circular buffer. Now, how do we insert a piece of data into the ring buffer?
int Offer(char e) {
 int currentTail;
 int wrapPoint;

 currentTail=tail;
 wrapPoint=currentTail-CAPACITY;
 if (head<=wrapPoint) {
  return 0;
 }
 buffer[currentTail % mask]=e;
 tail=currentTail+1;
 return 1;
}
Now lets retrieve a byte from the buffer.
char Poll(void) {
 char e;
  int i;
 int currentHead;

 currentHead=head;
 if (currentHead>=tail) {
  return 0;
 }
 i=currentHead & mask;
 e=buffer[i];
 head=currentHead+1;
 return e;
}
Now, here's Producer / Consumer thread that invokes our lock free buffer.
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <sys/time.h>
#include "LockFreeBuffer.h"

#define REPETITIONS 10000
#define TEST_VALUE 65
#define ITERATIONS 256
#define BILLION 1E9

void *Producer(void *arg);
void *Consumer(void *arg);

int main(void)
{
 int i,t;
 double secs;
 struct timespec start,stop;
 pthread_t mythread[2];

 printf("Starting...\n");
 for (i=0;i<ITERATIONS;i++) {
  Init();
  clock_gettime(CLOCK_PROCESS_CPUTIME_ID,&start);
  if (pthread_create(&mythread[0],NULL,Producer,NULL)) {
   perror("Thread failed to create...\n");
   exit(-1);
  }
  if (pthread_create(&mythread[1],NULL,Consumer,NULL)) {
   perror("Thread failed to create...\n");
   exit(-1);
  }
  for(t=0;t<2;t++) {
   if (pthread_join(mythread[t],NULL)) {
    perror("Thread failed to join...\n");
    exit(-1);
   }
  }
  clock_gettime(CLOCK_PROCESS_CPUTIME_ID,&stop);
  secs=(stop.tv_sec-start.tv_sec)+((stop.tv_nsec-start.tv_nsec)/BILLION);
  printf("Operations/sec: %4.0f\n",REPETITIONS/secs);
 }
 printf("Completed...\n");
 exit(0);
}

void *Producer(void *arg)
{
 int i;

 i=REPETITIONS;
 do {
  while (!Offer(TEST_VALUE));
 } while (0!=--i);
 pthread_exit(NULL);
}

void *Consumer(void *arg)
{
 char buf;
 int i;

 i=REPETITIONS;
 buf=0;
 do {
  while (buf=Poll());
 } while (0!=--i);
 pthread_exit(NULL);
}
If you're interested in more on lock free buffers, check out the LMAX Distruptor for Java

No comments:

Post a Comment