Dissect CS - Semaphore

Hyoseop SongHyoseop Song
9 min read

동기화란? 작업 상태를 “상호 공유” in Multi process (or thread)

Synchronization Mechanisms

Semaphores, Mutex lock is typical usage for synchronize

세마포어 값(사용 가능한 공유 자원의 수)을 조작하는 로직

  • 자원이 할당되는 시점까지의 세마포어 조작 로직

producer -> wait(empty) -> wait(mutex) -> produce -> post(mutex) -> post(full)

  • 자원을 사용하고 재 할당 가능한 상태로 되돌리는 세마포어 조작 로직

consumer -> wait(full) -> wait(mutex) -> consume -> post(mutex) -> post(empty)

Race, busyWait : 동기화 미적용 시 발생할 문제

  1. Race Contidion (경쟁 조건 문제) | 모든 Shared memory (queue, list, buffer, cache ,,,)
  • 두 개의 스레드가 Shared memory에 동시 접근하는 상황

    Context Switch 실행 순서의 기준이 불명확하여 작업이 꼬이는 케이스 :

    스레드끼리 경쟁 구도를 띠며 매번 다른 결과를 낳음

    실행되는 코드를 저장하는 Code segment를 하나만 두고, 접근 권한에 대한 기준을 둔다면 Critical section(꼬이는 부분에서는 shared data를 1개만 가지도록 해서 다중 접근을 막음)을 해소할 수 있음

  • 경쟁 구도가 형성되면 아래와 같이 매번 기대값과 다른 결과로 진행되고, 프로세스(or 스레드)간 공유 자원을 활용하지 못하게 됨

2. Busy waiting (무한 대기 문제)

  • 특정 조건이 충족될 때까지 다른 프로세스나 스레드가 대기하는 경우에 발생
  • CPU를 계속 사용하므로 다른 작업에 대한 리소스를 활용하지 못함

    • Spinlock으로 동기화를 구현하면 대기중인 스레드가 계속 cpu의 메모리 사용상태를 체크해야 하고, 이게 다중 스레드로 걸리면 과도한 리소스 소모로 발생 = busy waiting

    • Solution : Sleeping , Blocking

      (대기 상태 전환 시 CPU를 다른 작업이 점유하고, 조건이 충족될 때까지 대기 큐에 할당한다)

Semaphore essential point

System V : Process, POSIX : Thread

세마포어의 주요 연산 : P(Produce | Wait) & V(Consume | Signal)

세마포어 값 : 사용 가능한 공유자원의 개수

(= shared memory에 있는 자원중에 접근해서 읽고쓰기 가능한 칸수)

예를 들어, 세마포어 값이 특정 개수의 자원을 나타내면, 해당 세마포어를 통해 동시에 사용 가능한 자원의 수를 조절

Resource status by semaphore value

  • 양수인 경우, value만큼 사용 가능한 자원이 존재함을 표현한다.

  • 0보다 작은 경우, P 연산을 수행하는 프로세스나 스레드는 대기 상태로 전환될 수 있다.

    • 이로 인해 다른 대기 중인 프로세스나 스레드가 하나씩 깨어나게 됩니다.
  • 0인 경우, 사용 가능한 자원이 없음을 나타낸다.

    • P 연산을 시도하는 주체 프로세스나 스레드는 다시 대기 상태로 전환됩니다.

P 연산 (Wait 또는 Produce):

  • P 연산은 세마포어 값을 1 감소시킴 (주체가 점유하려는 요청)

    • 0 이상 정수 : 세마포어 값을 1 감소시키고 계속 진행합니다.

    • 음수 : 요청한 주체는 대기 상태로 들어가며 잠금을 얻을 때까지 기다립니다.

  • V 연산 (Signal 또는 Consume):

    • V 연산은 세마포어 값을 1 증가시킴 (공유메모리 다 써서 반납)

    • 대기 중인 프로세스나 스레드 중 하나를 running 상태로 바꾸고 실행함

    • 음수 : 대기 중인 프로세스나 스레드가 없으므로 그냥 값을 1 올림

System V semaphore : Process 간 세마포어 방식

- system call : semget, semctl 구현

semctl : 세마포어의 생성, 제거, 조회, 상태 변경을 수행하는 시스템 콜

#include <sys/sem.h>
int semctl(int semid, int semnum, int cmd, ...);
  • semid: 세마포어 집합을 식별하는 식별자 (세마포어 세트 ID).

  • semnum: 세마포어 집합 내의 세마포어 번호 (0부터 시작).

  • cmd: 수행할 제어 명령.

  1. IPC_RMID:

    • 세마포어 집합을 제거합니다. 해당 세마포어 집합이 소멸되며, 연결된 모든 자원이 해제됩니다.
  2. IPC_SET:

    • 세마포어의 속성을 설정합니다. 세마포어의 초기 값이나 권한을 변경할 수 있습니다.
  3. IPC_STAT:

    • 세마포어의 상태 정보를 얻습니다. 현재 세마포어의 속성 및 상태를 확인할 수 있습니다.
  4. GETALL, SETALL:

    • 모든 세마포어의 값을 배열로 얻거나 설정합니다.
  • semctl 함수는 실패할 경우 -1을 반환함 IF (RETURN VALUE < 0)

  • 프로세스 간 통신(IPC)에서 세마포어를 조작할 때 이용됨.

POSIX semaphore : thread 간 세마포어 방식

  • user-defined semlib.h library using Sys-V(POSIX like)
#include <stdio.h>
#include "semlib.h"

int semInit(key_t key) // key에 해당하는 세마포어 생성
{
    int semid;

    if ((semid = semget(key, 1, 0600 | IPC_CREAT)) < 0)
    { // make 1. only me
        perror("semget");
        return -1;
    }

    return semid;
}

// semInitValue : 세마포어 값 초기화
// semctl : 세마포어 제어(초기화, 값 얻기, 제거 등) - 세마포어 만들면 초기화해야 되니까 무조건 같이 쓰는 함수
// 세마포어의 값? : 공유할 자원의 개수
int semInitValue(int semid, int value)
{
    union semun
    {
        int val;
    } semun;

    semun.val = value;
    if (semctl(semid, 0, SETVAL, semun) < 0)
    { // set value 로 값 초기화
        perror("semctl");
        return -1;
    }

    return semid;
}

// semWait : 세마포어 값이 0보다 크면 -1을 하고, 0이면 대기
// wait 연산 : 자원을 쓰고 있으면 대기, 쓰고 있지 않으면 쓰기 시작
// semop(semid, semcmd, nsems) : semid에 해당하는 세마포어에 semcmd에 해당하는 연산을 nsems개 실행
int semWait(int semid)
{
    struct sembuf semcmd;

    semcmd.sem_num = 0;
    semcmd.sem_op = -1; // wait : -1
    semcmd.sem_flg = SEM_UNDO;
    if (semop(semid, &semcmd, 1) < 0)
    {
        perror("semop");
        return -1;
    }

    return 0;
}

// semTryWait : 세마포어 값이 0보다 크면 -1을 하고, 0이면 대기하지 않고 바로 리턴
// 약간 semWait와 비슷한데, semWait는 대기를 하고 semTryWait는 대기를 하지 않는다는 차이가 있음
int semTryWait(int semid)
{
    struct sembuf semcmd;

    semcmd.sem_num = 0;
    semcmd.sem_op = -1;
    semcmd.sem_flg = IPC_NOWAIT | SEM_UNDO;
    if (semop(semid, &semcmd, 1) < 0)
    {
        perror("semop");
        return -1;
    }

    return 0;
}

// semPost : 세마포어 값에 1을 더함 (signal)
// signal 연산 : 자원을 쓰고 있으면 쓰기 종료, 쓰고 있지 않으면 대기
int semPost(int semid) // signal. +1
{
    struct sembuf semcmd;

    semcmd.sem_num = 0;
    semcmd.sem_op = 1;           //+1
    semcmd.sem_flg = SEM_UNDO; // 다쓰고반납
    if (semop(semid, &semcmd, 1) < 0)
    {
        perror("semop");
        return -1;
    }

    return 0;
}

// semGetValue : 세마포어 값 얻기
// semctl 써서 세마포어의 현재값을 얻어옴 (공유할 자원의 개수)
int semGetValue(int semid)
{
    union semun
    {
        int val;
    } dummy;

    return semctl(semid, 0, GETVAL, dummy); // 현재값 리턴
}

// semDestroy : 세마포어 제거 (IPC_RMID)
// semctl 써서 IPC_RMID로 세마포어 제거 (IPC_RMID : 세마포어 제거하는 플래그)
int semDestroy(int semid)
{
    union semun
    {
        int val;
    } dummy;

    if (semctl(semid, 0, IPC_RMID, dummy) < 0)
    {
        perror("semctl");
        return -1;
    }
    close(semid);

    return 0;
}

Producer / Consumer using Semaphore

Remind multi thread resource flow

  • 할당 흐름 in producer_s.c

    producer -> wait(empty) -> wait(mutex) -> produce -> post(mutex) -> post(full)

  • 자원 사용 흐름 in consumer_s.c

    consumer -> wait(full) -> wait(mutex) -> consume -> post(mutex) -> post(empty)

  • Producer_s.c implement

      #include <stdio.h>
      #include <stdlib.h>
      #include <unistd.h>
      #include <sys/types.h>
      #include <sys/ipc.h>
      #include <sys/shm.h>
      #include "semlib.h"
      #include "prodcons.h"
    
      main()
      {
          BoundedBufferType *pBuf;
          int shmid, i, data;
          int emptySemid, fullSemid, mutexSemid;
          // shmget : shared memory를 생성하고 id를 반환
          // shmget(key, size, flag) : key로 식별되는 공유 메모리 생성
          // key : 공유 메모리를 식별하는 key
          // size : 공유 메모리의 크기
          // flag : 공유 메모리의 접근 권한
          if ((shmid = shmget(SHM_KEY, SHM_SIZE, SHM_MODE)) < 0)
          {
              perror("shmget");
              exit(1);
          }
          if ((pBuf = (BoundedBufferType *)shmat(shmid, 0, 0)) == (void *)-1)
          {
              perror("shmat");
              exit(1);
          }
          // semInit : semaphore를 생성하고 id를 반환 - semget 함수를 호출
          // EMPTY_SEM_KEY, FULL_SEM_KEY, MUTEX_SEM_KEY : semaphore를 식별하는 key
          // EMPTY_SEM_KEY : 생산자가 생산한 데이터를 저장할 수 있는 공간의 개수
          // FULL_SEM_KEY : 소비자가 소비할 데이터가 저장된 공간의 개수
          // MUTEX_SEM_KEY : 생산자와 소비자가 동시에 접근하지 못하도록 하는 mutex
    
          // 1. semaphore 쓰려고 공유 메모리 각각 만듦
          if ((emptySemid = semInit(EMPTY_SEM_KEY) /* semInit */) < 0)
          {
              fprintf(stderr, "semInit failure\\n");
              exit(1);
          }
          if ((fullSemid = semInit(FULL_SEM_KEY) /* semInit */) < 0)
          {
              fprintf(stderr, "semInit failure\\n");
              exit(1);
          }
          if ((mutexSemid = semInit(MUTEX_SEM_KEY) /* semInit */) < 0)
          {
              fprintf(stderr, "semInit failure\\n");
              exit(1);
          }
    
          // 2. 공유 메모리 쓰고 다른 거에 락 걸려고 wait 연산함 -> 세마포어 값 minus
          srand(0x8888);
          for (i = 0; i < NLOOPS; i++)
          {
              if (semWait(emptySemid) /* semWait */ < 0)
              {
                  fprintf(stderr, "semWait failure\\n");
                  exit(1);
              }
              if (semWait(mutexSemid) /* semWait */ < 0)
              {
                  fprintf(stderr, "semWait failure\\n");
                  exit(1);
              }
    
              printf("Producer: Producing an item.....\\n");
              data = (rand() % 100) * 10000;
              pBuf->buf[pBuf->in].data = data;
              pBuf->in = (pBuf->in + 1) % MAX_BUF;
              pBuf->counter++;
    
              // 3. 다 쓰고 나면 반납하므로 signal 연산 -> 값 증가하고 다시 접근가능한 상태
              if (semPost(mutexSemid) /* semPost */ < 0)
              {
                  fprintf(stderr, "semPost failure\\n");
                  exit(1);
              }
              if (semPost(fullSemid) /* semPost */ < 0)
              {
                  fprintf(stderr, "semPost failure\\n");
                  exit(1);
              }
              usleep(data);
          }
    
          printf("Producer: Produced %d items.....\\n", i);
    
          sleep(2);
          printf("Producer: %d items in buffer.....\\n", pBuf->counter);
    
          // 4. 모든 처리 끝났으므로 공유 메모리도 아예 할당 해제함
          if (semDestroy(emptySemid) /* semDestroy */ < 0)
          {
              fprintf(stderr, "semDestroy failure\\n");
          }
          if (semDestroy(fullSemid) /* semDestroy */ < 0)
          {
              fprintf(stderr, "semDestroy failure\\n");
          }
          if (semDestroy(mutexSemid) /* semDestroy */ < 0)
          {
              fprintf(stderr, "semDestroy failure\\n");
          }
          if (shmctl(shmid, IPC_RMID, 0) < 0)
          {
              perror("shmctl");
              exit(1);
          }
      }
    
  • Consumer_s.c implement

      #include <stdio.h>
      #include <stdlib.h>
      #include <unistd.h>
      #include <sys/types.h>
      #include <sys/ipc.h>
      #include <sys/shm.h>
      #include "semlib.h"
      #include "prodcons.h"
    
      // 자원 사용 흐름 : consumer -> wait(full) -> wait(mutex) -> consume -> post(mutex) -> post(empty) in consumer_s.c
      main()
      {
          BoundedBufferType *pBuf;
          int shmid, i, data;
          int emptySemid, fullSemid, mutexSemid;
    
          if ((shmid = shmget(SHM_KEY, SHM_SIZE, SHM_MODE)) < 0)
          {
              perror("shmget");
              exit(1);
          }
          // 0. shmat(shmid, shmaddr, shmflg) : shmid에 해당하는 공유 메모리를 호출 프로세스의 가상 메모리 공간에 붙임
          if ((pBuf = (BoundedBufferType *)shmat(shmid, 0, 0)) == (void *)-1)
          {
              perror("shmat");
              exit(1);
          }
    
          // 1. semInit 로 세마포어 3개 생성
          if ((emptySemid = semInit(EMPTY_SEM_KEY) /* semInit */) < 0)
          {
              fprintf(stderr, "semInit failure\\n");
              exit(1);
          }
          if ((fullSemid = semInit(FULL_SEM_KEY) /* semInit */) < 0)
          {
              fprintf(stderr, "semInit failure\\n");
              exit(1);
          }
          if ((mutexSemid = semInit(MUTEX_SEM_KEY) /* semInit */) < 0)
          {
              fprintf(stderr, "semInit failure\\n");
              exit(1);
          }
    
          // 2. semInitValue 로 세마포어 값 초기화
          // (emptySemid = MAX_BUF, fullSemid = 0, mutexSemid = 1)
          // emptySemid : 생산자가 생산한 데이터를 저장할 수 있는 공간의 개수
          // fullSemid : 소비자가 소비할 데이터가 저장된 공간의 개수
          // mutexSemid : 생산자와 소비자가 동시에 접근하지 못하도록 하는 mutex
          if (semInitValue(emptySemid, MAX_BUF) /* semInitValue */ < 0)
          {
              fprintf(stderr, "semInitValue failure\\n");
              exit(1);
          }
          if (semInitValue(fullSemid, 0) /* semInitValue */ < 0)
          {
              fprintf(stderr, "semInitValue failure\\n");
              exit(1);
          }
          if (semInitValue(mutexSemid, 1) /* semInitValue */ < 0)
          {
              fprintf(stderr, "semInitValue failure\\n");
              exit(1);
          }
    
          // 3. 소비자가 소비할 데이터가 저장된 공간의 개수가 0이 될 때까지 반복
          // 3-1. semWait(full) : 소비자가 소비할 데이터가 저장된 공간의 개수가 0이면 대기
          // 3-2. semWait(mutex) : 생산자와 소비자가 동시에 접근하지 못하도록 하는 mutex
          // 3-3. consume : 소비
          // 3-4. semPost(mutex) : 다 썼으니까 mutex를 다시 1로 만들어서 생산자가 생산할 수 있도록 signal 보냄
          srand(0x9999);
          for (i = 0; i < NLOOPS; i++)
          {
              if (semWait(fullSemid) /* semWait */ < 0)
              {
                  fprintf(stderr, "semWait failure\\n");
                  exit(1);
              }
              if (semWait(mutexSemid) /* semWait */ < 0)
              {
                  fprintf(stderr, "semWait failure\\n");
                  exit(1);
              }
              printf("Consumer: Consuming an item.....\\n");
              data = pBuf->buf[pBuf->out].data;
              pBuf->out = (pBuf->out + 1) % MAX_BUF;
              pBuf->counter--;
    
              if (semPost(mutexSemid) /* semPost */ < 0)
              {
                  fprintf(stderr, "semPost failure\\n");
                  exit(1);
              }
              if (semPost(emptySemid) /* semPost */ < 0)
              {
                  fprintf(stderr, "semPost failure\\n");
                  exit(1);
              }
    
              usleep((rand() % 100) * 10000);
          }
    
          printf("Consumer: Consumed %d items.....\\n", i);
          printf("Consumer: %d items in buffer.....\\n", pBuf->counter);
      }
    

This article is referred to <Linux System Programming> lecture, Lecture (khu.ac.kr)

0
Subscribe to my newsletter

Read articles from Hyoseop Song directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

Hyoseop Song
Hyoseop Song