How do threads communicate with each other?

2021-03-21 15:25

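Requirement:
http://table.finance.yahoo.com/table.csv?s=000001.sz
We have already fetched the CSV data file for one Chinese stock from Yahoo's website. Now we want to download CSV data for several stocks and convert each file to XML.
Because of the Global Interpreter Lock (GIL) in CPython, running CPU-bound work in multiple threads does not make it faster, so we restructure the program:
1. Use several DownloadThread threads to download (I/O-bound work).
2. Use a single ConvertThread thread to convert (CPU-bound work).
3. The download threads pass the downloaded data safely to the convert thread.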
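The three-part design above is the classic producer-consumer pattern. The sketch below runs without network access: `fake_download` is a hypothetical stand-in that sleeps to imitate I/O latency, and the converter only upper-cases the text in place of a real conversion. The `(-1, None)` sentinel plays the same role as in the program's own code.

```python
import queue
import threading
import time

def fake_download(sid):
    """Hypothetical stand-in for an HTTP download: sleep to mimic I/O latency."""
    time.sleep(0.1)
    return "csv-data-for-%06d" % sid

def downloader(sid, q):
    # Producer: do the slow I/O, then hand the result to the queue.
    q.put((sid, fake_download(sid)))

def converter(q, results):
    # Single consumer: handle items one at a time until the sentinel arrives.
    while True:
        sid, data = q.get()
        if sid == -1:  # sentinel: all producers have finished
            break
        results[sid] = data.upper()  # placeholder for the CPU-bound conversion

q = queue.Queue()
results = {}
producers = [threading.Thread(target=downloader, args=(i, q)) for i in range(1, 11)]
consumer = threading.Thread(target=converter, args=(q, results))

start = time.perf_counter()
for t in producers:
    t.start()
consumer.start()
for t in producers:
    t.join()
q.put((-1, None))  # only after every producer is done
consumer.join()
elapsed = time.perf_counter() - start

print(len(results), "items in %.2fs" % elapsed)
```

Because a thread releases the GIL while it waits (here in `time.sleep`), the ten simulated downloads overlap, so the total time stays close to 0.1 s instead of the roughly 1 s a sequential loop would need.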

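Approach:
Use Queue.Queue from the standard library (renamed queue.Queue in Python 3). It is a thread-safe queue: the download threads put the downloaded data into the queue and the convert thread takes it out. The queue implements its locking internally, so it handles the synchronization for us.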
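To make "it implements the lock internally" concrete, here is a simplified sketch of a blocking FIFO built from a deque, a lock and a condition variable. It only illustrates the idea and is not the standard library's actual source; the class name `SimpleQueue` and the producer counts are hypothetical.

```python
import collections
import threading

class SimpleQueue:
    """Simplified thread-safe FIFO: one lock guards a deque, and a
    Condition lets get() sleep until put() adds an item."""

    def __init__(self):
        self._items = collections.deque()
        self._not_empty = threading.Condition(threading.Lock())

    def put(self, item):
        with self._not_empty:           # acquire the internal lock
            self._items.append(item)
            self._not_empty.notify()    # wake one waiting consumer

    def get(self):
        with self._not_empty:
            while not self._items:      # re-check after every wakeup
                self._not_empty.wait()  # releases the lock while sleeping
            return self._items.popleft()

# Usage: several producers, one consumer, no explicit locking by the caller.
q = SimpleQueue()
N_PRODUCERS, PER_PRODUCER = 4, 1000

def produce(pid):
    for n in range(PER_PRODUCER):
        q.put((pid, n))

threads = [threading.Thread(target=produce, args=(p,)) for p in range(N_PRODUCERS)]
for t in threads:
    t.start()
# The main thread is the consumer; get() blocks whenever the queue is empty.
received = [q.get() for _ in range(N_PRODUCERS * PER_PRODUCER)]
for t in threads:
    t.join()
print(len(received), len(set(received)))
```

No item is lost or duplicated, and each producer's items come out in the order it put them, because every append and popleft happens while holding the same lock.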

Code:

import csv
import io
from queue import Queue
from threading import Thread
from xml.etree.ElementTree import Element, ElementTree, indent  # indent needs Python 3.9+

import requests

# Yahoo has since retired this endpoint; any URL that returns CSV works the same way.
URL_TEMPLATE = 'http://table.finance.yahoo.com/table.csv?s=%s.sz'


class DownloadThread(Thread):
    """Downloads one stock's CSV (I/O-bound) and puts (sid, data) on the queue."""

    def __init__(self, sid, queue):
        Thread.__init__(self)
        self.sid = sid
        self.url = URL_TEMPLATE % str(sid).rjust(6, '0')
        self.queue = queue

    def download(self, url):
        try:
            response = requests.get(url, timeout=3)
        except requests.RequestException:
            return None
        if response.ok:
            # csv.reader in Python 3 needs text, so wrap response.text, not .content
            return io.StringIO(response.text)
        return None

    def run(self):
        print('download', self.sid)
        data = self.download(self.url)
        # Queue.put is thread-safe, so no explicit lock is needed here
        self.queue.put((self.sid, data))


class ConvertThread(Thread):
    """Takes (sid, data) items off the queue and converts each CSV to XML (CPU-bound)."""

    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue

    def csvToxml(self, scsv, fxml):
        reader = csv.reader(scsv)
        header = next(reader)
        # XML tag names cannot contain spaces, e.g. 'Adj Close' -> 'AdjClose'
        headers = [h.replace(' ', '') for h in header]

        root = Element('Data')
        for row in reader:
            eRow = Element('Row')
            root.append(eRow)
            for tag, text in zip(headers, row):
                e = Element(tag)
                e.text = text
                eRow.append(e)

        indent(root)  # stands in for the author's own xml_pretty.pretty helper
        et = ElementTree(root)
        et.write(fxml)

    def run(self):
        while True:
            sid, data = self.queue.get()  # blocks until an item is available
            if sid == -1:  # sentinel: all downloads are finished
                break
            print('Convert', sid)
            if data is not None:  # skip failed downloads
                fname = str(sid).rjust(6, '0') + '.xml'
                with open(fname, 'wb') as wf:
                    self.csvToxml(data, wf)


q = Queue()
dThreads = [DownloadThread(i, q) for i in range(1, 11)]
cThread = ConvertThread(q)
for t in dThreads:
    t.start()
cThread.start()

for t in dThreads:
    t.join()

# Every download thread has finished, so tell the converter to stop
q.put((-1, None))
cThread.join()
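Since the Yahoo endpoint may no longer respond, the conversion step can be checked offline. This sketch copies the csvToxml logic into a standalone function and feeds it an in-memory CSV; the name `csv_to_xml` and the sample rows are hypothetical (not real market data), and `io.BytesIO` stands in for a file opened with `'wb'`.

```python
import csv
import io
from xml.etree.ElementTree import Element, ElementTree, fromstring

def csv_to_xml(scsv, fxml):
    # Same idea as ConvertThread.csvToxml: one <Row> per CSV line and one
    # child element per column; spaces are removed from the header because
    # XML tag names cannot contain them ("Adj Close" -> "AdjClose").
    reader = csv.reader(scsv)
    headers = [h.replace(" ", "") for h in next(reader)]
    root = Element("Data")
    for row in reader:
        e_row = Element("Row")
        root.append(e_row)
        for tag, text in zip(headers, row):
            e = Element(tag)
            e.text = text
            e_row.append(e)
    ElementTree(root).write(fxml)  # writes bytes (default us-ascii encoding)

# Hypothetical sample with a few columns in the style of the Yahoo CSV.
sample = io.StringIO(
    "Date,Open,Close,Adj Close\n"
    "2016-06-01,10.0,10.5,10.5\n"
    "2016-06-02,10.5,10.2,10.2\n"
)
out = io.BytesIO()
csv_to_xml(sample, out)

# Parse the result back to confirm the structure.
parsed = fromstring(out.getvalue())
print(parsed.tag, len(parsed))
```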

Original article: https://www.cnblogs.com/Richardo-M-Q/p/13885101.html