Concurrency - Locks

3 minute read

General ideas

  • use hardware support to ensure atomicity
  • spinning is bad on a single processor but works well on multiprocessors

Analysis

key: 1. correctness 2. fairness 3. performance

  • disable interrupts: bad
  • loads and stores: bad
  • flag and turn (Peterson’s algorithm): spin while it’s not your turn; use self and turn = 1 - self
  • test and set: set a new value and return the old value, use while spin
  • compare and swap: compare with the expected value; if they are equal, set to the new value
  • load-linked and store-conditional: like an ordinary load and store, except the store succeeds only if there were no intervening updates
  • fetch and add (ticket lock): atomically fetch the old value and add 1; when turn equals our ticket, run.
1
lock->turn == fetchandadd(lock->ticket)

Mutex

1
2
3
// Usage sketch: the critical section is bracketed by lock() and unlock().
lock()
// critical section
unlock()

Condition variable (CV)

1
2
3
4
5
6
7
8
9
10
11
12
// Records completion in the shared flag `done` and signals condition
// variable `c`, doing both while holding mutex `m`.
void thread_exit() {
  Mutex_lock(&m);
  done = 1;            // a: set the flag that thread_join() tests
  Cond_signal(&c);     // b: signal c once the flag is set
  Mutex_unlock(&m);
}
// Waits on condition variable `c`, under mutex `m`, until `done` is non-zero.
void thread_join() {
  Mutex_lock(&m);        // w
  while (!done) {        // x: re-test the flag after every wakeup
    Cond_wait(&c, &m);   // y
  }
  Mutex_unlock(&m);      // z
}

Park/unpark

On Solaris, the OS provides two calls:

  • park() to put a calling thread to sleep
  • unpark(threadID) to wake a particular thread as designated by threadID
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// State of the queue-based lock used by lock_init(), lock() and unlock().
struct __lock_t {
  int flag;     // 1 while the lock is held, 0 when it is free
  int guard;    // spin lock taken with TestAndSet around flag/queue updates
  queue_t *q;   // thread IDs added by lock() and removed by unlock()
};
typedef struct __lock_t lock_t;

// Puts a lock into its initial state: flag and guard cleared, queue initialised.
// NOTE(review): m->q is handed to queue_init() without being assigned here;
// presumably the caller provides the queue storage -- confirm.
void lock_init(lock_t *m) {
  m->flag = m->guard = 0;
  queue_init(m->q);
}

// Acquires the lock. The guard is taken by spinning only for the short time
// needed to inspect flag and the queue. If the lock is already held, the
// caller queues its thread ID and parks instead of spinning.
void lock(lock_t *m) {
  while (TestAndSet(&m->guard, 1) == 1)
    ; //acquire guard lock by spinning
  if (m->flag == 0) {
    m->flag = 1; // lock is acquired
    m->guard = 0;
  } else {
    queue_add(m->q, gettid());
    // NOTE(review): the guard is released before park() is called. An
    // unlock() that runs in between can unpark() this thread before it has
    // gone to sleep, after which park() may never return -- confirm how
    // park() treats an earlier unpark().
    m->guard = 0;
    // flag is not set after waking: unlock() leaves it at 1 when it hands
    // the lock to a queued thread.
    park();
  }
}

// Releases the lock. If no thread is queued the lock is marked free;
// otherwise flag stays set and the first queued thread is woken, so
// ownership passes straight to it. The guard is released on both paths.
void unlock(lock_t *m) {
  while (TestAndSet(&m->guard, 1) == 1)
    ; // acquire guard lock by spinning
  if (queue_empty(m->q)) {
    m->flag = 0; // let go of lock; no one wants it
  } else {
    // hold lock (for next thread!)
    unpark(queue_remove(m->q));
  }
  m->guard = 0; // unconditional: must not be read as part of the else branch
}

Implement thread join using CV

1
2
3
4
5
6
7
8
9
10
11
12
// Exit side of join: sets `done` and signals `c` while holding mutex `m`.
void thread_exit() {
  Mutex_lock(&m);
  done = 1;            // a: publish completion before signalling
  Cond_signal(&c);     // b: signal condition variable c
  Mutex_unlock(&m);
}
// Join side: holds mutex `m` and waits on `c` for as long as `done` is zero.
void thread_join() {
  Mutex_lock(&m);        // w
  while (!done) {        // x: loop, so the flag is checked again after waking
    Cond_wait(&c, &m);   // y
  }
  Mutex_unlock(&m);      // z
}

Producer and Consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Producer: calls put(i) for i = 0 .. loops-1. Before each put it holds
// mutex m and waits on E while count == max; after each put it signals F.
// Returns NULL.
void *producer(int loops) {
  for (int i = 0; i < loops; i++) {
    Mutex_lock(&m);           // p1
    while (count == max) {    // p2: re-check the condition after every wakeup
      Cond_wait(&E, &m);      // p3
    }
    put(i);                   // p4
    Cond_signal(&F);          // p5
    Mutex_unlock(&m);         // p6
  }
  return NULL; // declared void *, so falling off the end is not acceptable
}

// Consumer: calls get() `loops` times and prints each value on its own line.
// Before each get it holds mutex m and waits on F while count == 0; after
// each get it signals E. Returns NULL.
void *consumer(int loops) {
  for (int i = 0; i < loops; i++) {
    Mutex_lock(&m);         // c1
    while (count == 0) {    // c2: re-check the condition after every wakeup
      Cond_wait(&F, &m);    // c3
    }
    int tmp = get();        // c4
    Cond_signal(&E);        // c5
    Mutex_unlock(&m);       // c6
    printf("%d\n", tmp);    // print after m has been released
  }
  return NULL; // declared void *, so falling off the end is not acceptable
}

Semaphore

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Pseudo-code: set the semaphore's counter to the given initial value.
sem_init(sem_t *s, int initval) {
  s->value = initval
}
// Pseudo-code: decrement the counter first, then wait if it is now negative.
sem_wait(sem_t *s) {
  s->value -= 1
  wait if s->value < 0
}
// Pseudo-code: increment the counter, then wake one waiter if there is one.
sem_post(sem_t *s) {
  s->value += 1
  wake one waiting thread (if there are any)
}

// A lock built from a single semaphore.
struct __lock_t {
  sem_t m;  // the semaphore that acquire() waits on and release() posts
};
typedef struct __lock_t lock_t;
// Initialises the lock's semaphore to 1 (a binary semaphore). With
// sem_wait() decrementing and then waiting on a negative value, the first
// acquire() takes the counter to 0 and proceeds, while a second takes it
// to -1 and waits. Uses the two-argument sem_init(s, initval) form.
void init(lock_t *lock) {
  sem_init(&lock->m, 1); // the semaphore is a member of *lock, not a bare `m`
}
// Acquires the lock by waiting on the semaphore stored inside it.
void acquire(lock_t *lock) {
  sem_wait(&lock->m); // the semaphore is a member of *lock, not a bare `m`
}
// Releases the lock by posting the semaphore stored inside it.
void release(lock_t *lock) {
  sem_post(&lock->m); // the semaphore is a member of *lock, not a bare `m`
}

Reader-Writer Locks

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// Acquires the lock for reading. rw->lock protects the reader count; the
// reader that takes the count from 0 to 1 also waits on rw->writelock.
void acquire_r(rwlock_t *rw) {
  sem_wait(&rw->lock);
  rw->readers++;
  if (rw->readers == 1) {
    sem_wait(&rw->writelock); // first reader takes writelock
  }
  sem_post(&rw->lock);
}

// Releases a read hold. rw->lock protects the reader count; the reader
// that takes the count back to 0 posts rw->writelock.
void release_r(rwlock_t *rw) {
  sem_wait(&rw->lock);
  rw->readers--;
  if (rw->readers == 0) {
    sem_post(&rw->writelock); // last reader gives writelock back
  }
  sem_post(&rw->lock);
}

// Reader-writer lock state.
struct _rwlock_t {
  sem_t lock;       // held around every update of `readers`
  sem_t writelock;  // waited on by writers and by the first reader
  int readers;      // number of readers currently holding the lock
};
typedef struct _rwlock_t rwlock_t;

// Prepares a reader-writer lock: both semaphores start at 1 and the reader
// count starts at 0.
void rwlock_init(rwlock_t *rw) {
  sem_init(&rw->writelock, 0, 1);
  sem_init(&rw->lock, 0, 1);
  rw->readers = 0;
}

// Acquires the lock for writing by waiting on rw->writelock.
void acquire_w(rwlock_t *rw) {
  sem_wait(&rw->writelock);
}

// Releases a write hold by posting rw->writelock.
void release_w(rwlock_t *rw) {
  sem_post(&rw->writelock);
}