0%

HIT-OSLab5

HIT-OSLab5

实验目的:

  • 加深对进程同步与互斥概念的认识
  • 掌握信号量的实现原理两种(不同的实现方式)
  • 掌握信号量的使用,并应用它解决生产者-消费者问题

实验内容:

  • 在 Linux-0.11 中实现信号量,具体来说就是实现如下4个系统调用:
    • sys_sem_open:用于打开一个信号量文件描述符
    • sys_sem_wait:等待一个信号量的释放
    • sys_sem_post:释放一个信号量
    • sys_sem_unlink:删除一个信号量
  • 在 Ubuntu 下编写程序,用已经实现的信号量解决生产者-消费者问题

实验过程

进程同步与互斥是操作系统中实现进程间通信和同步的基本机制,它们的主要目的是确保在多个进程之间共享资源时,能够正确地保持同步状态,避免竞争条件和数据不一致等问题

  • 进程同步是指在多个进程之间同步数据访问,确保同一时刻只有一个进程可以访问共享资源
  • 进程互斥是指在多个进程之间同步对共享资源的互斥访问,确保同一时刻只有一个进程可以访问共享资源

进程同步和互斥可以通过不同的同步原语来实现,例如:锁(Lock)、信号量(Semaphore)、条件变量(ConditionVariable)

  • 锁:有两种实现方式,基于原子操作和基于信号量
  • 信号量:核心结构为一个整数和一个等待队列
  • 条件变量:在满足特定条件时通知进程,使其可以访问共享资源

PV 操作是指进程之间的同步原语,用于实现进程之间的同步和通信(PV 操作是 Posix 信号量的一种实现方式)

  • P操作:信号量 —,判断是不是要阻塞
  • V操作:信号量 ++,判断是不是要唤醒

PV 操作的实现需要保证操作的原子性,而保证原子性有很多方法:

  • 这里不采用软件保护法(比如:轮换法 \ 标记法 \ peterson 算法 \ Lamport 面包店算法),而是采用硬件保护法
  • 由于是 linux-0.11 运行在单核 cpu 上(Bochs 虚拟机提供单核 cpu 环境),所以可以采用简单的开关中断的方法
  • 如果是多 cpu 环境,就使用硬件原子指令保护法(用硬件原子指令操控一个 mutex 信号量来保护临界区)

开关中断的实现需要依赖如下函数:

1
2
#define sti() __asm__ ("sti"::) // 开中断
#define cli() __asm__ ("cli"::) // 关中断

基于 PV 操作实现信号量的模板如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
P(){ 
cli();
value--; // 信号量--
if(value < 0){
schedual(); // 调度其他进程
}
sti();
}

V(){
cli();
value++; // 信号量++
if(value <= 0){
wakeup(); // 唤醒等待队列中的进程
}
sti();
}
  • 信号量 > 0 时,信号量代表临界区中拥有的资源数目
  • 信号量 < 0 时,信号量代表等待队列中的进程数目

本实验要求我们实现信号量,定义信号量的数据结构如下:(在 /include/unistd.h 中)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#define QUE_LEN 16

struct semaphore_queue{
int front;
int rear;
struct task_struct *wait_tasks[QUE_LEN];
};
typedef struct semaphore_queue sem_queue;

struct semaphore_t{
int value; /* 信号量计数器 */
char name[20]; /* 信号量名称 */
struct semaphore_queue wait_queue; /* 等待队列 */
};
typedef struct semaphore_t sem_t;
  • wait_tasks[QUE_LEN] 为静态列表,索引 front / rear 分别表示其头部 / 尾部的位置

首先我们需要添加信号量的系统调用号:(在 /include/unistd.h 中)

1
2
3
4
#define __NR_sem_open 	72
#define __NR_sem_wait 73
#define __NR_sem_post 74
#define __NR_sem_unlink 75
  • 需要注意的是:这里同时需要修改 hdc-0.11.img 中的 hdc/usr/include/unistd.h 文件,如果想在虚拟机中使用 gcc 编译的话,会导入虚拟机 hdc/usr/include/ 中的文件为头文件

接着修改系统调用号的总数:(在 /kernel/system_call.s 中)

1
nr_system_calls = 76

最后添加新的系统调用定义:(在 /include/linux/sys.h 中)

1
2
3
4
5
6
extern int sys_sem_open();
extern int sys_sem_wait();
extern int sys_sem_post();
extern int sys_sem_unlink();

fn_ptr sys_call_table[] = {......,sys_sem_open,sys_sem_wait,sys_sem_post,sys_sem_unlink};

头文件以及全局变量:(在 /kernel 中新建文件 sem.c

1
2
3
4
5
6
7
8
9
#define __LIBRARY__  
#include <unistd.h>
#include <linux/sched.h>
#include <linux/kernel.h>
#include <asm/segment.h>
#include <asm/system.h>

#define SEM_COUNT 32
sem_t semaphores[SEM_COUNT];

基础字符串函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
int strcmp_sem(char* name,char* tmp){
int i;
for(i = 0; i<20; i++){
if(name[i] != tmp[i])
return 0;
if(tmp[i] =='\0' && name[i] == '\0') break;
}
return 1;
}

int strcpy_sem(char* name,char* tmp){
int i;
for(i = 0; i<20; i++){
name[i] = tmp[i];
if(tmp[i] =='\0') break;
}
return i;
}

插入 / 获取 task_struct 结构体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
struct task_struct * get_task(sem_t* q)
{
if(q->wait_queue.front == q->wait_queue.rear) {
printk("Queue is empty!\n");
return NULL;
}
struct task_struct *tmp = q->wait_queue.wait_tasks[q->wait_queue.front];
q->wait_queue.front = (q->wait_queue.front+1)%QUE_LEN;
return tmp;
}

int insert_task(struct task_struct *p,sem_t* q)
{
if((q->wait_queue.rear+1)%QUE_LEN == q->wait_queue.front){
printk("Queue is full!\n");
return -1;
}
q->wait_queue.wait_tasks[q->wait_queue.rear] = p;
q->wait_queue.rear = (q->wait_queue.rear+1)%QUE_LEN;
return 1;
}
  • 使用静态队列 FIFO

系统调用 sys_sem_open:打开一个信号量文件描述符

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
int sys_sem_open(const char* name,unsigned int value)
{
char tmp[20];
char c;
int i;

for( i = 0; i<20; i++){
c = get_fs_byte(name+i);
tmp[i] = c;
if(c =='\0') break;
}
if(c >= 20) {
printk("Semaphore name is too long!");
return -1;
}

for(i = 0; i < SEM_COUNT; i++){
if(strcmp_sem(&semaphores[i].name,tmp)){
printk("sem %s is exist\n", semaphores[i].name);
return i;
}
}
for(i = 0; i < SEM_COUNT; i++){
if(semaphores[i].name[0] == '\0'){
strcpy_sem(semaphores[i].name,tmp);
semaphores[i].value = value;
semaphores[i].wait_queue.front = 0;
semaphores[i].wait_queue.rear = 0;
printk("create sem %s done\n",tmp);
return i;
}
}
printk("Numbers of semaphores are limited!\n");
return -1;
}

系统调用 sys_sem_wait:P 操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int sys_sem_wait(int i){
cli();
if(i < 0 || i > SEM_COUNT){
sti();
printk("sem (P) error\n");
return -1;
}
sem_t* sem = &semaphores[i];
sem->value--;
if(sem->value < 0){
current->state = TASK_UNINTERRUPTIBLE;
insert_task(current,sem);
schedule();
}
sti();
return 0;
}

系统调用 sys_sem_post:V 操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
int sys_sem_post(int i)
{
cli();
if(i < 0 || i > SEM_COUNT){
sti();
printk("sem (V) error\n");
return -1;
}
sem_t* sem = &semaphores[i];
struct task_struct *p;
sem->value++;
if(sem->value <= 0){
p = get_task(sem);
if(p != NULL){
p->state = TASK_RUNNING;
}
}
sti();
return 0;
}

系统调用 sys_sem_unlink:删除一个信号量文件描述符

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
int sys_sem_unlink(const char *name)  
{
char tmp[20];
char c;
int i,j;
for( i = 0; i<20; i++){
c = get_fs_byte(name+i);
tmp[i] = c;
if(c =='\0') break;
}
if(c >= 20) {
printk("Semphore name is too long!");
return -1;
}

for(i = 0;i< SEM_COUNT; i++){
if(strcmp(semaphores[i].name,tmp) == 0){
printk("sem %s is unlinked\n",semaphores[i].name);
semaphores[i].name[0] = '\0';
semaphores[i].value = 0;
semaphores[i].wait_queue.front = 0;
semaphores[i].wait_queue.rear = 0;
for(j = 0;j<QUE_LEN; j++){
semaphores[i].wait_queue.wait_tasks[j] = NULL;
}
return 0;
}
}
return -1;
}

修改 makefile:

1
2
3
4
5
OBJS  = sched.o system_call.o traps.o asm.o fork.o \
panic.o printk.o vsprintf.o sys.o exit.o \
signal.o mktime.o sem.o

sem.s sem.o: sem.c ../include/linux/kernel.h ../include/unistd.h

实验的最后我们需要使用信号量来解决生产者-消费者问题

消费者模型是一种用于描述消费者行为和系统性能的模型:系统中有一组生产者进程和一组消费者进程,生产者进程每次生产一个产品放入缓冲区,消费者每次从缓冲区中取出一个产品并使用,生产者、消费者共享一个初始为空、大小为 n 的缓冲区

  • 只有缓冲区没满时,生产者才能把产品放入缓冲区,否则必须等待
  • 只有缓冲区不为空时,消费者才能从中取出产品,否则必须等待
  • 缓冲区是临界资源,各进程必须互斥的访问

生产者-消费者模型代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#define   __LIBRARY__
#include <unistd.h>
#include <sys/types.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>

_syscall2(int,sem_open,const char *,name,unsigned int,value);
_syscall1(int,sem_wait,int,sem);
_syscall1(int,sem_post,int,sem);
_syscall1(int,sem_unlink,const char *,name);

const int consumerNum = 3;
const int itemNum = 6;
const int bufSize = 10;
int buf_in = 0,buf_out = 0;

int main()
{
int sem_empty, sem_full, sem_mutex;
int stat;
pid_t p;
int i,j,k,fd;

if((sem_empty = sem_open("empty",1)) < 0){
perror("empty error!\n");
return -1;
}

if((sem_full = sem_open("full",0)) < 0){
perror("full error!\n");
return -1;
}

if((sem_mutex = sem_open("mutex",10)) < 0){
perror("mutex error!\n");
return -1;
}

fd = open("buffer.dat", O_CREAT | O_RDWR | O_TRUNC, 0666);

if(!(p = fork())){
printf("A(%d) create\n",0);
for(i = 0; i < itemNum; i++){
sem_wait(sem_empty);
sem_wait(sem_mutex);

printf("A(%d) >> buf_in:%d\n",0,buf_in);
lseek(fd, buf_in * sizeof(int), SEEK_SET);
write(fd, (char *)&buf_in, sizeof(int));
buf_in = (buf_in+1) % bufSize;

sem_post(sem_mutex);
sem_post(sem_full);
}
printf("A(%d) done\n",0);
return 0;
}
else if(p < 0){
perror("fork error!\n");
return -1;
}

for(j = 0; j < consumerNum; j++){
if(!(p = fork())){
printf("B(%d) create\n",j);
for(k = 0; k < itemNum/consumerNum; k++){
sem_wait(sem_full);
sem_wait(sem_mutex);

lseek(fd, bufSize * sizeof(int), SEEK_SET);
read(fd, (char *)&buf_out, sizeof(int));
printf("B(%d) >> buf_out:%d\n",j,buf_out);

buf_out = (buf_out + 1) % bufSize;
lseek(fd, bufSize * sizeof(int), SEEK_SET);
write(fd, (char *)&buf_out, sizeof(int));

sem_post(sem_mutex);
sem_post(sem_empty);

}
printf("B(%d) done\n",j);
return 0;
}
else if(p < 0){
perror("fork error!\n");
return -1;
}
}

while ((wait(&stat)) > 0);
sem_unlink("empty");
sem_unlink("full");
sem_unlink("mutex");

puts("all done");
return 0;
}

最终效果如下: