如何线程中通信?
2021-03-21 15:25
标签:lock sid 使用 tar XML stringio pen nload element 需求: 思路: 代码: 如何线程中通信? 标签:lock sid 使用 tar XML stringio pen nload element 原文地址:https://www.cnblogs.com/Richardo-M-Q/p/13885101.html
http://table.finance.yahoo.com/table.csv?s=000001.sz
我们通过雅虎网站获取了中国股市某支股票csv数据文件,现在要下载多只股票的csv数据,并将其转换为xml文件
由于全局解释器锁的存在,多线程进行CPU 密集型操作并不能提高执行效率,我们修改程序构架:
1、使用多个DownloadThread线程进行下载(I/O操作)
2、使用一个ConvertThread线程进行转换(CPU密集型操作)
3、下载线程把下载数据安全地传递给转换线程
使用标准库中Queue.Queue,它是一个线程安全的队列,Download线程把下载的数据放入队列,Convert线程从队列中提取数据,它在内部实现了锁,帮助我们完成了同步工作import csv
from xml.etree.ElemenTree import Element, ElementTree
import requests
from StringIO import StringIO
from xml_pretty import pretty
from Queue import Queue
class DownloadThread(Thread):
def __init__(self,sid,queue):
Thread.__init__(self)
self.sid = sid
self.url = ‘http://table.finance.yahoo.com/table.csv?s=%s.sz‘
self.url %= str(sid).rjust(6,‘0‘)
self.queue = queue
def download(self,url):
response = requests.get(url,timeout=3)
if response.ok:
return StringIO(response.content)
def run(self):
print(‘download‘,self.sid)
# 1
data = self.download(self.url)
# 2 (sid,data)
# lock
self.queue.put((self.sid,data))
class ConvertThread(Thread,queue):
def __init__(self):
Thread.__init__(self)
self.queue = queue
def csvToxml(self,scsv,fxml):
reader = csv.reader(scsv)
header = reader.next()
headers = map(lambda h: h.replace( , ),headers)
root = Element("Data")
for row in reader:
eRow = Element("Row")
root.append(eRow)
for tag,text in zip(headers,row):
e = Element(tag)
e.text = text
eRow.append(e)
pretty(root)
et = ElementTree(root)
et.write(fxml)
def run(self):
#1 sid,data
while True:
sid,data = self.queue.get()
print(‘Convert‘,sid)
if sid == -1:
break
if data:
fname = str(sid).rjust(6,‘0‘) +‘.xml‘
with open(fname,‘wb‘) as wf:
self.csvToxml(data,wf)
def handle(sid):
print(‘Download...(%d)‘ % sid)
url = ‘http://table.finance.yahoo.com/table.csv?s=%s.sz‘
url %= str(sid).rjust(6,‘0‘)
rf = download(url)
if rf is None:return
print(‘convert to xml...(%d)‘ % sid)
fname = str(sid).rjust(6,‘0‘) +‘.xml‘
with open(fname,‘wb‘) as wf:
csvToxml(rf,wf)
q = Queue()
dThreads = [DownloadThreadI(i,q) for i in xrange(1,11)]
cThread = ConvertThread(q)
for t in dThreads:
t.start()
cThread.start()
for t in dThreads:
t.join()
q.put((-1,None))