1.Python通用Spider

2021-01-13 21:15

阅读:886

标签:col   dom   提取   with   amp   sig   ast   tostring   多线程爬虫   

一、requests模块

1、get请求

import requests


def main():
    """Send a GET request to the Baidu search endpoint and print the result.

    Prints the HTTP status code, the final request URL (including the encoded
    query string) and the decoded response body.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36"
    }
    # Passed via ``params`` so that requests URL-encodes them and appends
    # them to ``url`` as the query string.
    params = {"wd": "python"}
    url = 'https://www.baidu.com/s'

    response = requests.get(url, headers=headers, params=params)
    print(response.status_code)
    print(response.request.url)
    print(response.content.decode())


if __name__ == '__main__':
    main()

  • 断言
In [9]: assert response.status_code == 200

In [10]: assert response.status_code == 300
---------------------------------------------------------------------------
AssertionError                            Traceback (most recent call last)
 in 
----> 1 assert response.status_code == 300

AssertionError:

2、post请求

import requests

# Browser-like User-Agent sent with the request.
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36"}

# Form fields sent as the POST body (application/x-www-form-urlencoded).
data = {
    "from": "en",
    "to": "zh",
    "query": "hello",
    "transtype": "realtime",
    "simple_means_flag": "3",
    "sign": "54706.276099",
    "token": "62eea0d706973a10baa955cb794bec03",
    "domain": "common"
}

post_url = 'https://fanyi.baidu.com/v2transapi'


if __name__ == '__main__':
    # Submit the form and print both the Response object and its decoded body.
    response = requests.post(post_url, data=data, headers=headers)
    print(response)
    print(response.content.decode())

3、代理请求

import requests

# requests selects the proxy by the scheme of the *target* URL, so an entry
# is needed for "https" as well; with only an "http" key the HTTPS request
# below would bypass the proxy entirely.
proxies = {
    "http": "http://58.253.156.161:9999",
    "https": "http://58.253.156.161:9999",
}

headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36"}


def main():
    """Fetch the Baidu home page through the configured proxy and print it."""
    r = requests.get("https://www.baidu.com", proxies=proxies, headers=headers)
    print(r.status_code)
    print(r.content.decode())


if __name__ == '__main__':
    main()

4、模拟登录请求

  • 使用session登录
import requests


def main():
    """Log in with a ``requests`` session and save a page that requires login.

    The session keeps the cookies set by the login response, so the
    following GET is sent as the logged-in user. The page is written to
    ``html/renren.html`` (the ``html`` directory must already exist).
    """
    post_url = "http://www.renren.com/PLogin.do"
    post_data = {"email": "xxx", "password": "xxxx"}
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36"}

    # The context manager closes the session's pooled connections on exit.
    with requests.session() as session:
        # POST the credentials through the session so the returned cookies
        # are stored on it.
        session.post(post_url, data=post_data, headers=headers)

        r = session.get("http://www.renren.com/974467258/newsfeed/photo", headers=headers)

    with open('html/renren.html', 'w', encoding='utf-8') as f:
        f.write(r.content.decode())


if __name__ == '__main__':
    main()
  • 使用cookie登录
import requests


def main():
    """Fetch a login-only page by sending a captured ``Cookie`` header.

    The cookie string is placed directly in the request headers; the page is
    written to ``html/renren2.html`` (the ``html`` directory must exist).
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36",
        "Cookie": "anonymid=kaduaq5h-dmoxs5; depovince=BJ; _r01_=1; ick_login=4f646e57-a960-40f9-a753-a09c226cd84d; taihe_bi_sdk_uid=4eeb444d69cb4214801515232a43e736; taihe_bi_sdk_session=2f36595c5512561fe1a0148c2eac89db; ick=0821386d-deab-4f18-bf1b-82d5f9620b52; JSESSIONID=abczCFIwWH3dd9DseUSix; first_login_flag=1; ln_uact=13131321334; ln_hurl=http://head.xiaonei.com/photos/0/0/men_main.gif; jebe_key=92f610c1-e888-41bd-90b8-b2599087b5ac%7Cd6c37d61861087de490c40113f673a10%7C1589888308248%7C1%7C1589888307470; jebe_key=92f610c1-e888-41bd-90b8-b2599087b5ac%7Cd6c37d61861087de490c40113f673a10%7C1589888308248%7C1%7C1589888307472; wp_fold=0; wp=0; vip=1; jebecookies=1aafcf53-ea20-4ee1-bf64-9e6c5c43f670|||||; _de=E082FAD4161B8D2D027123AF20EEBA9A; p=9b9dc071200a1ed265993332d9e447028; t=665f5d65a4726516af87f0fdb9865cf58; societyguester=665f5d65a4726516af87f0fdb9865cf58; id=974467258; xnsid=d2c1a658; ver=7.0; loginfrom=null"
    }

    r = requests.get("http://www.renren.com/974467258/newsfeed/photo", headers=headers)

    with open('html/renren2.html', 'w', encoding='utf-8') as f:
        f.write(r.content.decode())


if __name__ == '__main__':
    main()

  • 使用cookie登录2
import requests


def cookie_string_to_dict(cookie_string):
    """Convert a ``Cookie`` header value into a ``{name: value}`` dict.

    Pairs are separated by ``"; "``. Each pair is split on the first ``=``
    only, so values that themselves contain ``=`` are kept whole. Fragments
    without ``=`` are skipped, and a repeated name keeps its last value.
    """
    pairs = (pair.split('=', 1) for pair in cookie_string.split('; ') if '=' in pair)
    return {name: value for name, value in pairs}


def main():
    """Fetch a login-only page by passing captured cookies as a dict.

    The raw cookie string is converted to a dict and handed to requests via
    the ``cookies`` argument; the page is written to ``html/renren3.html``
    (the ``html`` directory must exist).
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36"}
    cookies = "anonymid=kaduaq5h-dmoxs5; depovince=BJ; _r01_=1; ick_login=4f646e57-a960-40f9-a753-a09c226cd84d; taihe_bi_sdk_uid=4eeb444d69cb4214801515232a43e736; taihe_bi_sdk_session=2f36595c5512561fe1a0148c2eac89db; ick=0821386d-deab-4f18-bf1b-82d5f9620b52; JSESSIONID=abczCFIwWH3dd9DseUSix; first_login_flag=1; ln_uact=13131321334; ln_hurl=http://head.xiaonei.com/photos/0/0/men_main.gif; jebe_key=92f610c1-e888-41bd-90b8-b2599087b5ac%7Cd6c37d61861087de490c40113f673a10%7C1589888308248%7C1%7C1589888307470; jebe_key=92f610c1-e888-41bd-90b8-b2599087b5ac%7Cd6c37d61861087de490c40113f673a10%7C1589888308248%7C1%7C1589888307472; wp_fold=0; wp=0; vip=1; jebecookies=1aafcf53-ea20-4ee1-bf64-9e6c5c43f670|||||; _de=E082FAD4161B8D2D027123AF20EEBA9A; p=9b9dc071200a1ed265993332d9e447028; t=665f5d65a4726516af87f0fdb9865cf58; societyguester=665f5d65a4726516af87f0fdb9865cf58; id=974467258; xnsid=d2c1a658; ver=7.0; loginfrom=null"
    cookies_dict = cookie_string_to_dict(cookies)
    print(cookies_dict)

    r = requests.get("http://www.renren.com/974467258/newsfeed/photo", headers=headers, cookies=cookies_dict)

    with open('html/renren3.html', 'w', encoding='utf-8') as f:
        f.write(r.content.decode())


if __name__ == '__main__':
    main()

5、requests获取HTML

import requests


def main():
    """Show two ways of turning a response body into an HTML string."""
    response = requests.get('http://www.baidu.com')

    # Option 1: set the encoding explicitly, then read ``response.text``,
    # which is a ``str`` decoded with ``response.encoding``.
    response.encoding = 'utf-8'
    print(response.encoding)
    print(response.text)

    # Option 2 (recommended by the tutorial): ``response.content`` is the raw
    # ``bytes`` body; decode it yourself to get a ``str``.
    print(response.content.decode('utf-8'))


if __name__ == '__main__':
    main()

6、requests获取图片

import requests


def main():
    """Download an image and write its raw bytes to ``html/a.png``."""
    r = requests.get('https://ww2.sinaimg.cn/large/0064sfU0jw1f663hy23ggj30m80xc0v0.jpg')

    # Binary mode: ``r.content`` is bytes and must be written unmodified.
    # NOTE(review): the source URL ends in .jpg but the file is saved with a
    # .png extension; the bytes are not converted - confirm the intended name.
    with open('html/a.png', 'wb') as f:
        f.write(r.content)


if __name__ == '__main__':
    main()

7、cookie转换字典

In [21]: r = requests.get("http://www.baidu.com")

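In [22]: r.cookies
Out[22]: <RequestsCookieJar[<Cookie BDORZ=27315 for .baidu.com/>]>

In [23]: requests.utils.dict_from_cookiejar(r.cookies)
Out[23]: {'BDORZ': '27315'}

In [24]: requests.utils.cookiejar_from_dict({'BDORZ': '27315'})
Out[24]: <RequestsCookieJar[<Cookie BDORZ=27315 for />]>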

8、URL编解码

In [25]: requests.utils.quote("http://www.baidu.com/s?wd=你好")
Out[25]: 'http%3A//www.baidu.com/s%3Fwd%3D%E4%BD%A0%E5%A5%BD'

In [26]: requests.utils.unquote('http%3A//www.baidu.com/s%3Fwd%3D%E4%BD%A0%E5%A5%BD')
Out[26]: 'http://www.baidu.com/s?wd=你好'

9、忽略HTTPS证书和设置超时时间

In [30]: r = requests.get('https://www.12306.cn/', verify=False, timeout=10)

二、retrying模块

import requests
from retrying import retry

headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36"}

@retry(stop_max_attempt_number=3)  # make at most 3 attempts when the call raises
def _parse_url(url, method, data, proxies):
    """Send one request and return the decoded body.

    Args:
        url: Address to request.
        method: ``'POST'`` sends ``data`` as a form body; anything else is
            sent as a GET.
        data: Form fields for a POST request (ignored for GET).
        proxies: Proxy mapping forwarded to requests (may be None or empty).

    Raises:
        requests.HTTPError: If the status code is not 200. An explicit raise
            is used instead of ``assert`` so the retry still triggers when
            Python runs with ``-O``.
    """
    if method == 'POST':
        response = requests.post(url, data=data, headers=headers, proxies=proxies, timeout=3)
    else:
        response = requests.get(url, headers=headers, proxies=proxies, timeout=3)
    if response.status_code != 200:
        raise requests.HTTPError(
            'unexpected status code {} for {}'.format(response.status_code, url))
    return response.content.decode()


def parse_url(url, method="GET", data=None, proxies=None):
    """Fetch ``url`` and return the decoded body, or None on failure.

    Args:
        url: Address to request.
        method: ``"GET"`` (default) or ``"POST"``.
        data: Form fields for a POST request.
        proxies: Optional proxy mapping for requests.

    Returns:
        The response body as ``str``, or None when every attempt failed.
    """
    try:
        # All four arguments must be forwarded: ``_parse_url`` has no
        # defaults, so omitting any would raise TypeError on every call.
        html_str = _parse_url(url, method, data, proxies)
    except Exception:
        # Best-effort contract: callers receive None instead of an exception.
        html_str = None
    return html_str


if __name__ == '__main__':
    print(parse_url('http://www.baidu.com'))

三、json模块

import json
from parse_url import parse_url
from pprint import pprint

def main():
    """Download a Douban JSON listing, save it pretty-printed, then reload it.

    The parsed response is written to ``html/douban.json`` (the ``html``
    directory must exist) and then read back and pretty-printed.
    """
    url = 'https://movie.douban.com/j/search_subjects?type=movie&tag=%E7%83%AD%E9%97%A8&page_limit=10&page_start=0'
    html_str = parse_url(url)
    if html_str is None:
        # ``parse_url`` returns None when the download failed; json.loads(None)
        # would raise TypeError, so stop here with a readable message.
        print('download failed:', url)
        return
    ret = json.loads(html_str)

    # Serialise the Python object to JSON text. ``ensure_ascii=False`` keeps
    # non-ASCII characters readable; ``indent`` pretty-prints the output.
    with open('html/douban.json', 'w', encoding='utf-8') as f:
        f.write(json.dumps(ret, ensure_ascii=False, indent=4))

    with open('html/douban.json', 'r', encoding='utf-8') as f:
        ret2 = json.loads(f.read())
        pprint(ret2)


if __name__ == '__main__':
    main()

四、练习示例

  • 爬取豆瓣书单
import requests
import json


class DoubanSpider(object):
    """Collect the titles of a Douban book collection through its JSON API."""

    # Number of items requested per page. Must stay equal to the ``count=``
    # value embedded in ``self.url``; it is the step between page offsets.
    PAGE_SIZE = 18

    def __init__(self):
        # ``{}`` is filled with the zero-based offset of the first item.
        self.url = 'https://m.douban.com/rexxar/api/v2/subject_collection/book_top250/items?&start={}&count=18&loc_id=0'
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Mobile/15E148 Safari/604.1',
            'Referer': 'https://m.douban.com/book/top250'}

    def parse_url(self, url):
        """Request ``url`` with the spider's headers and return the decoded body."""
        r = requests.get(url, headers=self.headers)
        return r.content.decode()

    def get_total(self):
        """Return the ``total`` field reported by the first page of the API."""
        url = self.url.format(0)
        return json.loads(self.parse_url(url))['total']

    def recv_2_list(self, total, start, book_list_temp):
        """Fetch every page from ``start`` up to ``total`` and collect titles.

        Args:
            total: Offset at which paging stops (exclusive).
            start: Offset of the first item to request.
            book_list_temp: List extended in place with the titles found.

        Returns:
            The same ``book_list_temp`` object, for convenience.
        """
        for offset in range(start, total, self.PAGE_SIZE):
            ret = json.loads(self.parse_url(self.url.format(offset)))
            book_list_temp.extend(
                item['title'] for item in ret['subject_collection_items'])
        return book_list_temp

    def run(self):
        """Download all titles and write them to ``html/douban.json``."""
        # 1. Ask the API how many items exist.
        total = self.get_total()
        # 2. Page through the collection.
        book_list = self.recv_2_list(total, 0, list())
        # 3. Persist the collected titles (the ``html`` directory must exist).
        with open('html/douban.json', 'w', encoding='utf-8') as f:
            json.dump(book_list, f, ensure_ascii=False, indent=2)


if __name__ == '__main__':
    d = DoubanSpider()
    d.run()

  • 爬取图片
import requests
import re
from retrying import retry


class MeinvSpider(object):
    """Walk numbered gallery pages and save the first image of each one."""

    def __init__(self):
        # ``{}`` is replaced with the 1-based page number.
        self.url = 'http://m.m1mm.com/mm/2986/{}'
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Mobile/15E148 Safari/604.1',
        }

    @retry(stop_max_attempt_number=3)
    def parse_url(self, url):
        """Return the raw response body of ``url`` as bytes.

        Raises:
            requests.HTTPError: If the status code is not 200. Raised
                explicitly (rather than asserted) so that retrying and the
                stop condition in ``run`` still work under ``python -O``.
        """
        # A timeout is required for the retry to be useful: without one a
        # stalled connection blocks forever and never raises.
        r = requests.get(url, headers=self.headers, timeout=10)
        if r.status_code != 200:
            raise requests.HTTPError(
                'unexpected status code {} for {}'.format(r.status_code, url))
        return r.content

    def get_image_url(self, html_str):
        """Return the first ``http://...jpg`` URL found in a ``src`` attribute.

        Raises:
            IndexError: If the page contains no such attribute.
        """
        # The published pattern carried a "http://www.mamicode.com/" prefix
        # that matches the host this article was re-published on, not the
        # crawled site; it is treated as an extraction artefact and removed.
        return re.findall(r'src="(http://.*?\.jpg)"', html_str)[0]

    def save_2_file(self, img_url, index):
        """Download ``img_url`` and store it as ``img/<index>.jpg``."""
        # Download before opening the file so that a failed request does not
        # leave an empty image file behind.
        img_data = self.parse_url(img_url)
        with open('img/' + str(index) + '.jpg', 'wb') as f:
            f.write(img_data)
        print('保存完成', index)

    def run(self):
        """Process pages 1, 2, 3, ... until one of them cannot be handled."""
        page_num = 1
        while True:
            url = self.url.format(page_num)
            try:
                html_str = self.parse_url(url).decode('utf-8')
                img_url = self.get_image_url(html_str)
                print(img_url)
                self.save_2_file(img_url, page_num)
            except Exception as e:
                # A missing page, a page without an image or a failed
                # download all mean the series has ended.
                print(e)
                break
            page_num += 1

def main():
    """Create the image spider and run it until the gallery is exhausted."""
    spider = MeinvSpider()
    spider.run()


if __name__ == '__main__':
    main()

  • 爬取段子
import requests
import re


class DuanziSpider(object):
    def __init__(self):
        self.start_url = ‘https://xxx/‘
        self.headers = {
            "user-agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Mobile/15E148 Safari/604.1"
        }

    def parse_url(self, url):
        response = requests.get(url, headers=self.headers)
        assert response.status_code == 200
        return response.content.decode()

    def get_total_page_num(self, html_str):
        total_page_num = re.findall(r‘第 1 页 / 共 (\d+) 页‘, html_str)
        return int(total_page_num[0])

    def get_content_list(self, html_str):
        ret = re.findall(r‘

(.*?)

‘, html_str) return ret def save_content_2_file(self, content_list): with open(‘duanzitxt/duanzi.txt‘, ‘a‘, encoding=‘utf-8‘) as f: for content in content_list: f.write(content) f.write(‘\n‘) print(‘保存完成‘) def run(self): # 整体流程控制 # 1.请求获取数据 html_str = self.parse_url(self.start_url) # 2.分析数据 content_list = self.get_content_list(html_str) # 3.保存数据 self.save_content_2_file(content_list) # 4.构造下一页URL num = 2 total_page_num = self.get_total_page_num(html_str) while num

五、xpath模块

nodename  选取此节点的所有子节点
/ 从根节点选取
// 从匹配选择的当前节点选择文档中的节点,而不考虑它们的路径
. 选取当前节点
.. 选取父节点
@ 选取属性
  • 获取文本:a/text()
  • 选取属性:a/@href
  • 不考虑路径:/html//img[@id="bigpicimg"]
  • 根据文本取百度下一页://a[text()='下一页>']
//div[@id='page']/a[last()]
//div[@id='page']/a[position()>3]
//div[@id='page']/a[position()<5]
//div[contains(@class, 'i')]
from lxml import etree
import requests


def main():
    """Download a page, parse it with lxml and print every ``<img>`` src."""
    headers = {'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Mobile/15E148 Safari/604.1'}
    text = requests.get('http://xxx', headers=headers).content.decode()
    # ``etree.HTML`` parses the markup into an element tree; calling
    # ``etree.tostring(html).decode()`` on it would show the parsed HTML.
    html = etree.HTML(text)
    ret = html.xpath('//img/@src')
    print(ret)


if __name__ == '__main__':
    main()

六、练习示例2

  • 爬百度贴吧
import requests
from lxml import etree
import json


class TiebaSpider(object):
    """Crawl the thread list of a Baidu Tieba forum and each thread's images."""

    def __init__(self, tieba_name):
        self.tieba_name = tieba_name
        # The forum name is URL-encoded into the ``word`` parameter; the
        # original hard-coded one forum here and ignored ``tieba_name``.
        self.start_url = 'https://tieba.baidu.com/mo/q/m?word={}&tn4=bdKSW&sub4=%E8%BF%9B%E5%90%A7&pn=30&'.format(
            requests.utils.quote(tieba_name))
        self.part_url = 'https://tieba.baidu.com/'
        self.headers = {
            "User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Mobile/15E148 Safari/604.1"}

    def _absolute(self, href):
        """Join ``href`` onto ``part_url`` with exactly one slash between them."""
        return self.part_url.rstrip('/') + '/' + href.lstrip('/')

    def parse_url(self, url):
        """Request ``url`` with the spider's headers and return the decoded body."""
        response = requests.get(url, headers=self.headers)
        return response.content.decode()

    def get_content_list(self, html_str):
        """Extract the threads of one list page and the next page's URL.

        Returns:
            ``(content_list, next_url)``. Each item of ``content_list`` has the
            keys ``title``, ``href`` and ``img_list``; ``next_url`` is None
            when the page has no "下一页" link.
        """
        html = etree.HTML(html_str)
        # NOTE(review): the published selector read "tl_shadowtl_shadow_new";
        # a space between the two class names is assumed to have been lost -
        # confirm against the live markup.
        li_list = html.xpath('//li[@class="tl_shadow tl_shadow_new"]')
        content_list = list()
        for li in li_list:
            # XPaths start with "." so they search inside this <li> only; a
            # leading "//" would search the whole document for every item.
            titles = li.xpath('.//div[@class="ti_title"]/span/text()')
            hrefs = li.xpath('.//a/@href')
            item = {}
            item["title"] = titles[0] if titles else None
            item["href"] = self._absolute(hrefs[0]) if hrefs else None
            # Without a detail link there is nothing to download images from.
            item["img_list"] = self.get_img_list(item["href"], list()) if item["href"] else list()
            content_list.append(item)
        next_hrefs = html.xpath('//a[text()="下一页"]/@href')
        next_url = self._absolute(next_hrefs[0]) if next_hrefs else None
        return content_list, next_url

    def get_img_list(self, detail_url, total_img_list):
        """Collect image URLs from a thread, following its "下一页" links.

        Args:
            detail_url: URL of the first page of the thread.
            total_img_list: List extended in place with the image URLs found.

        Returns:
            The same ``total_img_list`` object.
        """
        while detail_url is not None:
            detail_html = etree.HTML(self.parse_url(detail_url))
            if detail_html is None:
                break
            total_img_list.extend(detail_html.xpath('//img[@class="BDF_Image"]/@src'))
            # Continue only when a next-page link exists. The original tested
            # the parsed document instead of this list, so it never stopped
            # cleanly on the last page.
            next_hrefs = detail_html.xpath('//a[text()="下一页"]/@href')
            detail_url = self._absolute(next_hrefs[0]) if next_hrefs else None
        return total_img_list

    def save_content_list(self, content_list):
        """Append every item as pretty-printed JSON to ``tieba/<name>.txt``."""
        file_path = 'tieba/' + self.tieba_name + '.txt'
        # Explicit UTF-8: ``ensure_ascii=False`` emits raw non-ASCII text that
        # the platform default encoding may be unable to write.
        with open(file_path, 'a', encoding='utf-8') as f:
            for content in content_list:
                f.write(json.dumps(content, ensure_ascii=False, indent=2))
                f.write('\n')

    def run(self):
        """Crawl list pages from ``start_url`` until no next page is found."""
        next_url = self.start_url
        while next_url is not None:
            # 1. Request the current list page.
            html_str = self.parse_url(next_url)
            # 2. Extract its threads and the URL of the next list page.
            content_list, next_url = self.get_content_list(html_str)
            # 3. Persist what was found before moving on.
            self.save_content_list(content_list)


if __name__ == '__main__':
    tieba_spider = TiebaSpider('lol')
    tieba_spider.run()

  • 爬取糗事百科
import requests
from lxml import etree


class QiubaiSpider(object):
    """Crawl the text posts of qiushibaike.com page by page and print them."""

    def __init__(self):
        # ``{}`` is replaced with the page number.
        self.url_temp = "https://www.qiushibaike.com/text/page/{}/"
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36"}

    def get_url_list(self):
        """Return the URLs of pages 1 through 13."""
        return [self.url_temp.format(i) for i in range(1, 14)]

    def parse_url(self, url):
        """Request ``url`` with the spider's headers and return the decoded body."""
        response = requests.get(url, headers=self.headers)
        return response.content.decode()

    def get_content_list(self, html_str):
        """Extract the posts of one page.

        Returns:
            A list of dicts; each has a ``content`` key holding the post's
            text fragments with newline characters removed.
        """
        html = etree.HTML(html_str)
        div_list = html.xpath('//div[@class="article block untagged mb15 typs_hot"]')
        content_list = list()
        for div in div_list:
            item = dict()
            # Relative XPath: only text inside this post's <div> is taken.
            item['content'] = [
                text.replace("\n", "")
                for text in div.xpath('./a/div/span/text()')]
            content_list.append(item)
        return content_list

    def save_content_list(self, content_list):
        """Print every extracted item."""
        for i in content_list:
            print(i)

    def run(self):
        """Fetch, parse and output every page in order."""
        # 1. Build the list of page URLs.
        url_list = self.get_url_list()
        # 2. Request each page.
        for url in url_list:
            html_str = self.parse_url(url)
            # 3. Extract the posts.
            content_list = self.get_content_list(html_str)
            # 4. Output them.
            self.save_content_list(content_list)


if __name__ == '__main__':
    qiubai_spider = QiubaiSpider()
    qiubai_spider.run()

七、多线程爬虫

import requests
from lxml import etree
import threading
from queue import Queue


class QiubaiSpider(object):
    """Multi-threaded qiushibaike.com crawler built on three work queues.

    URLs flow through ``url_queue`` -> ``html_queue`` -> ``content_queue``;
    each stage is served by daemon worker threads that loop forever and are
    abandoned when the main thread returns from ``run``.
    """

    def __init__(self):
        # ``{}`` is replaced with the page number.
        self.url_temp = "https://www.qiushibaike.com/text/page/{}/"
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36"}
        self.url_queue = Queue()
        self.html_queue = Queue()
        self.content_queue = Queue()

    def get_url_list(self):
        """Put the URLs of pages 1 through 13 on ``url_queue``."""
        for i in range(1, 14):
            self.url_queue.put(self.url_temp.format(i))

    def parse_url(self):
        """Worker loop: download each queued URL and enqueue its HTML."""
        while True:
            url = self.url_queue.get()
            try:
                response = requests.get(url, headers=self.headers)
                # The result is enqueued *before* task_done() below, so
                # run() can never see url_queue finished while the matching
                # HTML is still missing from html_queue.
                self.html_queue.put(response.content.decode())
            except Exception as exc:
                # Keep the worker alive; one failed page must not stop it.
                print('request failed', url, exc)
            finally:
                # Always acknowledge the item, otherwise join() blocks forever.
                self.url_queue.task_done()

    def get_content_list(self):
        """Worker loop: parse each queued HTML page and enqueue its posts."""
        while True:
            html_str = self.html_queue.get()
            try:
                html = etree.HTML(html_str)
                div_list = html.xpath('//div[@class="article block untagged mb15 typs_hot"]')
                content_list = list()
                for div in div_list:
                    item = dict()
                    item['content'] = [
                        text.replace("\n", "")
                        for text in div.xpath('./a/div/span/text()')]
                    content_list.append(item)
                self.content_queue.put(content_list)
            except Exception as exc:
                print('parse failed', exc)
            finally:
                self.html_queue.task_done()

    def save_content_list(self):
        """Worker loop: print every item of each queued content list."""
        while True:
            content_list = self.content_queue.get()
            try:
                for i in content_list:
                    print(i)
            finally:
                self.content_queue.task_done()

    def run(self):
        """Start the workers and block until all three queues are drained."""
        # 1. Fill the URL queue synchronously. Doing this in its own thread
        #    raced with the join() calls below: an empty queue joins
        #    immediately, so the program could exit before any work started.
        self.get_url_list()

        thread_list = list()
        # 2. Five downloaders.
        for _ in range(5):
            thread_list.append(threading.Thread(target=self.parse_url, daemon=True))
        # 3. One parser.
        thread_list.append(threading.Thread(target=self.get_content_list, daemon=True))
        # 4. One printer.
        thread_list.append(threading.Thread(target=self.save_content_list, daemon=True))

        # Daemon threads end with the main thread, so the endless worker
        # loops do not keep the process alive.
        for t in thread_list:
            t.start()

        # Join in pipeline order: each stage enqueues its output before
        # acknowledging its input, so a finished queue implies the next one
        # already holds all of its work.
        for q in [self.url_queue, self.html_queue, self.content_queue]:
            q.join()
        print('main thread done')


if __name__ == '__main__':
    qiubai_spider = QiubaiSpider()
    qiubai_spider.run()

1.Python通用Spider

标签:col   dom   提取   with   amp   sig   ast   tostring   多线程爬虫   

原文地址:https://www.cnblogs.com/L-dongf/p/12945141.html

上一篇:线程的生命周期

下一篇:centos安装nodejs


评论


亲,登录后才可以留言!