Concurrency - Locks
3 minute read
General ideas
- use hardware support to ensure atomicity
- spinning is bad on a single processor but can work well on multiprocessors
Analysis
key: 1. correctness 2. fairness 3. performance
- disable interrupts: bad
- loads and stores: bad
- flag and turn (Peterson's algorithm): spin when it’s not your turn, use self and turn = 1 - self
- test and set: set a new value and return the old value, use while spin
- compare and swap: comparing with expected, if equals, set to the new value
- load-linked and store-conditional: small changes from plain load and store; the store succeeds only if there was no update to the address since the load-linked
- fetch and add(ticket lock): fetch old and add 1, when turn is our turn, run.
1
| lock->turn == fetchandadd(lock->ticket)
|
Mutex
1
2
3
| lock()
// critical section
unlock()
|
Condition variable (CV)
1
2
3
4
5
6
7
8
9
10
11
12
| void thread_exit() {
// Exit side of the join handshake: with mutex m held, record completion in
// `done` and signal condition variable c.
Mutex_lock(&m);
done = 1; // a
Cond_signal(&c); // b
Mutex_unlock(&m);
}
// Join side of the handshake: returns once `done` is non-zero, which
// thread_exit() sets before signalling c.
void thread_join() {
Mutex_lock(&m); // w
// `done` is re-tested after every return from Cond_wait rather than assumed.
// Cond_wait is handed the mutex; presumably it releases m while waiting and
// re-acquires it before returning, as pthread_cond_wait does — confirm.
while (done == 0) // x
Cond_wait(&c, &m); // y
Mutex_unlock(&m); // z
}
|
Park/unpark
On Solaris, the OS provides two calls:
- park() to put a calling thread to sleep
- unpark(threadID) to wake a particular thread as designated by threadID
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
| typedef struct __lock_t {
int flag; // 1 while the lock is held, 0 when free (set in lock(), cleared in unlock())
int guard; // spin-lock word taken with TestAndSet around flag and queue updates
queue_t *q; // thread IDs added by lock() and removed by unlock()
} lock_t;
// Puts the lock in its free state: flag and guard cleared, then the queue
// initialized.
void lock_init(lock_t *m) {
m->flag = 0;
m->guard = 0;
// NOTE(review): m->q is read here but never assigned in this function; because
// the pointer is passed by value, queue_init cannot set it. Confirm that the
// caller points m->q at a queue before calling lock_init.
queue_init(m->q);
}
// Acquires the lock: takes it directly when flag is 0, otherwise enqueues the
// calling thread's ID and parks it. guard is held only around the flag/queue
// manipulation, not for the whole time the lock is owned.
void lock(lock_t *m) {
while (TestAndSet(&m->guard, 1) == 1)
; //acquire guard lock by spinning
if (m->flag == 0) {
m->flag = 1; // lock is acquired
m->guard = 0;
} else {
queue_add(m->q, gettid());
// NOTE(review): guard is released before park(), so unlock() can dequeue and
// unpark() this thread in between. If park() does not remember that earlier
// wakeup, this thread goes to sleep with nobody left to wake it. OSTEP
// describes a setpark() call made before releasing guard to close this
// window — confirm against the park/unpark semantics in use.
m->guard = 0;
park();
}
}
// Releases the lock: clears flag when no thread is queued, otherwise wakes the
// thread at the head of the queue.
void unlock(lock_t *m) {
while (TestAndSet(&m->guard, 1) == 1)
; //acquire guard lock by spinning
if (queue_empty(m->q))
m->flag = 0; // let go of lock; no one wants it
else
// hold lock (for next thread!)
// flag is left at 1 on this path: the woken thread returns from park() in
// lock() without setting flag itself, so ownership is handed over directly.
unpark(queue_remove(m->q));
m->guard = 0;
}
|
Implement thread join using CV
1
2
3
4
5
6
7
8
9
10
11
12
| void thread_exit() {
// Exit side of the join handshake: with mutex m held, record completion in
// `done` and signal condition variable c.
Mutex_lock(&m);
done = 1; // a
Cond_signal(&c); // b
Mutex_unlock(&m);
}
// Join side of the handshake: returns once `done` is non-zero, which
// thread_exit() sets before signalling c.
void thread_join() {
Mutex_lock(&m); // w
// `done` is re-tested after every return from Cond_wait rather than assumed.
// Cond_wait is handed the mutex; presumably it releases m while waiting and
// re-acquires it before returning, as pthread_cond_wait does — confirm.
while (done == 0) // x
Cond_wait(&c, &m); // y
Mutex_unlock(&m); // z
}
|
Producer and Consumer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| void *producer(int loops) {
// Calls put(i) for i = 0 .. loops-1, each time under mutex m.
// Returns NULL; the function is declared void *, so it must return a value.
for (int i=0; i < loops; i++){
Mutex_lock(&m); //p1
// Wait on E for as long as count has reached max; re-test after every wakeup.
while (count == max) //p2
Cond_wait(&E, &m); //p3
put(i); //p4
// Signal F, the condition the consumer waits on.
Cond_signal(&F); //p5
Mutex_unlock(&m); //p6
}
return NULL;
}
// Calls get() `loops` times, each time under mutex m, and prints every value
// obtained. Returns NULL; the function is declared void *, so it must return
// a value.
void *consumer(int loops) {
for (int i=0; i < loops; i++){
Mutex_lock(&m); //c1
// Wait on F for as long as count is 0; re-test after every wakeup.
while (count == 0) //c2
Cond_wait(&F, &m); //c3
int tmp = get(); //c4
// Signal E, the condition the producer waits on.
Cond_signal(&E); //c5
Mutex_unlock(&m); //c6
// Printing happens after the mutex is released.
printf("%d\n", tmp);
}
return NULL;
}
|
Semaphore
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| sem_init(sem_t *s, int initval) {
// Pseudocode: the semaphore's counter starts at initval.
s->value = initval
}
// Pseudocode: decrement the counter first, then block the caller if the
// counter has gone negative.
sem_wait(sem_t *s) {
s->value -= 1
wait if s->value < 0
}
// Pseudocode: increment the counter and wake one waiting thread, if any.
sem_post(sem_t *s) {
s->value += 1
wake one waiting thread (if there are any)
}
// A lock built from a single semaphore.
typedef struct __lock_t {
sem_t m; // the semaphore that acquire() waits on and release() posts
} lock_t;
// Initializes the lock's semaphore to 1. With the sem_wait/sem_post semantics
// sketched in these notes, the first acquire() decrements to 0 and proceeds,
// and any further acquire() drives the value negative and waits until
// release() posts.
void init(lock_t *lock) {
// The semaphore is a member of *lock, so it is reached through the parameter.
sem_init(&lock->m, 1);
}
// Acquires the lock by waiting on the semaphore stored in *lock.
void acquire(lock_t *lock) {
// The semaphore is a member of *lock; a bare `m` is not declared in this scope.
sem_wait(&lock->m);
}
// Releases the lock by posting the semaphore stored in *lock.
void release(lock_t *lock) {
// The semaphore is a member of *lock; a bare `m` is not declared in this scope.
sem_post(&lock->m);
}
|
Reader-Writer Locks
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
| void acquire_r (rwlock_t *rw) {
// Reader entry. rw->lock protects the readers count; the first reader in
// (count goes 0 -> 1) also takes writelock, so writers stay out while any
// reader is present. Returns nothing, hence the explicit void return type.
sem_wait(&rw->lock);
rw->readers++;
if (rw->readers == 1)
// Taken while still holding rw->lock: if a writer holds writelock, this
// reader waits here and later readers wait on rw->lock behind it.
sem_wait(&rw->writelock);
sem_post(&rw->lock);
}
// Reader exit. Decrements the readers count under rw->lock; the last reader
// out (count reaches 0) posts writelock so a writer can proceed.
// Returns nothing, hence the explicit void return type.
void release_r (rwlock_t *rw) {
sem_wait(&rw->lock);
rw->readers--;
if (rw->readers == 0)
sem_post(&rw->writelock);
sem_post(&rw->lock);
}
// State for the semaphore-based reader-writer lock.
typedef struct _rwlock_t {
sem_t lock; // protects the readers count in acquire_r()/release_r()
sem_t writelock; // held by a writer, or by the readers as a group while readers > 0
int readers; // number of readers currently inside
} rwlock_t;
// Starts with no readers and both semaphores initialized with a value of 1.
void rwlock_init(rwlock_t *rw) {
rw->readers = 0;
// NOTE(review): the three-argument form looks like POSIX sem_init, where the
// middle 0 would be the pshared flag — confirm.
sem_init(&rw->lock, 0, 1);
sem_init(&rw->writelock, 0, 1);
}
// Writer entry: waits on writelock, which acquire_r() also takes on behalf of
// the first reader, so a writer cannot get in until the readers count has
// dropped back to 0. Returns nothing, hence the explicit void return type.
void acquire_w (rwlock_t *rw) {
sem_wait (&rw->writelock);
}
// Writer exit: posts writelock so a waiting writer or first reader can proceed.
// Returns nothing, hence the explicit void return type.
void release_w (rwlock_t *rw) {
sem_post (&rw->writelock);
}
|