网络socket编程实现并发服务器——多线程编程
2021-04-18 10:26
标签:相互 程序 input 变量 body 内存 结构体 信号 常见 网络socket编程实现并发服务器——多线程编程 标签:相互 程序 input 变量 body 内存 结构体 信号 常见 原文地址:https://www.cnblogs.com/wanghuaijun/p/13295555.html
1、什么是线程?
线程在操作系统原理中是这样描述的:线程是进程的一条执行路径。线程在Unix系统下,通常被称为轻量级的进程,线程虽然不是进程,但却可以看作是Unix进程的表亲,所有的线程都是在同一进程空间运行,这也意味着多条线程将共享该进程中的全部系统资源,如虚拟地址空间,文件描述符和信号处理等等。但同一进程中的多个线程有各自的调用栈(call stack),自己的寄存器环境(register context),自己的线程本地存储(thread-local storage)。 一个进程可以有很多线程,每条线程并行执行不同的任务。
一个进程创建后,会首先生成一个缺省的线程,通常称这个线程为主线程(或称控制线程),C/C++程序中,主线程就是通过main函数进入的线程,由主线程调用pthread_create()创建的线程称为子线程,子线程也可以有自己的入口函数,该函数由用户在创建的时候指定。每个线程都有自己的线程ID,可以通过pthread_self()函数获取。最常见的线程模型中,除主线程较为特殊之外,其他线程一旦被创建,相互之间就是对等关系,不存在隐含的层次关系。每个进程可创建的最大线程数由具体实现决定。
无论在windows中还是Posix中,主线程和子线程的默认关系是:无论子线程执行完毕与否,一旦主线程执行完毕退出,所有子线程执行都会终止。这时整个进程结束或僵死,部分线程保持一种终止执行但还未销毁的状态,而进程必须在其所有线程销毁后销毁,这时进程处于僵死状态。线程函数执行完毕退出,或以其他非常方式终止,线程进入终止态,但是为线程分配的系统资源不一定释放,可能在系统重启之前,一直都不能释放,终止态的线程,仍旧作为一个线程实体存在于操作系统中,什么时候销毁,取决于线程属性。在这种情况下,主线程和子线程通常定义以下两种关系:
相分离(detached):表示子线程无需和主线程会合,也就是相分离的,这种情况下,子线程一旦进入终止状态,这种方式常用在线程数较多的情况下,有时让主线程逐个等待子线程结束,或者让主线程安排每个子线程结束的等待顺序,是很困难或不可能的,所以在并发子线程较多的情况下,这种方式也会经常使用。
3、创建子线程
pthread_create()函数
函数原型:
#include
*arg);
1234
参数介绍:
第二个参数是线程的属性attr,其类型是 pthread_attr_t 结构体类型,其定义如下:
{
int detachstate; //线程的分离状态
int schedpolicy; //线程调度策略
struct sched_param schedparam; //线程的调度参数
int inheritsched; //线程的继承性
int scope; //线程的作用域
size_t guardsize; //线程栈末尾的警戒缓冲区大小
int stackaddr_set;
void *stackaddr; //线程栈的位置
size_t stacksize; //线程栈的大小
}pthread_attr_t;
对于这些属性,我们需要设定的是线程的分离状态,如果有必要,也需修改每个线程的栈大小。
每个线程创建后默认是joinable状态,该状态需要主线程调用 pthread_join 等待它退出,否则,
子线程在结束时,内存资源不能得到释放造成内存泄漏。所以我们创建线程时一般会将线程设置为
分离状态,具体有两种方法:
1. 线程里面调用 pthread_detach(pthread_self()) 函数,这个方法最简单
2. 在创建线程的属性设置里设置PTHREAD_CREATE_DETACHED属性
*/
123456789101112131415161718192021
第四个参数arg是传给所调用的函数的参数,如果有多个参数要传递的话,就需要将这多个参数封装到一个结构体中,再传入函数中;
代码如下:
1 #include
2 #include
3 #include
4 #include
5 #include
6 #include
7
8 void *thread_worker1(void *args);
9 void *thread_worker2(void *args);
10
11 int main(int argc, char *argv[])
12 {
13 int shared_var = 1000;
14 pthread_t tid;
15 pthread_attr_t thread_attr;
16
17
18 if (pthread_attr_init(&thread_attr))
19 {
20 printf("pthread_attr_init() failure: %s\n", strerror(errno));
21 return -1;
22 }
23
24 if (pthread_attr_setstacksize(&thread_attr, 120*1024)) //重新设置子线程栈大小
25 {
26 printf("pthread_attr_setstacksize() failure: %s\n", strerror(errno));
27 return -1;
28 }
29 //设置子线程与主线程为相分离的关系
30 if (pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED))
31 {
32 printf("pthread_attr_setdetachstate() failure: %s\n", strerror(errno));
33 return -1;
34 }
35 //创建第一个子线程,去执行thread_worker1()
36 pthread_create(&tid, &thread_attr, thread_worker1, &shared_var);
37 printf("Thread worker1 tid[%ld] created ok\n", tid);
38 //创建第二个子线程,去执行thread_worker2()
39 pthread_create(&tid, NULL, thread_worker2, &shared_var);
40 printf("Thread worker2 tid[%ld] created ok\n", tid);
41
42 pthread_attr_destroy(&thread_attr); //销毁为线程重新设置的属性
43
44 /* 第二个子线程默认是joinable,在这里阻塞,等待与子线程会合 */
45 pthread_join(tid, NULL);
46
47
48 while (1)
49 {
50 printf("Main/Control thread shared_var: %d\n", shared_var);
51 sleep(10);
52 }
53 }
54
55 void *thread_worker1(void *args)
56 {
57 int *ptr = (int *)args;
58
59 if (!args)
60 {
61 printf("%s() get invalid arguments\n", __FUNCTION__);
62 pthread_exit(NULL);
63 }
64
65 printf("Thread workder 1 [%ld] start running...\n", pthread_self());
66
67 while (1)
68 {
69 printf("+++: %s before shared_var++: %d\n", __FUNCTION__, *ptr);
70 *ptr += 1;
71 sleep(2);
72 printf("+++: %s after sleep shared_var: %d\n", __FUNCTION__, *ptr);
73 }
74
75 printf("Thread workder 1 exit...\n");
76
77 return NULL;
78 }
79 //宏__FUNCTION__用来获取函数名
80 void *thread_worker2(void *args)
81 {
82 int *ptr = (int *)args;
83
84 if (!args)
85 {
86 printf("%s() get invalid arguments\n", __FUNCTION__);
87 pthread_exit(NULL);
88 }
89
90 printf("Thread workder 2 [%ld] start running...\n", pthread_self());
91
92 while (1)
93 {
94 printf("---: %s before shared_var++: %d\n", __FUNCTION__, *ptr);
95 *ptr += 1;
96 sleep(2);
97 printf("---: %s after sleep shared_var: %d\n", __FUNCTION__, *ptr);
98 }
99
100 printf("Thread workder 2 exit...\n");
101
102 return NULL;
103 }
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
运行结果:
Thread worker1 tid[1993757808] created ok
Thread workder 1 [1993757808] start running...
+++: thread_worker1 before shared_var++: 1000
Thread worker2 tid[1993634928] created ok
Thread workder 2 [1993634928] start running...
---: thread_worker2 before shared_var++: 1001
+++: thread_worker1 after sleep shared_var: 1002
+++: thread_worker1 before shared_var++: 1002
---: thread_worker2 after sleep shared_var: 1002
---: thread_worker2 before shared_var++: 1003
+++: thread_worker1 after sleep shared_var: 1004
+++: thread_worker1 before shared_var++: 1004
---: thread_worker2 after sleep shared_var: 1005
---: thread_worker2 before shared_var++: 1005
1234567891011121314
程序分析:
代码15行我们定义了创建线程的属性变量thread_attr ,在对该属性进行设置前,我们需要先调用pthread_attr_init 函数初始化它(第18行),在第24行我们设置线程的栈大小为120K,同时在第30行设置线程的属性为分离状态。第36行创建线程时使用了该属性创建线程,这时创建的子进程就是分离状态了。线程属性在使用完之后,需调用pthread_attr_destroy (第45行)把它摧毁释放;
而代码39行创建子线程时并没有使用该线程,同时在thread_worker2() 里并没有调用pthread_detach()将线程设置为分离状态。这时就需要主线程在45行处调用pthread_join() 等待第二个子线程退出。因此主线程也就阻塞在这里,从而不会往下继续执行;
在创建两个线程时,我们都通过第四个参数将主线程栈中的 shared_var 变量地址传给了子线程,因为所有线程都是在同一进程空间中运行,而只是子线程有自己独立的栈空间,所以这时所有子线程都可以访问主线程空间的shared_var变量。
4、锁的概念
上面的程序存在着一定的问题!两个子线程操纵了同一个变量shared_var ,那么两个线程都对变量shared_var 进行修改的话,可能会产生数据的不一致!因此会看到第一个线程只是对变量shared_var 进行加1操作,但下次打印确实102,也就是加了2。接下来,引入锁的概念。
① 互斥锁
试想一下,我们寝室只有一个洗手间,那多个人是怎么解决洗漱台共享的问题?那么,这时就要引入锁的机制!在这里洗漱台就是临界资源,我们在进入到洗手间(临界区)后,就首先将洗手间上锁; 然后用完离开洗手间(临界区)之后,把锁打开以供别人使用。如果有人想去洗手间时发现门锁上了,他也有两种方法:
2、暂时先离开,不过过会儿再过来看(非阻塞模式);
1 #include
2 #include
3 #include
4 #include
5 #include
6 #include
8 void *thread_worker1(void *args);
9 void *thread_worker2(void *args);
10 //由于要传两个参数,所以这里定义了一个结构体
11 typedef struct worker_ctx_s
12 {
13 int shared_var;
14 pthread_mutex_t lock; //引入锁
15 }worker_ctx_t;
16
17 int main(int argc, char **argv)
18 {
19 worker_ctx_t worker_ctx;
20 pthread_t tid;
21 pthread_attr_t thread_attr;
22
23 worker_ctx.shared_var = 1000;
24 pthread_mutex_init(&worker_ctx.lock, NULL);
25 //初始化互斥锁
26
27 if (pthread_attr_init(&thread_attr))
28 {
29 printf("pthread_attr_init() failure: %s\n", strerror(errno));
30 return -1;
31 }
32
33 if (pthread_attr_setstacksize(&thread_attr, 120*1024))
34 {
35 printf("pthread_attr_setstacksize() failure: %s\n", strerror(errno));
36 return -1;
37 }
38
39 if (pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED))
40 {
41 printf("pthread_attr_setdetachstate() failure: %s\n", strerror(errno));
42 return -1;
43 }
44
45 pthread_create(&tid, &thread_attr, thread_worker1, &worker_ctx);
46 printf("Thread worker1 tid[%ld] created ok\n", tid);
47
48 pthread_create(&tid, &thread_attr, thread_worker2, &worker_ctx);
49 printf("Thread worker2 tid[%ld] created ok\n", tid);
50
51 while (1)
52 {
53 printf("Main/Control thread shared_var: %d\n", worker_ctx.shared_var);
54 sleep(10);
55 }
56
57 pthread_mutex_destroy(&worker_ctx.lock);
58 }
59
60 void *thread_worker1(void *args)
61 {
62 worker_ctx_t *ctx = (worker_ctx_t *)args;
63
64 if (!args)
65 {
66 printf("%s() get invalid arguments\n", __FUNCTION__);
67 pthread_exit(NULL);
68 }
69
70 printf("Thread workder 1 [%ld] start running...\n", pthread_self());
71
72 while (1)
73 {
74 pthread_mutex_lock(&ctx->lock); //设置阻塞锁
75
76 printf("+++: %s before shared_var++: %d\n", __FUNCTION__, ctx->shared_var);77 ctx->shared_var ++;
78 sleep(2);
79 printf("+++: %s after sleep shared_var: %d\n", __FUNCTION__, ctx->shared_var);
80
81 pthread_mutex_unlock(&ctx->lock); //打开阻塞锁
82
83 sleep(1);
84 }
85
86 printf("Thread workder 1 exit...\n");
87
88 return NULL;
89 }
90
91 void *thread_worker2(void *args)
92 {
93 worker_ctx_t *ctx = (worker_ctx_t *)args;
94
95 if (!args)
96 {
97 printf("%s() get invalid arguments\n", __FUNCTION__);
98 pthread_exit(NULL);
99 }
100
101 printf("Thread workder 2 [%ld] start running...\n", pthread_self());
102
103 while(1)
104 {
105 if (0 != pthread_mutex_trylock(&ctx->lock)) //设置非阻塞锁
106 {
107 continue;
108 }
109
110 printf("---: %s before shared_var++: %d\n", __FUNCTION__, ctx->shared_var);
111 ctx->shared_var ++;
112 sleep(2);
113 printf("---: %s after sleep shared_var: %d\n", __FUNCTION__, ctx->shared_var);
114
115 pthread_mutex_unlock(&ctx->lock);//打开非阻塞锁
116
117 sleep(1);
118 }
119
120 printf("Thread workder 2 exit...\n");
121
122 return NULL;
123 }
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
程序分析:
代码19行:使用work_ctx_t结构体类型定义了传给子线程的变量参数;
代码24行:互斥锁在使用之前,需要先调用 pthread_mutex_init() 函数来初始化互斥锁;
代码48行:在创建第二个线程时也设置了分离属性,这时主线程后面的while(1)循环就会执行了;
代码57行:互斥锁在使用完之后,我们应该调用pthread_mutex_destroy()将他摧毁释放;
代码74行: 这里调用pthread_mutex_lock() 来申请锁,这里是阻塞锁,如果锁被别的线程持有,则该函数不会返回;
代码81行: 在访问临界资源(shared_var)完成退出临界区时,我们调用pthread_mutex_unlock来释放锁,这样其他线程才能再次访问;
代码105行: 第二个线程我们使用pthread_mutex_trylock() 来申请锁,这里使用的是非阻塞锁;如果锁现在被别的线程占用则返回非0值,如果没有被占用则返回0;
代码83行、117行: 这里都要加上延时,否则一个线程拿到锁之后会一直占有该锁;另外一个线程则不能获取到锁;
Thread worker1 tid[1994032240] created ok
Thread workder 1 [1994032240] start running...
+++: thread_worker1 before shared_var++: 1000
Thread worker2 tid[1993909360] created ok
Main/Control thread shared_var: 1001
Thread workder 2 [1993909360] start running...
+++: thread_worker1 after sleep shared_var: 1001
---: thread_worker2 before shared_var++: 1001
---: thread_worker2 after sleep shared_var: 1002
+++: thread_worker1 before shared_var++: 1002
+++: thread_worker1 after sleep shared_var: 1003
---: thread_worker2 before shared_var++: 1003
---: thread_worker2 after sleep shared_var: 1004
+++: thread_worker1 before shared_var++: 1004
Main/Control thread shared_var: 1005
+++: thread_worker1 after sleep shared_var: 1005
---: thread_worker2 before shared_var++: 1005
1234567891011121314151617
通过引入互斥锁,就解决了数据不一致的问题。
② 死锁
如果多个线程要调用多个对象,则在上锁的时候可能会出现“死锁”。举个例子: A、B两个线程会同时使用到两个共享变量m和n,同时每个变量都有自己相应的锁M和N。 这时A线程首先拿到M锁访问m,接下来他需要拿N锁来访问变量n; 而如果此时B线程拿着N锁等待着M锁的话,就造成了线程“死锁”。
1、互斥:某种资源一次只允许一个进程访问,即该资源一旦分配给某个进程,其他进程就不能再访问,直到该进程访问结束。
2、占有且等待:一个进程本身占有资源(一种或多种),同时还有资源未得到满足,正在等待其他进程释放该资源。
3、不可抢占:别人已经占有了某项资源,你不能因为自己也需要该资源,就去把别人的资源抢过来。
4、循环等待:存在一个进程链,使得每个进程都占有下一个进程所需的至少一种资源。
当以上四个条件均满足,必然会造成死锁,发生死锁的进程无法进行下去,它们所持有的资源也无法释放。这样会导致CPU的吞吐量下降。所以死锁情况是非常浪费系统资源以及影响计算机的使用性能的。那么,解决死锁问题就是相当有必要的了!
由于产生死锁需要四个条件,那么,只要这四个条件中至少有一个条件得不到满足,就不可能发生死锁了。由于互斥条件是非共享资源所必须的,不仅不能改变,还应加以保证,所以,主要是破坏产生死锁的其他三个条件。
a、破坏“占有且等待”条件
方法1:所有的进程在开始运行之前,必须一次性地申请其在整个运行过程中所需要的全部资源。
优点:简单易实施且安全。缺点:因为某项资源不满足,进程无法启动,而其他已经满足了的资源也不会得到利用,严重降低了资源的利用率,造成
资源浪费。使进程经常发生饥饿现象。
方法2:该方法是对第一种方法的改进,允许进程只获得运行初期需要的资源,便开始运行,在运行过程中逐步释放掉分配到的已经使用完毕的资源,然后再去请求新的资源。这样的话,资源的利用率会得到提高,也会减少进程的饥饿问题。
b、破坏“不可抢占”条件
当一个已经持有了一些资源的进程在提出新的资源请求没有得到满足时,它必须释放已经保持的所有资源,待以后需要使用的时候再重新申请。这就意味着进程已占有的资源会被短暂地释放或者说是被抢占了。该种方法实现起来比较复杂,且代价也比较大。释放已经保持的资源很有可能会导致进程之前的工作实效等,反复的申请和释放资源会导致进程的执行被无限的推迟,这不仅会延长进程的周转周期,还会影响系统的吞吐量。
c、破坏“循环等待”条件
可以通过定义资源类型的线性顺序来预防,可将每个资源编号,当一个进程占有编号为i的资源时,那么它下一次申请资源只能申请编号大于i的资源。
二、多线程改写服务器程序
1、多进程并发服务器
程序代码如下:
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
void *thread_worker(void *ctx);
//封装thread_start()函数,实现创建子线程的功能
int thread_start(pthread_t * thread_id, THREAD_BODY * thread_workbody, void *thread_arg);
{
printf("%s usage: \n", progname);
printf("-p(--port): sepcify server listen port.\n");
printf("-h(--Help): print this help information.\n");
return ;
}
{
int sockfd = -1;
int rv = -1;
struct sockaddr_in servaddr;
struct sockaddr_in cliaddr;
socklen_t len;
int port = 0;
int clifd;
int ch;
int on = 1;
pthread_t tid;
struct option opts[] = {
{"port", required_argument, NULL, ‘p‘},
{"help", no_argument, NULL, ‘h‘},
{NULL, 0, NULL, 0}
};
while ((ch=getopt_long(argc, argv, "p:h", opts, NULL)) != -1)
{
switch(ch)
{
case ‘p‘:
port=atoi(optarg);
break;
case ‘h‘:
print_usage(argv[0]);
return 0;
}
}
if( !port )
{
print_usage(argv[0]);
return 0;
}
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd {
printf("Create socket failure: %s\n", strerror(errno));
return -1;
}
printf("Create socket[%d] successfully!\n", sockfd);
setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(port);
servaddr.sin_addr.s_addr = htonl(INADDR_ANY); /* 监听本机所有IP*/
//inet_aton("192.168.0.16", &servaddr.sin_addr); /* 监听指定ip */
if (rv {
printf("Socket[%d] bind on port[%d] failure: %s\n", sockfd, port, strerror(errno));
return -2;
}
listen(sockfd, 13);
printf("Start to listen on port [%d]\n", port);
while (1)
{
printf("Start accept new client incoming...\n");
clifd = accept(sockfd, (struct sockaddr *)&cliaddr, &len);
if (clifd {
printf("Accept new client failure: %s\n", strerror(errno));
continue;
}
printf("Accept new client[%s:%d] successfully\n", inet_ntoa(cliaddr.sin_addr),
ntohs(cliaddr.sin_port));
/*注意,这里传入的是clifd的值,而不是clifd的地址*/
thread_start(&tid, thread_worker, (void *)clifd);
}
close(sockfd);
return 0;
}、
{
int rv = -1;
pthread_attr_t thread_attr;
if (pthread_attr_init(&thread_attr))
{
printf("pthread_attr_init() failure: %s\n", strerror(errno));
goto CleanUp;
}
if (pthread_attr_setstacksize(&thread_attr, 120*1024))
{
printf("pthread_attr_setstacksize() failure: %s\n", strerror(errno));
goto CleanUp;
}
if (pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED))
{
printf("pthread_attr_setdetachstate() failure: %s\n", strerror(errno));
goto CleanUp;
}
/* Create the thread */
if (pthread_create(thread_id, &thread_attr, thread_workbody, thread_arg))
{
printf("Create thread failure: %s\n", strerror(errno));
goto CleanUp;
}
rv = 0;
CleanUp:
/* Destroy the attributes of thread */
pthread_attr_destroy(&thread_attr);
return rv;
}
{
int clifd;
int rv;
char buf[1024];
int i;
if (!ctx)
{
printf("Invalid input arguments in %s()\n", __FUNCTION__);
pthread_exit(NULL);
}
clifd = (int)ctx;
printf("Child thread start to commuicate with socket client...\n");
while (1)
{
memset(buf, 0, sizeof(buf));
rv = read(clifd, buf, sizeof(buf));
if (rv {
printf("Read data from client sockfd[%d] failure: %s and thread will exit\n", clifd,
strerror(errno));
close(clifd);
pthread_exit(NULL);
}
else if( rv == 0)
{
printf("Socket[%d] get disconnected and thread will exit.\n", clifd);
close(clifd);
pthread_exit(NULL);
}
else if( rv > 0 )
{
printf("Read %d bytes data from Server: %s\n", rv, buf);
}
/* convert letter from lowercase to uppercase */
for (i = 0; i {
buf[i] = toupper(buf[i]);
}
rv = write(clifd, buf, rv);
if (rv {
printf("Write to client by sockfd[%d] failure: %s and thread will exit\n", clifd,
strerror(errno));
close(clifd);
pthread_exit(NULL);
}
}
}
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
接下来在windows下使用TCP socket测试工具连接并测试服务器的执行情况,我们可以发现服务器可以同时处理多个客户端的连接请求和通信,并在客户端断开时子线程退出,从而实现了服务器并发访问。