Linux高性能服务器-进程线程

第十三章多进程编程

exec系列系统调用

#include <unistd.h>
// 声明这个是外部函数或外部变量
extern char** environ;

// path 参数指定可执行文件的完成路径 file接收文件名,具体位置在PATH中搜寻
// arg-接受可变参数 和 argv用于向新的程序传递参数数组
// envp用于设置新程序的环境变量, 未设置则使用全局的环境变量
// exec函数是不返回的, 除非出错
// 如果未报错则源程序被新的程序完全替换

int execl(const char* path, const char* arg, ...);
int execlp(const char* file, const char* arg, ...);
int execle(const char* path, const char* arg, ..., char* const envp[])
int execv(const char* path, char* const argv[]);
int execvp(const char* file, char* const argv[]);
int execve(const char* path, char* const argv[], char* const envp[]);

fork系统调用-进程的创建

#include <sys/types.h>
#include <unistd.h>
// 每次调用都返回两次, 在父进程中返回的子进程的PID, 在子进程中返回0
// 次返回值用于区分是父进程还是子进程
// 失败返回-1
pid_t fork(viod);

fork系统调用
fork() 函数复制当前的进程, 在内核进程表中创建一个新的进程表项
新的进程表项有很多的属性和原进程相同

  • 堆指针
  • 栈指针
  • 标志寄存器的值
  • 子进程代码与父进程完全相同
  • 同时复制(采用了写时复制, 父进程和子进程对数据执行了写操作才会复制)父进程的数据(堆数据, 栈数据, 静态数据)
  • 创建子进程后, 父进程打开的文件描述符默认在子进程中也是打开的 文件描述符的引用计数, 父进程的用户根目录, 当前工作目录等变量的引用计数 均加1

也存在不同的项目

  • 该进程的PPID(标识父进程)被设置成原进程的PID,
  • 信号位图被清除(原进程设置的信号处理函数对新进程无效)

(引自维基百科-引用计数是计算机编程语言中的一种内存管理技术,是指将资源(可以是对象、内存或磁盘空间等等)的被引用次数保存起来,当被引用次数变为零时就将其释放的过程。)

The child process is an exact duplicate of the parent process except
for the following points:

  • The child has its own unique process ID, and this PID does not
    match the ID of any existing process group (setpgid(2)) or
    session. 子进程拥有自己唯一的进程ID, 不与其他相同

  • The child’s parent process ID is the same as the parent’s process
    ID. 子进程的父进程ID PPID 与父进程ID PID相同

  • The child does not inherit its parent’s memory locks (mlock(2),
    mlockall(2)). 子进程不继承父进程的内存锁(保证一部分内存处于内存中, 而不是sawp分区)

  • Process resource utilizations (getrusage(2)) and CPU time counters
    (times(2)) are reset to zero in the child.
    进程资源使用和CPU时间计数器在子进程中重置为0

  • The child’s set of pending signals is initially empty
    (sigpending(2)). 信号位图被初始化为空 原信号处理函数对子进程无效 需重新设置

  • The child does not inherit semaphore adjustments from its parent
    (semop(2)). 不会继承semadj

  • The child does not inherit process-associated record locks from
    its parent (fcntl(2)). (On the other hand, it does inherit
    fcntl(2) open file description locks and flock(2) locks from its
    parent.)

  • The child does not inherit timers from its parent (setitimer(2),
    alarm(2), timer_create(2)). 不会继承定时器

  • The child does not inherit outstanding asynchronous I/O operations
    from its parent (aio_read(3), aio_write(3)), nor does it inherit
    any asynchronous I/O contexts from its parent (see io_setup(2)).

处理僵尸进程-进程的管理

#include <sys/types.h>
#include <sys/wait.h>
// wait进程将阻塞进程, 直到该进程的某个子进程结束运行为止. 他返回结束的子进程的PID, 并将该子进程的退出状态存储于stat_loc参数指向的内存中. sys/wait.h 头文件中定义了宏来帮助解释退出信息.
pid_t wait(int* stat_loc);

// 非阻塞, 只等待由pid指定的目标子进程(-1为阻塞)
// options函数取值WNOHANG-waitpid立即返回
// 如果目标子进程正常退出, 则返回子进程的pid
// 如果还没有结束或意外终止, 则立即返回0
// 调用失败返回-1
pid_t waitpid(pid_t pid, int* stat_loc, int options);

WIFEXITED(stat_val); // 子进程正常结束, 返回一个非0
WEXITSTATUS(stat_val); // 如果WIFEXITED 非0, 它返回子进程的退出码
WIFSIGNALED(stat_val);// 如果子进程是因为一个未捕获的信号而终止, 返回一个非0值
WTERMSIG(stat_val);// 如果WIFSIGNALED非0 返回一个信号值
WIFSTOPPED(stat_val);// 如果子进程意外终止, 它返回一个非0值
WSTOPSIG(stat_val);// 如果WIFSTOPED非0, 它返回一个信号值

对于多进程程序而言, 父进程一般需要跟踪子进程的退出状态. 因此, 当子进程结束运行是, 内核不会立即释放该进程的进程表表项, 以满足父进程后续对孩子进程推出信息的查询

  • 子进程结束运行之后, 父进程读取其退出状态前, 我们称该子进程处于僵尸态
  • 另外一使子进程进入僵尸态的情况 - 父进程结束或者异常终止, 而子进程继续运行. (子进程的PPID设置为1,init进程接管了子进程) 父进程结束运行之后, 子进程退出之前, 处于僵尸态

以上两种状态都是父进程没有正确处理子进程的返回信息, 子进程都停留在僵尸态, 占据着内核资源.

waitpid()虽然为非阻塞, 则需要在 waitpid所监视的进程结束后再调用.
SIGCHLD信号- 子进程结束后将会给父进程发送此信号

static void handle_child(int sig)
{
pid_t pid;
int stat;
while ((pid = waitpid(-1, &stat, WNOHANG)) > 0)
{
// 善后处理emmmm
}
}

信号量-进程的锁

信号量原语
只支持两种操作, 等待(wait)和信号(signal) , 在LInux中等待和信号有特殊的含义, 所以又称为P(passeren, 传递就好像进入临界区)V(vrijgeven, 释放就好像退出临界区)操作.
假设有信号量SV(可取任何自然数, 这本书只讨论二进制信号量), 对它的PV操作含义为

  • P(SV), 如果SV的值大于0, 就将它减1, 如果sv的值为0 则挂起进程的执行
  • V(SV), 如果其他进程因为等待SV而挂起, 则唤醒之, 如果没有则将SV加1

总结PV使用方法

使用semget获取到唯一的标识.
使用semctlSETVAL传入初始化val的sem_un联合体.来初始化val
调用semop 传入唯一标识, sem_op=-1执行P(锁)操作sem_op=1执行V(开锁)操作
开关锁通过当sem_op=-1,semval=0
且未指定IPC_NOWAIT
等待semvalsem_op=1改为semval=1

创建信号量

// semeget 系统调用
// 创建一个全局唯一的信号量集, 或者获取一个已经存在的信号量集
// key 参数是一个键值, 用来标识一个全局唯一的信号量级,可以在不同进程中获取
// num_sems 参数指定要创建/获取的信号量集中信号量的数目. 如果是创建信号量-必须指定, 如果是获取-可以指定为0. 一般都是为1
// sem_flags指定一组标志, 来控制权限
// - 可以与IPC_CREAT 做或运算创建新的信号量集, 即使信号量集存在也不会报错
// - IPC_CREAT | IPC_EXCL来创建一组唯一信号量集 如果已经存在则会返回错误 errno = EEXIST
// 成功返回一个正整数, 是信号量集的标识符, 失败返回 -1
int semget(key_t key, int num_sems, int sem_flags);

int sem_id = semget((key_t)1234, 1, 0666 | IPC_CREAT);

初始化

// semctl 系统调用
// sem_id 参数是由semget返回的信号量集标识符
// sen_num指定被操作的信号量在信号集中的编号
// command指定命令, 可以追加命令所需的参数, 不过有推荐格式
// 成功返回对应command的参数, 失败返回-1 errno
int semctl(int sem_id, int sem_num, int command, ...);

// 第四个参数 竟然需要手动声明...
union semun
{
int val; /* Value for SETVAL */
struct semid_ds *buf; /* Buffer for IPC_STAT, IPC_SET */
unsigned short *array; /* Array for GETALL, SETALL */
struct seminfo *__buf; /* Buffer for IPC_INFO
(Linux-specific) */
};
// 初始化信号量
union semun sem_union;
sem_union.val = 1;
// 这里可以直接第三个参数传入1(val)
if (semctl(sem_id, 0, SETVAL, sem_union) == -1)
{
exit(0);
}

// 删除信号量
union semun sem_union{};
if (semctl(sem_id, 0, IPC_RMID, sem_union) == -1)
{
exit(EXIT_FAILURE);
}

与semop信号量关联的一些重要的内核变量

unsigned short semval; // 信号量的值
unsigned short semzcnt; // 等待信号量值变为0的进程数量
unsigned short semncnt// 等待信号量值增加的进程数量
pid_t sempid; // 最后一次执行semop操作的进程ID

操作信号量, 实际上就是对上面的内核变量操作

// sem_id 是由semget调用返回的信号量集的标识符, 用以指定被操作的,目标信号量集.
// sem_ops 参数指向一个sembuf结构体类型的数组
// num_sem_ops 说明操作数组中哪个信号量
// 成功返回0, 失败返回-1 errno. 失败的时候sem_ops[] 中的所有操作不执行
int semop(int sem_id, struct sembuf* sem_ops, size_t num_sem_ops);

// sem_op < 0 期望获得信号量
// semval-=abs(sem_op),要求调用进程对被操作信号量集有写权限
// 如果semval的值大于等于sem_op的绝对值, 则操作成功, 调用进程立即获得信号量

// 如果semval < abs(sem_op) 则在被指定IPC_NOWAIT的时候semop立即返回error, errno=EAGIN
// 如果没有指定 则 阻塞进程等待信号量可用, 且 semzcnt +=1, 等到下面三种情况唤醒
// 1 发生semval >= abs(sem_op), semzcnt-=1, semval-=abs(sem_op). 在SEM_UNDO设置时更新semadj
// 2 被操作的信号量所在的信号量集被进程移除, 此时semop调用失败返回, errno=EIDRM (同 sem_op = 0)
// 3 调用被系统中断, 此时semop调用失败返回, errno=EINTR, 同时将该信号量的semzcnt减1 (同 sem_op = 0)
bool P(int sem_id)
{
struct sembuf sem_b;
sem_b.sem_num = 0; // 信号量编号 第几个信号量 一般都是第0个
sem_b.sem_op = -1; // P
// IPC_NOWAIT 无论信号量操作是否成功, 都立即返回
// SEM_UNDO当进程退出的时候, 取消正在进行的semop操作 PV操作系统更新进程的semadj变量
sem_b.sem_flg = SEM_UNDO;
return semop(sem_id, &sem_b, 1) != -1;
}


// sem_op > 0
// semval+=sem_op , 要求调用进程对被操作的信号量集有写权限
// 如果此时设置了SEM_UNDO标志, 则系统将更新进程的semadj变量(用以跟踪进程对信号量的修改情况)
bool V(int sem_id)
{
struct sembuf sem_b;
sem_b.sem_num = 0;
sem_b.sem_op = 1; // V
sem_b.sem_flg = SEM_UNDO;
return semop(sem_id, &sem_b, 1) != -1;
}


// -- sem_op = 0
// -- 标着这是一个`等待0`的操作, 要求调用进程对被操作信号量集有用读权限
// -- 如果此时信号量的值是0, 则调用立即返回, 否则semop失败返回, 或者阻塞进程以等待信号量变为0
// -- 此时如果IPC_NOWAIT 标志被设置, sem_op立即返回错误 errno=EAGAIN
// -- 如果未指定此标志, 则信号量的semzcnt的值增加1, 这时进程被投入睡眠直到下列三个条件之一发生
// -- 1 信号量的值samval变为0, 此时系统将该信号量的semzcnt减1
// -- 2 被操作的信号量所在的信号量集被进程移除, 此时semop调用失败返回, errno=EIDRM
// -- 3 调用被系统中断, 此时semop调用失败返回, errno=EINTR, 同时将该信号量的semzcnt减1

semget成功时返回一个与之关联的内核结构体semid_ds

struct semid_ds
{
struct ipc_perm sem_perm;
unsigned long int sem_nsems; // 被设置为num_sems
time_t sem_otime; // 被设置为0
time_t sem_ctime; // 被设置为当前的系统时间
}
// 用来描述权限
struct ipc_perm
{
uid_t uid; // 所有者的有效用户ID, 被semget设置为调用进程的有效用户ID
gid_t gid; // 所有者的有效组ID, 被semget设置为调用进程的有效用户ID
uid_t cuid; // 创建者的有效用户ID, 被semget设置为调用进程的有效用户ID
gid_t cgid; // 创建者的有效组ID, 被semget设置为调用进程的有效用户ID
mode_t mode;// 访问权限, 背着只为sem_flags参数的最低9位.
}

共享内存-进程间通信

最高效的IPC(进程间通信)机制
需要自己同步进程对其的访问, 否则会产生竞态条件

// key
// 与semget相同 标识一段全局唯一的共享内存
// size 内存区域大小 单位字节
// shmflg
// IPC_CREAT 存不存在都创建新的共享内存
// IPC_CREAT | IPC_EXCL 不存在则创建 存在则报错
// SHM_HUGETLB 系统将使用"大页面"来为共享内存分配空间
// SHM_NORESERVE 不为共享内存保留swap空间, 如果物理内存不足
// -在执行写操作的时候将会触发`SIGSEGV`信号
// -成功返回唯一标识, 失败返回-1 errno
int shmget(key_t key, size_t size, int shmflg)
// shm_id 
// shmget返回的唯一标识
// shm_addr
// 关联到进程的哪块地址空间, 其效果还受到shmflg的可选标识SHM_RND的影响
// 如果shm_addr = NULL, 则关联地址由操作系统决定, 代码可移植性强
// 如果 shm_addr 非空,且没有`SHM_RND`标志 则关联到指定的地址处
// 如果 shm_addr 非空, 但是设置了标志 *这里还没用到, 暂时不写*
// shmflg
// SHM_RDONLY 设置后内存内容变成只读, 不设置则为读写模式
// SHM_REMAP 如果地址shmaddr已经关联到一段内存上则重新关联
// SHM_EXEC 有执行权限
// 成功返回关联到的地址, 失败返回 (void*)-1 errno
void* shmat(int shm_id, const void* shm_addr, int shmflg)

// 将共享内存关联到进程的地址空间 调用成功之后, 修改shmid_ds的部分内容
// -shm_nattach +1
// -更新 shm_lpid
// -shm_atime设置为当前时间
// 将共享内存从进程地址空间中分离
// 成功后
// -shm_nattach -1
// -更新 shm_lpid和shm_dtime设置为当前时间
// 成功返回0 失败返回-1 errno
int shmdt(const void* shm_addr)
int shm_ctl(int shm_id, int command, struct shmid_ds* buf)


shmget 同时会创建对应的shmid_ds结构体

struct shmid_ds
{
struct ipc_perm shm_per; // 权限相关
size_t shm_segsz; // 共享内存大小 单位字节 size
__time_t shm_atime; // 对这段内存最后一次调用semat的时间 0
__time_t shm_dtime; // 对这段内存最后一次调用semdt的时间 0
__time_t shm_ctime; // 对这段内存最后一次调用semctl的时间 当前时间
__pid_t shm_cpid; // 创建者PID
__pid_t lpid; // 最后一次执行shmat或shmdt的进程PID
shmatt_t shm_nattach // 关联到此共享内存空间的进程数量
}

共享内存的POSIX方法

int shmfd = shm_open("/shm_name", O_CREAT | O_RDWR, 0666);
ERROR_IF(shmfd == -1, "shm open");

int ret = ftruncate(shmfd, BUFFER_SIZE);
ERROR_IF(ret == -1, "ftruncate");

share_mem = (char*)mmap(nullptr, BUFFER_SIZE,
PROT_READ | PROT_WRITE, MAP_SHARED, shmfd, 0);
ERROR_IF(share_mem == MAP_FAILED, "share_mem");
close(shmfd);

// 取消关联
munmap((void*)share_mem, BUFFER_SIZE);

进程通信-管道

管道可以在父,子进程间传递数据, 利用的是fork调用后两个文件描述符(fd[0]和fd[1])都保持打开. 一对这样的文件描述符只能保证
父,子进程间一个方向的数据传输, 父进程和子进程必须有一个关闭fd[0], 另一个关闭fd[1].

可以用两个管道来实现双向传输数据, 也可以用socketpair来创建管道

消息队列

消息队列是两个进程之间传递二进制块数据的一种简单有效的方式.
每个数据块都有自己的类型, 接收方可以根据类型有选择的接收数据

#include <sys/msg.h>
// 与semget 相同, 成功返回标识符
// msgflg的设置和作用域setget相同
int msgget(key_t key, int msgflg);
// msg_ptr参数指向一个准备发送的消息, 消息必须按如下定义
// msg_sz 指的是mtext的长度!!!
// msgflg通常仅支持IPC_NOWAIT 以非阻塞形式发送数据
int msgsnd(int msqid, const void *msg_ptr, size_t msg_sz, int msgflg);
默认如果消息队列已满, 则会阻塞. 如果设置了 IPC_NOTWAIT
就立即返回 设置errno=EAGIN

系统自带这个结构体 不过mtext长度是1...
struct msgbuf
{
long mtype; /* 消息类型 正整数*/
char mtext[512]; /* 消息数据*/
}
// msgtype = 0 读取消息队列第一个消息
// msgtype > 0 读取消息队列第一个类型是msgtype的消息 除非标志了MSG_EXCEPT
// msgtype < 0 读取第一个 类型值 < abs(msgtype)的消息

// IPC_NOWAIT 如果消息队列没有消息, 则msgrcv立即返回并设置errno=ENOMSG
// MSG_EXCEPT 如果msgtype大于0, 则接收第一个非 msgtype 的数据
// MSG_NOERROR 消息部分长度超过msg_sz 则将它截断
int msgrcv(int msqid, void *msg_ptr, size_t msg_sz, long int msgtype, int msgflg);
处于阻塞状态 当消息队列被移除(errno=EIDRM)或者程序接受到信号(errno=EINTR) 都会中断阻塞状态
int msgctl(int msqid, int command, struct msqid_ds *buf);

IPC_STAT 复制消息队列关联的数据结构
IPC_SET 将buf中的部分成员更新到目标的内核数据
IPC_RMID 立即移除消息队列, 唤醒所有等待读消息和写消息的进程
IPC_INFO 获取系统消息队列资源配置信息

MSG_INFO 返回已经分配的消息队列所占用资源信息
MSG_STAT msgqid不再是标识符, 而是内核消息队列的数组索引

在进程间传递文件描述符

IPC命令-查看进程间通信的全局唯一key

第十四章 多线程编程

根据运行环境和调度者身份, 线程可以分为两种
内核线程
运行在内核空间, 由内核来调度.
用户线程
运行在用空间, 由线程库来调用

当内核线程获得CPU的使用权的时候, 他就加载并运行一个用户线程, 所以内核线程相当于用户线程的容器.

线程有三种实现方式

  • 完全在用户空间实现-无需内核支持
    创建和调度线程无需内核干预, 速度很快.
    不占用额外的内核资源, 对系统影响较小
    但是无法运行在多个处理器上, 因为这些用户线程是是实现在一个内核线程上的
  • 完全由内核调度
    创建和调度线程的任务都交给了内核, 运行在用户空间的线程库无需管理
    优缺点正好与上一个相反
  • 双层调度
    结合了前两个的优点
    不会消耗过多的内核资源,而且线程切换快, 同时它可以充分利用多处理器的优势

进程的创建和终止

#include <pthread.h>
int pthread_create(pthread_t* thread, const pthread_attr_t* attr, void* (*start_routine)(void*), void* arg);
// 成功返回0 失败返回错误码
// thread 用来唯一的标识一个新线程
// attr用来设置新县城的属性 传递NULL表示默认线程属性
// start_routine 指定新线程运行的函数
// arg指定函数的参数
void pthread_exit(void* retval);
用来保证线程安全干净的退出, 线程函数最好结束时调用.
通过`retval`参数向线程的回收者传递其退出信息
执行后不会返回到调用者, 而且永远不会失败

int pthread_join(pthread_t thread, void** retval)
可以调用这个函数来回收其他线程 不过线程必须是可回收的该函数会一直阻塞知道被回收的线程结束.
成功时返回0, 失败返回错误码
等待其他线程结束
thread 线程标识符
retval 目标线程的退出返回信息

错误码如下
`EDEADLK`引起死锁, 两个线程互相针对对方调用pthread_join 或者对自身调用
`EINVAL`目标线程是不可回收的, 或是其他线程在回收目标线程
`ESRCH`目标线程不存在

int pthread_cancel(pthread_t thread)
异常终止一个线程, 即为取消线程
成功返回0, 失败返回错误码

线程属性设置

接收到取消请求的目标线程可以决定是否允许被取消以及如何取消.
// 启动线程取消
int pthread_setcancelstart(int state, int* oldstate)
第一个参数
PTHREAD_CANCEL_ENABLE 允许线程被取消, 默认状态
PTHREAD_CANCEL_DISABLE 不允许被取消, 如果这种线程接收到取消请求, 则会挂起请求直到
这个线程允许被取消
第二个参数 返回之前设定的状态

// 设置线程取消类型
int pthread_setcanceltype(int type, int* oldtype)
第一个参数
PTHREAD_CANCEL_ASYNCHRONOUS 线程可以随时被取消
PTHREAD_CANCEL_DEFERRED 允许目标现成推迟行动, 直到调用了下面几个所谓的取消点函数
最好使用pthread_testcancel函数设置取消点
设置取消类型(如何取消)
第二个参数
原来的取消类型

设置脱离线程

// 初始化线程属性对象
int pthread_attr_init(pthread_attr_t *attr);
// 销毁线程属性对象, 直到再次初始化前都不能用
int pthread_attr_destory(pthread_attr_t *attr)

// 参数取值
// -PTHREAD_CREATE_JOINABLE 线程可回收
// -PTHREAD_CREATE_DETACH 脱离与进程中其他线程的同步 成为脱离线程
int pthread_attr_getdetachstate(const pthread_attr_t *attr, int *detachstate);
int pthread_attr_setdetachstate(pthread_attr_t *attr, int detachstate);
// 可以直接设置为脱离线程
int pthread_detach(pthread_t thread)

线程同步机制的使用场景

POSIX信号量-需要自己维护计数值, 用户空间有计数值 信号量自己又计数值
两份计数值容易出错

互斥锁-对临界资源的独占式访问

条件变量-等待某个条件满足
当某个共享数据达到某个值的时候, 唤醒等待这个共享数据的线程

读写锁-可以多个进程读, 读的时候不能写, 同时只能一个写

自旋锁-通过while循环频繁尝试获取锁, 适用于锁事件短, 需要快速切换的场景

POSIX信号量

多线程也必须考虑线程同步的问题.
虽然pthread_join()可以看做简单的线程同步方式不过它无法高效的实现复杂的同步需求
比如无法实现共享资源独占式访问, 或者在某种条件下唤醒指定的指定线程.

#include<semaphore>
// 用于初始化一个未命名的信号量.
// pshared==0 则表示是当前进程的局部信号量, 否则信号量可以在多个进程间共享
// value指定参数的初始值
int sem_init(sem_t* sem, int pshared, unsigned int value)

// 销毁信号量, 释放其占用的系统资源
int sem_destory(sem_t* sem)

// 以原子操作的形式将信号量的值 -1, 如果信号量的值为0, 则sem_wait将被阻塞直到sem_wait具有非0值
int sem_wait(sem_t* sem)

// 跟上面的函数相同不过不会阻塞. 信号量不为0则减一操作, 为0则返回-1 errno
int sem_trywait(sem_t* sem)

// 原子操作将信号量的值 +1
int sem_post(sem_t* sem)

初始化已经存在的信号量会导致无法预期的结果

销毁正被其他线程等待的信号量, 将会导致无法预期的结果

例子如下

constexpr int kNumberMax = 10;
std::vector<int> number(kNumberMax);

constexpr int kThreadNum = 10;
sem_t sems[kThreadNum];
pthread_t threads[kThreadNum];

constexpr int kPrintTime = 1;

void* t(void *no)
{
int start_sub = *static_cast<int*>(no);
int sub =start_sub;
int time = 0;
while(++time <= kPrintTime)
{
// 锁住本线程 释放下一个线程
sem_wait(&sems[start_sub]);
printf("%d\n", number[sub]);
sem_post(&sems[(start_sub + 1) % kThreadNum]);
// 计算下一次要打印的下标
sub = (sub + kThreadNum) % kNumberMax;
}
pthread_exit(nullptr);
}

int main()
{
std::iota(number.begin(), number.end(), 0);
sem_init(&sems[0], 0, 1);
for (int i = 1; i < kThreadNum; ++i)
{
sem_init(&sems[i], 0, 0);
}
for (int i = 0; i < kThreadNum; ++i)
{
pthread_create(&threads[i], nullptr, t, &number[i]);
}
// 等待最后一个线程结束
pthread_join(threads[kThreadNum - 1], nullptr);
}

kThreadNum个进程依次打印[0, kNumberMax)
每个进程打印kPrintTime次
最后一个进程打印完后主线程才能结束

互斥锁

// 初始化互斥锁
// 第一个参数指向目标互斥锁, 第二个参数指定属性 nullptr则为默认
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *mutexattr);

// 销毁目标互斥锁
int pthread_mutex_destory(pthread_mutex_t *mutex);

// 针对普通锁加锁
int pthread_mutex_lock(pthread_mutex_t *mutex);

// 针对普通锁立即返回 目标未加锁则加锁 如果已经加锁则返回错误码EBUSY
int pthread_mutex_trylock(pthread_mutex_t *mutex);

// 解锁 如果有其他线程在等待这个互斥锁, 则其中之一获得
int pthread_mutex_unlock(pthread_mutex_t *mutex);

销毁一个已经加锁的互斥锁 会发生不可预期的后果
也可使使用宏PTHREAD_MUTEX_INITIALIZER来初始化一个互斥锁
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

互斥锁属性设置

int pthread_mutexattr_init(pthread_mutexattr_t *attr);

int pthread_mutexattr_destory(pthread_mutexattr_t *attr);

// PTHREAD_PROCESS_SHARED 跨进程共享
// PTHREAD_PROCESS_PRIVATE 隶属同一进程的线程
int pthread_mutexattr_getpshared(const pthread_mutexattr_t *attr, int *pshared);
int pthread_mutexattr_setpshared(const pthread_mutexattr_t *attr, int pshared);

// PTHREAD_MUTEX_NORMAL 普通锁 默认类型
// PTHREAD_MUTEX_ERRORCHECK 检错锁
// PTHREAD_MUTEX_RECURSVE 嵌套锁
// PTHREAD_MUTEX_DEFAULT 默认锁
int pthread_mutexattr_gettype(const pthread_mutexattr_t *attr, int *type);
int pthread_mutexattr_settype(const pthread_mutexattr_t *attr, int type);

PTHREAD_MUTEX_NORMAL
一个线程对其加锁后, 其他请求该锁的进程会形成一个等待队列, 解锁后然后按照优先级获得. 保证资源分配公平
A线程对一个已经加锁的普通锁再次加锁(也是A线程)-同一线程在解锁前再次加锁引发死锁
对一个已经被其他线程加锁的普通锁解锁, 或者再次解锁已经解锁的普通锁–解锁-不可预期后果

PTHREAD_MUTEX_ERRORCHECK
线程对已经加锁的检错锁再次加锁–加锁-加锁操作返回EDEADLK
对一个已经被其他线程加锁的检错锁解锁, 或者再次解锁已经解锁的检错锁–解锁-返回EPERM

PTHREAD_MUTEX_RECURSVE
允许一个线程在释放锁前多次加锁 而不发生死锁.
如果其他线程要获得这个锁, 则当前锁拥有者必须执行相应次数的解锁操作–加锁
对于已经被其他进程加锁的嵌套锁解锁, 或者对已经解锁的再次解锁–解锁-返回EPERM

PTHREAD_MUTEX_DEFAULT
这种锁的实现可能为上面三种之一
对已经加锁的默认锁再次加锁
对被其他线程加锁的默认锁解锁
再次解锁已经解锁的默认锁
都将会发生不可预料后果

例子

pthread_mutex_t mutex;
int count = 0;
void* t(void *a)
{
pthread_mutex_lock(&mutex);
printf("%d\n", count);
count++;
pthread_mutex_unlock(&mutex);
}
int main()
{
pthread_mutex_init(&mutex, nullptr);
pthread_t thread[10];
for (int i = 0; i < 10; ++i)
{
pthread_create(&thread[i], nullptr, t, nullptr);
}
sleep(3);
pthread_mutex_destroy(&mutex);
}

条件变量

int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr *cond_attr);

// 销毁一个正在被等待的条件变量 将会失败并返回EBUSY
int pthread_cont_destory(pthread_cond_t *cond);

// 广播式的唤醒所有等待目标条件变量的线程
int pthread_cont_broadcast(pthread_cond_t *cond);

// 唤醒一个等待目标条件变量的线程
int pthread_cond_signal(pthread_cond_t *cond);

// 等待目标条件变量
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
将各个字段初始化为0

pthread_cond_wait的第二个参数, 用于保护条件变量的互斥锁
掉用函数前必须将 mutex加锁, 否则会发生不可预料的后果.
函数执行前将调用线程放入条件变量等待队列, 然后将mutex解锁

从函数调用, 到被放入等待队列的时间内, pthread_cond_signal(broadcast)不会修改条件变量的值
也就是 pthread_cond_wait函数不会错过目标条件变量的任何变化,
将pthread_cond_wait函数返回的时候, 互斥锁mutex将会再次锁上

例子

pthread_mutex_t mutex;
pthread_cond_t cond;
int good = 3;
int produce_count = 0;
int consume_count = 0;

void* Producer(void *arg)
{
while(produce_count < 10)
{
pthread_mutex_lock(&mutex);
good++;
pthread_mutex_unlock(&mutex);

produce_count++;
printf("produce a good\n");
// 通知一个线程
pthread_cond_signal(&cond);
sleep(2);
}
pthread_exit(nullptr);
}

void* Consumer(void *arg)
{
while (consume_count < 13)
{
// 传入前需要加锁
pthread_mutex_lock(&mutex);
if (good > 0)
{
good--;
consume_count++;
printf("consume a good, reset %d\n", good);
}
else
{
printf("good is 0\n");
// wait pthread_cond_signal
pthread_cond_wait(&cond, &mutex);
}
pthread_mutex_unlock(&mutex);

usleep(500 * 1000);
}
pthread_exit(nullptr);
}

int main()
{
mutex = PTHREAD_MUTEX_INITIALIZER;
cond = PTHREAD_COND_INITIALIZER;
pthread_t producer, consumer;

pthread_create(&consumer, nullptr, Consumer, nullptr);
pthread_create(&producer, nullptr, Producer, nullptr);

pthread_join(consumer, nullptr);
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond);
}

读写锁

自旋锁

线程同步包装类-多线程环境

class Sem
{
public:
Sem()
{
if (sem_init(&sem_, 0, 0) != 0)
{
throw std::exception();
}
}
~Sem()
{
sem_destroy(&sem_);
}
bool Wait()
{
return sem_wait(&sem_) == 0;
}
bool Post()
{
return sem_post(&sem_) == 0;
}
private:
sem_t sem_;
};

class Mutex
{
public:
Mutex()
{
if (pthread_mutex_init(&mutex_, nullptr) != 0)
{
throw std::exception();
}

}
~Mutex()
{
pthread_mutex_destroy(&mutex_);
}
bool Lock()
{
return pthread_mutex_lock(&mutex_) == 0;
}
bool Unlock()
{
return pthread_mutex_unlock(&mutex_) == 0;
}

private:
pthread_mutex_t mutex_;
};

class Cond
{
public:
Cond()
{
if (pthread_mutex_init(&mutex_, nullptr) != 0)
{
throw std::exception();
}
if (pthread_cond_init(&cond_, nullptr) != 0)
{
// 这里我一开始没有想到..
pthread_mutex_destroy(&mutex_);
throw std::exception();
}
}
~Cond()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&cond_);
};
bool Wait()
{
int ret = 0;
pthread_mutex_lock(&mutex_);
ret = pthread_cond_wait(&cond_, &mutex_);
pthread_mutex_unlock(&mutex_);
return ret == 0;
}
bool Signal()
{
return pthread_cond_signal(&cond_) == 0;
}
private:
pthread_cond_t cond_;
pthread_mutex_t mutex_;
};

线程安全或可重入函数–函数能被多个线程同时调用而不发生竞态条件

多线程程序某个线程调用fork函数, 新进程不会与父进程有相同数量的线程
子进程只有一个线程-调用fork线程的完美复制

但是子进程会继承父进程的互斥锁(条件变量)的状态, 如果互斥锁被加锁了, 但不是由调用fork线程
锁住的, 此时子进程再次对这个互斥锁执行加锁操作将会死锁.

pthread_mutex_t mutex;
void* another(void *arg)
{
printf("in child thread, lock the mutex\n");
pthread_mutex_lock(&mutex);
sleep(5);
// 解锁后 Prepare才能加锁
pthread_mutex_unlock(&mutex);
pthread_exit(nullptr);
}
// 这个函数在fork创建子进程前被调用
void Prepare()
{
// 但是会阻塞 直到执行another函数的线程解锁 才能够继续执行
// 这个函数执行完毕前fork不会创建子进程
pthread_mutex_lock(&mutex);
}
// fork创建线程后 返回前 会在子进程和父进程中执行这个函数
void Infork()
{
pthread_mutex_unlock(&mutex);
}
int main()
{
pthread_mutex_init(&mutex, nullptr);
pthread_t id;
pthread_create(&id, nullptr, another, nullptr);

sleep(1);
// pthread_atfork(Prepare, Infork, Infork);
int pid = fork();
if (pid < 0)
{
printf("emmm????\n");
pthread_join(id, nullptr);
pthread_mutex_destroy(&mutex);
return 1;
}
else if (pid == 0)
{
printf("child process, want to get the lock\n");
pthread_mutex_lock(&mutex);
printf("i cann't run to here, opps....\n");
pthread_mutex_unlock(&mutex);
exit(0);
}
else
{
printf("wait start\n");
wait(nullptr);
printf("wait over\n"); // 没有打印 因为子进程不会终止
}
pthread_join(id, nullptr);
pthread_mutex_destroy(&mutex);
return 0;
}
// $ in child thread, lock the mutex
// $ wait start
// $ child process, want to get the lock

// $ in child thread, lock the mutex
// $ wait start
// $ child process, want to get the lock
// $ i cann't run to here, opps....
// $ wait over

原版就会发生死锁, 新版(去掉注释的代码) 能够正常运行

int pthread_atfork (void (*__prepare) (void),
void (*__parent) (void),
void (*__child) (void));

第一个句柄 在fork创建子进程前执行
第二个句柄 在fork创建出子进程后, fork返回前在父进程中执行
第二个句柄 在fork创建出子进程后, fork返回前在子进程中执行

第十五章 进程池和线程池

线程池 和 简单HTTP服务器

对我而言神秘已久的线程池终于揭开了面纱.
没想到这就是线程池23333

线程池写完后 直接写了书上的HTTP服务器.

那个服务器至少我发现两个问题

  • 无法发送大文件
  • 部分请求无法回复

无法发送大文件, 是因为书中使用了writev发送数据
期初我以为下面的判断 writev返回值 等于 -1就是为了发送大文件, 后来发现这个判断只是给期初就发送失败准备的.

正好前一阵子看了一个服务器的代码
https://github.com/Jigokubana/Notes-flamingo

我就索性直接将发送部分修改了

// write_sum_ 需发送总大小
// write_idx_ 已发送大小

int temp = 0;
if (write_sum_ - write_idx_ == 0)
{
Modfd(epollfd_, sockfd_, EPOLLIN);
Init();
return true;
}
while (true)
{
temp = send(sockfd_, &*write_buff_.begin() + write_idx_, write_sum_ - write_idx_, 0);
if (temp <= -1)
{
if (errno == EAGAIN)
{
Modfd(epollfd_, sockfd_, EPOLLOUT);
return true;
}
}
write_idx_ += temp;

if (write_idx_ == write_sum_)
{
// 解除绑定移到了其他地方
if (linger_)
{
Init();
Modfd(epollfd_, sockfd_, EPOLLIN);
return true;
}
else
{
Modfd(epollfd_, sockfd_, EPOLLIN);
return false;
}
}
}

第二个奇葩的问题就是使用ab压测时候 有些请求无法收到回复.
这个问题等后面在解决把, 等我知识更加丰富了再说


转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。