python 线程

2021-05-19 18:29

阅读:382

标签:开始   div   单位   信号量   不可   while   线程同步   strftime   sum   

导航 

1、Thread类
2、线程同步
3、threading.Condition
4、threading.Event
5、threading.Semaphore  信号量
6、queue模块,线程队列

 

线程是cpu运行的最小单位,没有自己的内存空间,同一线程的多线程共享一个内存空间,同一线程下的多线程都可以访问全局变量,对全局变量进行操作时要注意同步问题。

threading模块是建立在 _thread 模块上的,对其进行封装,包含较多的功能。

1、Thread类

threading.Thread(group=None, target=None, name=None, args=(), kwargs=None,  daemon=None)

  • group                 线程组,预留的还没有什么用,应设置为None
  • target                目的方法,线程要执行的函数入口,这是直接通过Thread创建多线程需要指定的函数入口;通过继承Thread类创建的多线程,target应设置为None
  • name           线程名
  • args/kwargs      target函数的参数
  • daemon             是否设置为守护线程

1)创建多线程的方式

创建多线程主要有两种方式,①通过Thread类直接创建子线程,并指定子线程需要执行的函数逻辑。②通过继承Thread类创建实例对象,重写run方法,开启子线程

技术分享图片技术分享图片
 1 from threading import Thread
 2 
 3 
 4 def fn(i):
 5     # 子线程逻辑
 6     print("--子线程--", i)
 7 
 8 
 9 if __name__ == "__main__":
10     t = Thread(target=fn, args=(100,))
11     t.start()
12     print("--主线程--")
13 
14 
15 # 输出结果
16 --子线程-- 100
17 --主线程--
Thread创建多线程
技术分享图片技术分享图片
 1 from threading import Thread
 2 
 3 
 4 class MyThread(Thread):
 5     def __init__(self, name):
 6         super().__init__()
 7         self.name = name
 8 
 9     def run(self):
10         # 子线程逻辑
11         print("--子线程--", self.name)
12 
13 
14 if __name__ == "__main__":
15     t = MyThread("my_thread_01")
16     t.start()
17     print("--主线程--")
18 
19 
20 # 输出结果
21 --子线程-- my_thread_01
22 --主线程--
继承Thread创建多线程

2)下面来看一下使用多线程与不使用多线程的区别

技术分享图片技术分享图片
 1 from threading import Thread
 2 import time
 3 
 4 
 5 def fn(i):
 6     # 子线程逻辑
 7     print("+++", i)
 8     time.sleep(1)
 9 
10 
11 if __name__ == "__main__":
12     start_time = time.time()
13     t_l = []
14     for i in range(5):
15         t = Thread(target=fn, args=(i,))
16         t.start()
17         t_l.append(t)
18     for i in range(5):
19         t.join()
20     print("运行所需时间:", time.time() - start_time)
21 
22 
23 # 输出结果
24 +++ 0
25 +++ 1
26 +++ 2
27 +++ 3
28 +++ 4
29 运行所需时间: 1.0024588108062744
多线程简单例子
技术分享图片技术分享图片
 1 import time
 2 
 3 
 4 def fn(i):
 5     # 子线程逻辑
 6     print("+++", i)
 7     time.sleep(1)
 8 
 9 
10 if __name__ == "__main__":
11     start_time = time.time()
12     for i in range(5):
13         fn(i)
14     print("运行所需时间:", time.time() - start_time)
15 
16 
17 # 输出结果
18 +++ 0
19 +++ 1
20 +++ 2
21 +++ 3
22 +++ 4
23 运行所需时间: 5.0021538734436035
未使用多线程

可以看到同样的需要执行5次fn函数的逻辑,使用多线程的效率却比单线程的要高

3)daemon守护线程的设置

设置守护线程有有两种方法 ① t.daemon = True  直接设置属性  ②  t.setDaemon(True)  通过方法设置。

当子线程设置为守护线程是,主线程结束时程序将不再等待子线程是否执行完毕直接退出程序。如下边子线程设置为守护线程后,主线程结束了,子线程也将跟着结束,并未输出子线程的语句

技术分享图片技术分享图片
 1 from threading import Thread
 2 import time
 3 
 4 
 5 def fn(i):
 6     # 子线程逻辑
 7     time.sleep(1)
 8     print("+++", i)
 9 
10 
11 if __name__ == "__main__":
12     for i in range(5):
13         t = Thread(target=fn, args=(i,))
14         t.setDaemon(True)
15         t.start()
16     print("---end---")
17 
18 
19 # 输出结果
20 ---end---
daemon设置守护线程

4)join(timeout=None)

join方法堵塞当前上下文环境的线程,直到调用join方法的子线程执行结束后当前线程才继续执行。在哪里调用就在哪堵塞,最多堵塞timeout秒

技术分享图片技术分享图片
 1 from threading import Thread
 2 import time
 3 
 4 
 5 def fn():
 6     # 子线程逻辑
 7     time.sleep(5)
 8 
 9 
10 if __name__ == "__main__":
11     start_time = time.time()
12     t = Thread(target=fn)
13     t.start()
14     t.join()
15     print("所需时间-->", time.time() - start_time)
16 
17 
18 # 输出结果
19 所需时间--> 5.001126527786255
join方法

5)is_alive(),返回子线程存活状态

is_alive方法判断指定对象线程的存活状态。线程 start 后一直到该线程结束都返回True。线程 start 前 或线程已经结束返回False

 

2、线程同步

当多个线程的同时对一个变量进行操作时便会存在着安全的隐患问题,

如下:有2个线程同时对全局变量进行 + 1,000,000 为什么结果却不是 2,000,000。这是因为当线程一执行完11行时还未开始执行12行,全局变量a没有进行 +1,就已经切换了线程二执行;线程二从10行开始执行,取到的值依旧是还没有 +1 的值,执行完12行,切换回线程一;线程一继续上次的断点执行12行,这时就相当于两次线程总共才完成了一次 +1 操作。多次循环如此,那么全局变量最后的结果就不可能是2,000,000。这里为了看到明显的效果,我将 a += 1 变成了三条语句来实现

技术分享图片技术分享图片
 1 from threading import Thread
 2 
 3 
 4 a = 0
 5 
 6 
 7 def fn():
 8     global a
 9     for i in range(1000000):
10         t = a
11         t += 1
12         a = t
13 
14 
15 if __name__ == "__main__":
16     t_l = []
17     for i in range(2):
18         t = Thread(target=fn)
19         t_l.append(t)
20         t.start()
21     for t in t_l:
22         t.join()
23     print("全局变量 a-->", a)
24 
25 
26 # 输出结果
27 全局变量 a--> 1332605
多线程对全局变量的修改

 

技术分享图片技术分享图片
 1 from threading import Thread
 2 
 3 def fn():
 4     for i in range(5):
 5         print("1234")
 6 
 7 
 8 for i in range(2):
 9     t = Thread(target=fn)
10     t.start()
11 
12 
13 # 输出结果1234
14 1234
15 12341234
16 
17 1234
18 1234
19 1234
20 12341234
21 
22 1234
多线程同时对控制台打印

 

经过这个例子,就可以看出当多线程对共享的资源进行修改是就涉及到同步问题了,我们希望同一时刻只有一个线程t执行某段逻辑,其它线程必须等待这个线程t执行完才能执行。

1)threading.Lock()

Lock线程锁,在需要进行同步的语句块加上可以确保线程间的同步问题

技术分享图片技术分享图片
 1 from threading import Thread, Lock
 2 
 3 
 4 def fn():
 5     global a
 6     for i in range(1000000):
 7         lock.acquire()
 8         t = a
 9         t += 1
10         a = t
11         lock.release()
12 
13 
14 a = 0
15 t_l = []
16 lock = Lock()
17 for i in range(2):
18     t = Thread(target=fn)
19     t_l.append(t)
20     t.start()
21 for t in t_l:
22     t.join()
23 print("全局变量 a-->", a)
24 
25 
26 # 全局变量
27 全局变量 a--> 2000000
Lock后

2)死锁

关于死锁的概念我就不再叙述,就简单的说说下边的代码,同时开启两个线程,线程t1中执行到8行,线程t2执行到20行的时候,t1发现锁lock2没有被释放便会在这一直等待,t2发现锁lock1没有被释放也在这一直等待,这样谁也不会先释放锁,就会造成死锁的现象,程序一直卡着。

技术分享图片技术分享图片
 1 from threading import Thread, Lock
 2 import time
 3 
 4 def fn1():
 5     lock1.acquire()
 6     print("fn1-->lock1.acquire")
 7     time.sleep(1)  # 确保另一个线程已经执行fn2函数且lock2已经锁上
 8     lock2.acquire()
 9     print("fn1-->lock2.acquire")
10     lock2.release()
11     print("fn1-->lock2.release")
12     lock1.release()
13     print("fn1-->lock1.release")
14 
15 
16 def fn2():
17     lock2.acquire()
18     print("fn1-->lock2.acquire")
19     time.sleep(1)  # 确保另一个线程已经执行fn1函数且lock1已经锁上
20     lock1.acquire()
21     print("fn1-->lock1.acquire")
22     lock1.release()
23     print("fn1-->lock1.release")
24     lock2.release()
25     print("fn1-->lock2.release")
26 
27 
28 lock1 = Lock()
29 lock2 = Lock()
30 t1 = Thread(target=fn1)
31 t2 = Thread(target=fn2)
32 t1.start()
33 t2.start()
34 t1.join()
35 t2.join()
36 print("----end----")
37 
38 
39 # 输出结果
40 fn1-->lock1.acquire
41 fn1-->lock2.acquire
死锁

 

3、threading.Condition

  • acquire()                获取锁
  • release()                释放锁
  • wait(timeout=None)            等待notify的通知,没有等到则在此堵塞,并释放锁cdt.release();有notify则继续往下执行,并释放锁。timeout:堵塞最长时间秒,默认一直堵塞。使用这个方法之前要已经acquire(),因为这个方法会释放锁 
  • notify(n=1)              通知一个正在 wait 堵塞 的线程,n:通知线程数,默认为1
  • notify_all()               通知所有正在 wait 堵塞 的线程
技术分享图片技术分享图片
 1 from threading import Thread, Condition
 2 import time
 3 
 4 
 5 def producer():
 6     global cdt
 7     while True:
 8         time.sleep(1)
 9         cdt.acquire()
10         print("+++++ 1个包子")
11         cdt.notify()
12         cdt.release()
13 
14 
15 def consumer():
16     global cdt
17     while True:
18         cdt.acquire()
19         cdt.wait()
20         print("----- 1个包子, 时间:", time.strftime("%X"))
21         # cdt.release()
22 
23 
24 cdt = Condition()
25 prd = Thread(target=producer)
26 csm = Thread(target=consumer)
27 prd.start()
28 csm.start()
29 prd.join()
30 csm.join()
31 print("----end----")
32 
33 
34 # 输出结果
35 +++++ 1个包子
36 ----- 1个包子, 时间: 10:05:30
37 +++++ 1个包子
38 ----- 1个包子, 时间: 10:05:31
39 +++++ 1个包子
40 ----- 1个包子, 时间: 10:05:32
41 +++++ 1个包子
42 ----- 1个包子, 时间: 10:05:33
43 +++++ 1个包子
44 ----- 1个包子, 时间: 10:05:34
Condition,1个生产者与1个消费者
技术分享图片技术分享图片
 1 from threading import Thread, Condition
 2 import time
 3 
 4 
 5 def producer():
 6     global cdt
 7     while True:
 8         time.sleep(1)
 9         cdt.acquire()
10         print("+++++ 1个包子")
11         cdt.notify()
12         cdt.release()
13 
14 
15 def consumer(name):
16     global cdt
17     while True:
18         cdt.acquire()
19         cdt.wait()
20         print("消费者:%s----- 1个包子, 时间:%s" % (name, time.strftime("%X")))
21         # cdt.release()
22 
23 
24 cdt = Condition()
25 prd = Thread(target=producer)
26 prd.start()
27 for i in range(5):
28     csm = Thread(target=consumer, args=(i,))
29     csm.start()
30 
31 
32 # 输出结果
33 +++++ 1个包子
34 消费者:0----- 1个包子, 时间:10:7:07
35 +++++ 1个包子
36 消费者:1----- 1个包子, 时间:10:7:08
37 +++++ 1个包子
38 消费者:2----- 1个包子, 时间:10:7:09
39 +++++ 1个包子
40 消费者:3----- 1个包子, 时间:10:7:10
41 +++++ 1个包子
42 消费者:4----- 1个包子, 时间:10:7:11
43 +++++ 1个包子
44 消费者:0----- 1个包子, 时间:10:7:12
45 +++++ 1个包子
46 消费者:1----- 1个包子, 时间:10:7:13
47 +++++ 1个包子
48 消费者:2----- 1个包子, 时间:10:7:14
49 +++++ 1个包子
50 消费者:3----- 1个包子, 时间:10:7:15
51 +++++ 1个包子
52 消费者:4----- 1个包子, 时间:10:7:16
Condition,1个生产者与多个消费者

 

4、threading.Event

  • set()                               设置flag=True
  • clear()                            清除flag,即flag=False
  • wait(timeout=None)       堵塞线程等待set()后继续执行,timeout:堵塞最长时间秒,默认一直堵塞。

Event内部定义了一个flag,当flag=Flase时,wait()将会一直堵塞线程;flag=True时,wait()不会堵塞线程,继续向下执行,这时的wait()相当与pass语句

技术分享图片技术分享图片
 1 from threading import Thread, Event
 2 import time
 3 
 4 
 5 def producer():
 6     global eve
 7     while True:
 8         time.sleep(1)
 9         print("+++++ 1个包子")
10         eve.set()
11 
12 
13 def consumer():
14     global eve
15     while True:
16         eve.wait()
17         print("----- 1个包子, 时间:%s" % (time.strftime("%X")))
18         eve.clear()
19 
20 
21 eve = Event()
22 prd = Thread(target=producer)
23 csm = Thread(target=consumer)
24 prd.start()
25 csm.start()
26     
27     
28 # 输出结果
29 +++++ 1个包子
30 ----- 1个包子, 时间:10:12:00
31 +++++ 1个包子
32 ----- 1个包子, 时间:10:12:01
33 +++++ 1个包子
34 ----- 1个包子, 时间:10:12:02
35 +++++ 1个包子
36 ----- 1个包子, 时间:10:12:03
37 +++++ 1个包子
38 ----- 1个包子, 时间:10:12:04
Event,依旧是生产者消费者
技术分享图片技术分享图片
 1 from threading import Thread, Event
 2 import time
 3 
 4 
 5 def producer():
 6     global eve
 7     while True:
 8         time.sleep(1)
 9         print("+++++ 3个包子")
10         eve.set()
11 
12 
13 def consumer(name):
14     global eve
15     while True:
16         eve.wait()  # 在此堵塞,直到eve.set()
17         print("消费者:%s----- 1个包子, 时间:%s" % (name, time.strftime("%X")))
18         eve.clear()  # 将标识清除,需重新eve.set()
19 
20 
21 eve = Event()
22 prd = Thread(target=producer)
23 prd.start()
24 for i in range(5):
25     csm = Thread(target=consumer, args=(i,))
26     csm.start()
27     
28     
29 # 输出结果
30 +++++ 3个包子
31 消费者:2----- 1个包子, 时间:10:10:25
32 消费者:1----- 1个包子, 时间:10:10:25
33 消费者:0----- 1个包子, 时间:10:10:25
34 +++++ 3个包子
35 消费者:0----- 1个包子, 时间:10:10:26
36 消费者:2----- 1个包子, 时间:10:10:26
37 消费者:1----- 1个包子, 时间:10:10:26
38 +++++ 3个包子
39 消费者:0----- 1个包子, 时间:10:10:27
40 消费者:2----- 1个包子, 时间:10:10:27
41 消费者:1----- 1个包子, 时间:10:10:27
Event,1个生产者与多个消费者

 

5、threading.Semaphore  信号量

  • Semaphore(value=1)                         默认最大并发数为1
  • acquire(blocking=True, timeout=None)     获取一个信号,内部计数器  -= 1,blocking当计数器为零时堵塞线程,timeout线程堵塞时间秒
  • release()                                                    释放一个信号,内部计数器 += 1

Semaphore内部管理着一个计数器,acquire时计数器减1,release时计数器加1,当计数器 =0 时,将在acquire语句处堵塞线程。

信号量简单来说,就是控制线程最大并发数的。将信号量比作一定数量的停车位,线程比作车,当有停车位空出来时车才能上去停泊,等停车位满了时,剩下的车只能等停车位上的车走了才能继续上去停泊。

技术分享图片技术分享图片
 1 from threading import Thread, Lock, Semaphore
 2 import time
 3 
 4 def fn(num):
 5     global sp
 6     sp.acquire()
 7     time.sleep(1)
 8     print("--->", num, time.strftime("%X"))
 9     sp.release()
10 
11 
12 sp = Semaphore(3)
13 for i in range(15):
14     prd = Thread(target=fn, args=(i,))
15     prd.start()
16 
17 
18 # 输出结果
19 ---> 0 14:03:29
20 ---> 2 14:03:29
21 ---> 1 14:03:29
22 ---> 3 14:03:30
23 ---> 4 14:03:30 
24 ---> 5 14:03:30
25 ---> 6 14:03:31
26 ---> 7 14:03:31
27 ---> 8 14:03:31
28 ---> 9 14:03:32
29 ---> 10 14:03:32
30 ---> 11 14:03:32
31 ---> 12 14:03:33
32 ---> 13 14:03:33
33 ---> 14 14:03:33
Semaphore

这个例子可以看到信号量为3,每一秒最多执行三个线程

 

6、queue模块,线程队列

我们知道多线程操作全局变量的时候是不安全的,那么线程队列则没有这个问题。和进程队列一样的方法

在线程中队列有三种方式,① 先进先出 queue.Queue  ② 后进先出  queue.LifoQueue  (其实这个应该叫"栈")  ③  优先级队列  queue.PriorityQueue  这个队列 put一个元组,元组第一个元素为优先级,第二个元素为值,优先级数值越小优先级越高,越先出来

技术分享图片技术分享图片
 1 >>> import queue
 2 >>> q = queue.Queue()
 3 >>> q.put(1)
 4 >>> q.put("a")
 5 >>> q.put("{}")
 6 >>> q.get()
 7 1
 8 >>> q.get()
 9 a
10 >>> q.get()
11 {}
queue.Queue
技术分享图片技术分享图片
 1 >>> import queue
 2 >>> q = queue.LifoQueue()
 3 >>> q.put(1)
 4 >>> q.put("a")
 5 >>> q.put("[]")
 6 >>> q.get()
 7 []
 8 >>> q.get()
 9 a
10 >>> q.get()
11 1
queue.LifoQueue
技术分享图片技术分享图片
 1 >>> import queue
 2 >>> q = queue.PriorityQueue()
 3 >>> q.put((1, "a"))
 4 >>> q.put((2, "{}"))
 5 >>> q.put((3, "[]"))
 6 >>> q.put((3, "[]"))
 7 >>> import queue
 8 >>> q = queue.PriorityQueue()
 9 >>> q.put((1, "a"))
10 >>> q.put((3, "{}"))
11 >>> q.put((2, "[]"))
12 >>> q.get()
13 (1, a)
14 >>> q.get()
15 (2, [])
16 >>> q.get()
17 (3, {})
queue.PriorityQueue
技术分享图片技术分享图片
 1 from threading import Thread
 2 import queue
 3 import time
 4 
 5 
 6 def producer():
 7     global q
 8     for i in range(100):
 9         time.sleep(1)
10         q.put(i)
11         print("+++++ 1个包子")
12 
13 
14 def consumer(name):
15     global q
16     while True:
17         bz = q.get()
18         print("消费者:%s---得到1个包子%s, 时间:%s" % (name, bz, time.strftime("%X")))
19 
20 
21 q = queue.Queue(10)  # 最多能添加10包子
22 prd = Thread(target=producer)
23 prd.start()
24 for i in range(3):
25     csm = Thread(target=consumer, args=(i,))
26     csm.start()
27 
28 
29 # 输出结果
30 +++++ 1个包子
31 消费者:0---得到1个包子0, 时间:15:11:30
32 +++++ 1个包子
33 消费者:1---得到1个包子1, 时间:15:11:31
34 +++++ 1个包子
35 消费者:2---得到1个包子2, 时间:15:11:32
36 +++++ 1个包子
37 消费者:0---得到1个包子3, 时间:15:11:33
38 +++++ 1个包子
39 消费者:1---得到1个包子4, 时间:15:11:34
生产者与消费者

 

python 线程

标签:开始   div   单位   信号量   不可   while   线程同步   strftime   sum   

原文地址:https://www.cnblogs.com/yhongji/p/9735925.html


评论


亲,登录后才可以留言!