1. Python General-Purpose Spider
2021-01-13 21:15
Original article: https://www.cnblogs.com/L-dongf/p/12945141.html

I. The requests module
1. GET requests
import requests

def main():
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36"
    }
    params = {"wd": "python"}
    url = 'https://www.baidu.com/s'
    response = requests.get(url, headers=headers, params=params)
    print(response.status_code)
    print(response.request.url)
    print(response.content.decode())

if __name__ == '__main__':
    main()
In [9]: assert response.status_code == 200
In [10]: assert response.status_code == 300
---------------------------------------------------------------------------
AssertionError Traceback (most recent call last)
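Besides asserting on response.status_code, requests also offers response.raise_for_status(), which raises requests.exceptions.HTTPError for 4xx/5xx responses; a minimal sketch:

import requests

response = requests.get('https://www.baidu.com')
try:
    response.raise_for_status()  # raises requests.exceptions.HTTPError on 4xx/5xx
except requests.exceptions.HTTPError as e:
    print('bad status:', e)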
2. POST requests
import requests

headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36"}
data = {
    "from": "en",
    "to": "zh",
    "query": "hello",
    "transtype": "realtime",
    "simple_means_flag": "3",
    "sign": "54706.276099",
    "token": "62eea0d706973a10baa955cb794bec03",
    "domain": "common"
}
post_url = 'https://fanyi.baidu.com/v2transapi'

if __name__ == '__main__':
    response = requests.post(post_url, data=data, headers=headers)
    print(response)
    print(response.content.decode())
3. Requests through a proxy
import requests

proxies = {"http": "http://58.253.156.161:9999"}
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36"}

def main():
    r = requests.get("https://www.baidu.com", proxies=proxies, headers=headers)
    print(r.status_code)
    print(r.content.decode())

if __name__ == '__main__':
    main()
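The proxies dict can also carry an "https" entry and, if the proxy requires it, credentials embedded in the URL; a minimal sketch (the hosts, ports, and credentials below are placeholders, not working proxies):

import requests

# placeholder proxies for illustration only; replace with real proxy addresses
proxies = {
    "http": "http://user:password@10.10.1.10:3128",  # proxy with basic auth in the URL
    "https": "http://10.10.1.11:1080",               # proxy used for https:// URLs
}
# requests.get("https://www.baidu.com", proxies=proxies, timeout=5)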
4. Simulating a login
import requests

def main():
    session = requests.session()
    post_url = "http://www.renren.com/PLogin.do"
    post_data = {"email": "xxx", "password": "xxxx"}
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36"}
    # send the login POST through the session so the cookies are stored in it
    session.post(post_url, data=post_data, headers=headers)
    r = session.get("http://www.renren.com/974467258/newsfeed/photo", headers=headers)
    with open('html/renren.html', 'w', encoding='utf-8') as f:
        f.write(r.content.decode())

if __name__ == '__main__':
    main()
import requests

def main():
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36",
        "Cookie": "anonymid=kaduaq5h-dmoxs5; depovince=BJ; _r01_=1; ick_login=4f646e57-a960-40f9-a753-a09c226cd84d; taihe_bi_sdk_uid=4eeb444d69cb4214801515232a43e736; taihe_bi_sdk_session=2f36595c5512561fe1a0148c2eac89db; ick=0821386d-deab-4f18-bf1b-82d5f9620b52; JSESSIONID=abczCFIwWH3dd9DseUSix; first_login_flag=1; ln_uact=13131321334; ln_hurl=http://head.xiaonei.com/photos/0/0/men_main.gif; jebe_key=92f610c1-e888-41bd-90b8-b2599087b5ac%7Cd6c37d61861087de490c40113f673a10%7C1589888308248%7C1%7C1589888307470; jebe_key=92f610c1-e888-41bd-90b8-b2599087b5ac%7Cd6c37d61861087de490c40113f673a10%7C1589888308248%7C1%7C1589888307472; wp_fold=0; wp=0; vip=1; jebecookies=1aafcf53-ea20-4ee1-bf64-9e6c5c43f670|||||; _de=E082FAD4161B8D2D027123AF20EEBA9A; p=9b9dc071200a1ed265993332d9e447028; t=665f5d65a4726516af87f0fdb9865cf58; societyguester=665f5d65a4726516af87f0fdb9865cf58; id=974467258; xnsid=d2c1a658; ver=7.0; loginfrom=null"
    }
    r = requests.get("http://www.renren.com/974467258/newsfeed/photo", headers=headers)
    with open('html/renren2.html', 'w', encoding='utf-8') as f:
        f.write(r.content.decode())

if __name__ == '__main__':
    main()
import requests

def main():
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36"}
    cookies = "anonymid=kaduaq5h-dmoxs5; depovince=BJ; _r01_=1; ick_login=4f646e57-a960-40f9-a753-a09c226cd84d; taihe_bi_sdk_uid=4eeb444d69cb4214801515232a43e736; taihe_bi_sdk_session=2f36595c5512561fe1a0148c2eac89db; ick=0821386d-deab-4f18-bf1b-82d5f9620b52; JSESSIONID=abczCFIwWH3dd9DseUSix; first_login_flag=1; ln_uact=13131321334; ln_hurl=http://head.xiaonei.com/photos/0/0/men_main.gif; jebe_key=92f610c1-e888-41bd-90b8-b2599087b5ac%7Cd6c37d61861087de490c40113f673a10%7C1589888308248%7C1%7C1589888307470; jebe_key=92f610c1-e888-41bd-90b8-b2599087b5ac%7Cd6c37d61861087de490c40113f673a10%7C1589888308248%7C1%7C1589888307472; wp_fold=0; wp=0; vip=1; jebecookies=1aafcf53-ea20-4ee1-bf64-9e6c5c43f670|||||; _de=E082FAD4161B8D2D027123AF20EEBA9A; p=9b9dc071200a1ed265993332d9e447028; t=665f5d65a4726516af87f0fdb9865cf58; societyguester=665f5d65a4726516af87f0fdb9865cf58; id=974467258; xnsid=d2c1a658; ver=7.0; loginfrom=null"
    cookies_dict = {i.split('=')[0]: i.split('=')[1] for i in cookies.split('; ')}  # dict comprehension
    print(cookies_dict)
    r = requests.get("http://www.renren.com/974467258/newsfeed/photo", headers=headers, cookies=cookies_dict)
    with open('html/renren3.html', 'w', encoding='utf-8') as f:
        f.write(r.content.decode())

if __name__ == '__main__':
    main()
5. Fetching HTML with requests
import requests

def main():
    response = requests.get('http://www.baidu.com')
    # way 1 to get the HTML string: set the encoding, then read response.text
    response.encoding = 'utf-8'
    print(response.encoding)
    print(response.text)  # str
    # way 2, recommended: decode the raw bytes yourself
    print(response.content.decode('utf-8'))  # response.content is bytes

if __name__ == '__main__':
    main()
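When the page's charset is unknown, response.apparent_encoding (guessed from the response body) can be assigned to response.encoding before reading response.text; a minimal sketch:

import requests

response = requests.get('http://www.baidu.com')
response.encoding = response.apparent_encoding  # guess the charset from the body
print(response.text[:100])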
6. Downloading images with requests
import requests

def main():
    r = requests.get('https://ww2.sinaimg.cn/large/0064sfU0jw1f663hy23ggj30m80xc0v0.jpg')
    with open('html/a.png', 'wb') as f:
        f.write(r.content)

if __name__ == '__main__':
    main()
7. Converting cookies to a dict
In [21]: r = requests.get("http://www.baidu.com")
In [22]: r.cookies
Out[22]:
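r.cookies is a RequestsCookieJar; requests.utils.dict_from_cookiejar() turns it into a plain dict, and cookiejar_from_dict() goes the other way. A minimal sketch:

import requests

r = requests.get("http://www.baidu.com")
cookies_dict = requests.utils.dict_from_cookiejar(r.cookies)   # CookieJar -> dict
print(cookies_dict)
cookie_jar = requests.utils.cookiejar_from_dict(cookies_dict)  # dict -> CookieJar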
8. URL encoding and decoding
In [25]: requests.utils.quote("http://www.baidu.com/s?wd=你好")
Out[25]: 'http%3A//www.baidu.com/s%3Fwd%3D%E4%BD%A0%E5%A5%BD'
In [26]: requests.utils.unquote('http%3A//www.baidu.com/s%3Fwd%3D%E4%BD%A0%E5%A5%BD')
Out[26]: 'http://www.baidu.com/s?wd=你好'
9. Ignoring HTTPS certificate verification and setting a timeout
In [30]: r = requests.get('https://www.12306.cn/', verify=False, timeout=10)
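With verify=False, urllib3 prints an InsecureRequestWarning on every request; the warning can be silenced, and timeouts can be caught explicitly. A minimal sketch:

import requests
import urllib3

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)  # silence the verify=False warning
try:
    r = requests.get('https://www.12306.cn/', verify=False, timeout=10)
except requests.exceptions.Timeout:
    print('request timed out')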
II. The retrying module
import requests
from retrying import retry

headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36"}

@retry(stop_max_attempt_number=3)  # retry up to 3 times if an exception is raised
def _parse_url(url, method, data, proxies):
    if method == 'POST':
        response = requests.post(url, data=data, headers=headers, proxies=proxies, timeout=3)
    else:
        response = requests.get(url, headers=headers, proxies=proxies, timeout=3)
    assert response.status_code == 200
    return response.content.decode()

def parse_url(url, method="GET", data=None, proxies={}):
    try:
        html_str = _parse_url(url, method, data, proxies)
    except Exception:
        html_str = None
    return html_str

if __name__ == '__main__':
    print(parse_url('http://www.baidu.com'))
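retrying can also wait between attempts and retry only on chosen exception types; a minimal sketch (the parameter values and the fetch helper are illustrative, not part of the original example):

import requests
from retrying import retry

def _is_request_error(exception):
    # retry only on network-level errors, not on other exceptions
    return isinstance(exception, requests.exceptions.RequestException)

@retry(stop_max_attempt_number=3, wait_fixed=2000, retry_on_exception=_is_request_error)
def fetch(url):
    return requests.get(url, timeout=3).content.decode()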
III. The json module
import json
from parse_url import parse_url
from pprint import pprint

def main():
    url = 'https://movie.douban.com/j/search_subjects?type=movie&tag=%E7%83%AD%E9%97%A8&page_limit=10&page_start=0'
    html_str = parse_url(url)
    ret = json.loads(html_str)
    # pretty-print the parsed result
    # pprint(ret)
    # convert the Python object back into a JSON string and write it out
    with open('html/douban.json', 'w', encoding='utf-8') as f:
        f.write(json.dumps(ret, ensure_ascii=False, indent=4))  # keep non-ASCII characters; indent pretty-prints
    with open('html/douban.json', 'r', encoding='utf-8') as f:
        ret2 = json.loads(f.read())
        pprint(ret2)

if __name__ == '__main__':
    main()
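json.dump() and json.load() work directly on file objects, so the write/read above can also be done without dumps()/loads(); a minimal sketch (the data dict is illustrative):

import json

data = {"msg": "你好", "count": 1}  # illustrative data
with open('html/douban.json', 'w', encoding='utf-8') as f:
    json.dump(data, f, ensure_ascii=False, indent=4)  # serialize straight into the file
with open('html/douban.json', 'r', encoding='utf-8') as f:
    data2 = json.load(f)                              # parse straight from the file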
IV. Practice examples
import requests
import json

class DoubanSpider(object):
    def __init__(self):
        self.url = 'https://m.douban.com/rexxar/api/v2/subject_collection/book_top250/items?&start={}&count=18&loc_id=0'
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Mobile/15E148 Safari/604.1',
            'Referer': 'https://m.douban.com/book/top250'}

    def parse_url(self, url):
        r = requests.get(url, headers=self.headers)
        return r.content.decode()

    def get_total(self):
        url = self.url.format(0)
        return json.loads(self.parse_url(url))['total']

    def recv_2_list(self, total, start, book_list_temp):
        while True:
            if start >= total:
                break
            url = self.url.format(start)
            ret = json.loads(self.parse_url(url))
            start += 18
            # print([x['title'] for x in ret['subject_collection_items']])
            book_list_temp += [x['title'] for x in ret['subject_collection_items']]
        return book_list_temp

    def run(self):
        # 1. get the total number of items
        total = self.get_total()
        # 2. fetch page by page
        book_list_temp = list()
        book_list = self.recv_2_list(total, 0, book_list_temp)
        # 3. write the resulting list to a file
        with open('html/douban.json', 'w', encoding='utf-8') as f:
            json.dump(book_list, f, ensure_ascii=False, indent=2)

if __name__ == '__main__':
    d = DoubanSpider()
    d.run()
import requests
import re
from retrying import retry

class MeinvSpider(object):
    def __init__(self):
        # self.url = 'https://xxx_{}.htm'
        self.url = 'http://m.m1mm.com/mm/2986/{}'
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Mobile/15E148 Safari/604.1',
        }
        # self.proxies = {"http": "http://60.167.20.240:8888"}

    @retry(stop_max_attempt_number=3)
    def parse_url(self, url):
        r = requests.get(url, headers=self.headers)
        assert r.status_code == 200
        return r.content

    def get_image_url(self, html_str):
        return re.findall(r'src="(http://.*?\.jpg)"', html_str)[0]

    def save_2_file(self, img_url, index):
        with open('img/' + str(index) + '.jpg', 'wb') as f:
            img_data = self.parse_url(img_url)
            f.write(img_data)
        print('saved', index)

    def run(self):
        page_num = 1
        while True:
            url = self.url.format(page_num)
            try:
                html_str = self.parse_url(url).decode('utf-8')
            except Exception as e:
                print(e)
                break
            img_url = self.get_image_url(html_str)
            print(img_url)
            self.save_2_file(img_url, page_num)
            page_num += 1

def main():
    m = MeinvSpider()
    m.run()

if __name__ == '__main__':
    main()
import requests
import re

class DuanziSpider(object):
    def __init__(self):
        self.start_url = 'https://xxx/'
        self.headers = {
            "user-agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Mobile/15E148 Safari/604.1"
        }

    def parse_url(self, url):
        response = requests.get(url, headers=self.headers)
        assert response.status_code == 200
        return response.content.decode()

    def get_total_page_num(self, html_str):
        total_page_num = re.findall(r'第 1 页 / 共 (\d+) 页', html_str)
        return int(total_page_num[0])

    def get_content_list(self, html_str):
        ret = re.findall(r'
V. The XPath module
nodename    selects all child nodes of the named node
/           selects from the root node
//          selects matching nodes anywhere in the document, regardless of their position
.           selects the current node
..          selects the parent node
@           selects attributes
a/text()    the text of an a element
a/@href     the href attribute of an a element
/html//img[@id="bigpicimg"]
//a[text()='下一页>']
//div[@id='page']/a[last()]
//div[@id='page']/a[position()>3]
//div[@id='page']/a[position()<5]
//div[contains(@class, 'i')]
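Before the full example below, a minimal sketch exercising a few of these expressions with lxml (the HTML fragment is made up for illustration):

from lxml import etree

# made-up HTML fragment for illustration only
html = etree.HTML('<div id="page"><a href="/p/1">1</a><a href="/p/2">2</a><a href="/p/3">下一页></a></div>')
print(html.xpath('//div[@id="page"]/a/@href'))            # ['/p/1', '/p/2', '/p/3']
print(html.xpath('//div[@id="page"]/a[last()]/text()'))   # ['下一页>']
print(html.xpath('//a[text()="下一页>"]/@href'))            # ['/p/3']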
from lxml import etree
import requests

def main():
    text = requests.get('http://xxx', headers={'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Mobile/15E148 Safari/604.1'}).content.decode()
    html = etree.HTML(text)
    # html = etree.tostring(html).decode()  # serialize back to see the repaired HTML
    ret = html.xpath('//img/@src')
    print(ret)

if __name__ == '__main__':
    main()
VI. Practice examples 2
import requests
from lxml import etree
import json

class TiebaSpider(object):
    def __init__(self, tieba_name):
        self.tieba_name = tieba_name
        self.start_url = 'https://tieba.baidu.com/mo/q/m?word=%E6%9D%8E%E6%AF%85&tn4=bdKSW&sub4=%E8%BF%9B%E5%90%A7&pn=30&'
        self.part_url = 'https://tieba.baidu.com/'
        self.headers = {
            "User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Mobile/15E148 Safari/604.1"}

    def parse_url(self, url):
        response = requests.get(url, headers=self.headers)
        return response.content.decode()

    def get_content_list(self, html_str):
        html = etree.HTML(html_str)
        li_list = html.xpath('//li[@class="tl_shadow tl_shadow_new"]')
        content_list = list()
        for li in li_list:
            item = {}
            title = li.xpath('.//div[@class="ti_title"]/span/text()')
            item["title"] = title[0] if len(title) > 0 else None
            href = li.xpath('.//a/@href')
            item["href"] = self.part_url + href[0] if len(href) > 0 else None
            item["img_list"] = self.get_img_list(item["href"], list())
            content_list.append(item)
        next_href = html.xpath('//a[text()="下一页"]/@href')
        next_url = self.part_url + next_href[0] if len(next_href) > 0 else None
        return content_list, next_url

    def get_img_list(self, detail_url, total_img_list):
        # 3.1 take the URL and title extracted from the list page
        # 3.2 request that URL to get the first detail page
        detail_html_str = self.parse_url(detail_url)
        detail_html = etree.HTML(detail_html_str)
        # 3.3 extract the images on this detail page and the next-page URL
        img_list = detail_html.xpath('//img[@class="BDF_Image"]/@src')
        total_img_list.extend(img_list)
        # 3.4 request the next detail page and repeat 3.2-3.4
        detail_next_url = detail_html.xpath('//a[text()="下一页"]/@href')
        if len(detail_next_url) > 0:
            detail_next_url = self.part_url + detail_next_url[0]
            return self.get_img_list(detail_next_url, total_img_list)
        return total_img_list

    def save_content_list(self, content_list):
        file_path = 'tieba/' + self.tieba_name + '.txt'
        with open(file_path, 'a') as f:
            for content in content_list:
                f.write(json.dumps(content, ensure_ascii=False, indent=2))
                f.write('\n')

    def run(self):
        next_url = self.start_url
        while next_url is not None:
            # 1. start from start_url
            # 2. send the request and get the response
            html_str = self.parse_url(next_url)
            print(html_str)
            # 3. extract the data and the next page's URL
            content_list, next_url = self.get_content_list(html_str)
            # 4. save the data
            self.save_content_list(content_list)
            # 5. request the next page's URL and repeat steps 2-5

if __name__ == '__main__':
    tieba_spider = TiebaSpider('lol')
    tieba_spider.run()
import requests
from lxml import etree

class QiubaiSpider(object):
    def __init__(self):
        self.url_temp = "https://www.qiushibaike.com/text/page/{}/"
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36"}

    def get_url_list(self):
        return [self.url_temp.format(i) for i in range(1, 14)]

    def parse_url(self, url):
        response = requests.get(url, headers=self.headers)
        return response.content.decode()

    def get_content_list(self, html_str):
        html = etree.HTML(html_str)
        div_list = html.xpath('//div[@class="article block untagged mb15 typs_hot"]')
        content_list = list()
        for div in div_list:
            item = dict()
            item['content'] = div.xpath('./a/div/span/text()')
            item['content'] = [i.replace("\n", "") for i in item['content']]
            # item['author_gender'] = div.xpath('.//div[contains(@class, "articleGender")]/@class')
            # item['author_gender'] = item['author_gender'][0].split()[-1].replace("Icon", "") if len(item['author_gender'][0]) else None
            content_list.append(item)
        return content_list

    def save_content_list(self, content_list):
        for i in content_list:
            print(i)

    def run(self):
        # 1. build the url_list
        url_list = self.get_url_list()
        # 2. loop over it, send requests, get responses
        for url in url_list:
            html_str = self.parse_url(url)
            # 3. extract the data
            content_list = self.get_content_list(html_str)
            # 4. save it
            self.save_content_list(content_list)

if __name__ == '__main__':
    qiubai_spider = QiubaiSpider()
    qiubai_spider.run()
VII. Multi-threaded spider
import requests
from lxml import etree
import threading
from queue import Queue

class QiubaiSpider(object):
    def __init__(self):
        self.url_temp = "https://www.qiushibaike.com/text/page/{}/"
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36"}
        self.url_queue = Queue()
        self.html_queue = Queue()
        self.content_queue = Queue()

    def get_url_list(self):
        # return [self.url_temp.format(i) for i in range(1, 14)]
        for i in range(1, 14):
            self.url_queue.put(self.url_temp.format(i))

    def parse_url(self):
        while True:
            url = self.url_queue.get()
            response = requests.get(url, headers=self.headers)
            # return response.content.decode()
            self.html_queue.put(response.content.decode())
            self.url_queue.task_done()

    def get_content_list(self):
        while True:
            html_str = self.html_queue.get()
            html = etree.HTML(html_str)
            div_list = html.xpath('//div[@class="article block untagged mb15 typs_hot"]')
            content_list = list()
            for div in div_list:
                item = dict()
                item['content'] = div.xpath('./a/div/span/text()')
                item['content'] = [i.replace("\n", "") for i in item['content']]
                # item['author_gender'] = div.xpath('.//div[contains(@class, "articleGender")]/@class')
                # item['author_gender'] = item['author_gender'][0].split()[-1].replace("Icon", "") if len(item['author_gender'][0]) else None
                content_list.append(item)
            # return content_list
            self.content_queue.put(content_list)
            self.html_queue.task_done()

    def save_content_list(self):
        while True:
            content_list = self.content_queue.get()
            for i in content_list:
                print(i)
            self.content_queue.task_done()

    def run(self):
        thread_list = list()
        # 1. build the url_list
        t_url = threading.Thread(target=self.get_url_list)
        thread_list.append(t_url)
        # 2. send requests and collect responses
        for i in range(5):
            t_parse = threading.Thread(target=self.parse_url)
            thread_list.append(t_parse)
        # 3. extract the data
        t_html = threading.Thread(target=self.get_content_list)
        thread_list.append(t_html)
        # 4. save the data
        t_save = threading.Thread(target=self.save_content_list)
        thread_list.append(t_save)
        for t in thread_list:
            t.daemon = True  # make the workers daemon threads so they exit when the main thread exits
            t.start()
        for q in [self.url_queue, self.html_queue, self.content_queue]:
            q.join()  # block the main thread until every queued task is marked done
        print('main thread done')

if __name__ == '__main__':
    qiubai_spider = QiubaiSpider()
    qiubai_spider.run()