标签:manage 变量 构造 完成 for lin 线程 while ret
线程池:
线程池是一种多线程处理形式,初始创建多个线程,初始线程处于wait状态。处理过程中将任务添加到队列中,按照队列顺序依次处理,此时线程处于work状态自动启动这些任务。线程任务处理完后继续处理队列中待执行任务,最后完成所有任务放回至线程池统一销毁。线程池线程都是后台线程,适用于连续产生大量并发任务的场合。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。
线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待监督管理者分配可并发执行的任务。线程池不仅避免了在处理短时间任务时创建与销毁线程的代价,还能够保证内核的充分利用,防止过分调度。
1、初始化线程池:创建初始的任务链表(队列)及N个线程,初始化条件变量;
2、任务链表管理:任务存放至链表中,记录当前待处理任务量,并存放未处理的任务,为线程池提供一种缓冲机制;
当任务链表头为空,则表示无待处理任务,可等待或销毁线程池;当任务链表头指向非空,线程执行待任务,任务链表头下移,等待任务-1。
3、线程池管理器(ThreadPoolManager):监测并管理线程池中线程状态,
线程状态管理:1)wait -- 线程池处于非关闭,当前无任务,线程关闭;
2)work -- 线程开启,执行待处理任务(process),任务链表头下移,等待任务-1;
4、线程池销毁:当任务全部完成或接收到销毁指令,销毁等待链表及所有线程,销毁后指针置空。
1 #include 2 #include 3 #include 4 #include 5 #include 6
7
8 typedef struct task
9 {
10 void *(*process) (void *arg);
11 void *arg;
12 struct task *next;
13 } Cthread_task;
14
15
16 /*线程池结构*/
17 typedef struct
18 {
19 pthread_mutex_t queue_lock;
20 pthread_cond_t queue_ready;
21
22 /*链表结构,线程池中所有等待任务7*/
23 Cthread_task *queue_head;
24
25 /*是否销毁线程池*/
26 int shutdown;
27 pthread_t *threadid;
28
29 /*线程池中线程数目3*/
30 int max_thread_num;
31
32 /*当前等待的任务数*/
33 int cur_task_size;
34
35 } Cthread_pool;
36
37 static Cthread_pool *pool = NULL;
38
39 void *thread_routine (void *arg);
40
41 void pool_init (int max_thread_num)
42 {
43 int i = 0;
44
45 pool = (Cthread_pool *) malloc (sizeof (Cthread_pool));
46
47 pthread_mutex_init (&(pool->queue_lock), NULL);
48 /*初始化条件变量*/
49 pthread_cond_init (&(pool->queue_ready), NULL);
50
51 pool->queue_head = NULL; //等待任务链表为空
52
53 pool->max_thread_num = max_thread_num;
54 pool->cur_task_size = 0;
55
56 pool->shutdown = 0;
57
58 pool->threadid = (pthread_t *) malloc (max_thread_num * sizeof (pthread_t));
59
60 for (i = 0; i //3
61 {
62 pthread_create (&(pool->threadid[i]), NULL, thread_routine, NULL); //创建线程
63 }
64 }
65
66
67
68 /*向线程池中加入任务*/
69 int pool_add_task (void *(*process) (void *arg), void *arg)
70 {
71 /*构造一个新任务 初始化*/
72 Cthread_task *task = (Cthread_task *) malloc (sizeof (Cthread_task));
73 task->process = process;
74 task->arg = arg;
75 task->next = NULL;
76
77 pthread_mutex_lock (&(pool->queue_lock));
78 /*将任务加入到等待队列中 加到链表尾部或空成员中*/
79 Cthread_task *member = pool->queue_head;
80 if (member != NULL)
81 {
82 while (member->next != NULL)
83 member = member->next;
84 member->next = task;
85 }
86 else
87 {
88 pool->queue_head = task;
89 }
90
91 pool->cur_task_size++;
92 pthread_mutex_unlock (&(pool->queue_lock));
93
94 pthread_cond_signal (&(pool->queue_ready)); //唤醒线程
95
96 return 0;
97 }
98
99
100
101 /*销毁线程池,等待队列中的任务不会再被执行,但是正在运行的线程会一直
102 把任务运行完后再退出*/
103 int pool_destroy ()
104 {
105 if (pool->shutdown)
106 return -1;/*防止两次调用*/
107 pool->shutdown = 1;
108
109 /*唤醒所有等待线程,线程池要销毁了*/
110 pthread_cond_broadcast (&(pool->queue_ready));
111
112 /*阻塞等待线程退出,否则就成僵尸了*/
113 int i;
114 for (i = 0; i max_thread_num; i++)
115 pthread_join (pool->threadid[i], NULL);
116 free (pool->threadid);
117
118 /*销毁等待队列*/
119 Cthread_task *head = NULL;
120 while (pool->queue_head != NULL)
121 {
122 head = pool->queue_head;
123 pool->queue_head = pool->queue_head->next;
124 free (head);
125 }
126 /*条件变量和互斥量也别忘了销毁*/
127 pthread_mutex_destroy(&(pool->queue_lock));
128 pthread_cond_destroy(&(pool->queue_ready));
129
130 free (pool);
131 /*销毁后指针置空是个好习惯*/
132 pool=NULL;
133 return 0;
134 }
135
136
137
138 void * thread_routine (void *arg)
139 {
140 printf ("starting thread 0x%x\n", pthread_self ());
141 while (1)
142 {
143 pthread_mutex_lock (&(pool->queue_lock));
144
145 while (pool->cur_task_size == 0 && !pool->shutdown) //当前没有任务,线程池处于非关闭
146 {
147 printf ("thread 0x%x is waiting\n", pthread_self ());
148 pthread_cond_wait (&(pool->queue_ready), &(pool->queue_lock)); //线程等待
149 }
150
151 /*线程池要销毁了*/
152 if (pool->shutdown)
153 {
154 /*遇到break,continue,return等跳转语句,千万不要忘记先解锁*/
155 pthread_mutex_unlock (&(pool->queue_lock));
156 printf ("thread 0x%x will exit\n", pthread_self ());
157 pthread_exit (NULL);
158 }
159
160 printf ("thread 0x%x is starting to work\n", pthread_self ());
161
162
163 /*待处理任务减1,并取出链表中的头元素*/
164 pool->cur_task_size--; //等待任务-1
165 Cthread_task *task = pool->queue_head; //取第一个任务处理
166 pool->queue_head = task->next; //链表头指向下一个任务
167 pthread_mutex_unlock (&(pool->queue_lock));
168
169 /*调用回调函数,执行任务*/
170 (*(task->process)) (task->arg);
171 free (task);
172 task = NULL;
173 }
174 /*这一句应该是不可达的*/
175 pthread_exit (NULL);
176 }
177
178 void * myprocess (void *arg)
179 {
180 printf ("threadid is 0x%x, working on task %d\n", pthread_self (),*(int *) arg);
181 sleep (1);/*休息一秒,延长任务的执行时间*/
182 return NULL;
183 }
184
185 int main (int argc, char **argv)
186 {
187 pool_init (3);/*创建线程池,线程池中最多三个活动线程*/
188
189 /*连续向池中投入10个任务*/
190 int *workingnum = (int *) malloc (sizeof (int) * 10);
191 int i;
192 for (i = 0; i 10; i++)
193 {
194 workingnum[i] = i;
195 pool_add_task (myprocess, &workingnum[i]);
196 }
197 /*等待所有任务完成*/
198 sleep (5);
199 /*销毁线程池*/
200 pool_destroy ();
201
202 free (workingnum);
203
204 return 0;
205 }
线程池技术优化
标签:manage 变量 构造 完成 for lin 线程 while ret
原文地址:https://www.cnblogs.com/hjh-666/p/11110570.html