Web Scraping


urllib

GET requests

import urllib.request
import json

res = urllib.request.urlopen("http://httpbin.org/get")
# Read the response body
text = res.read()
# HTTP status code and reason phrase
print(res.status, res.reason)
obj = json.loads(text)
print(obj)

for k, v in res.headers.items():
    print(f"({k}:{v})")

 Output


Custom request headers

import urllib.request
import json

# Add a custom User-Agent header
ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36"
req = urllib.request.Request("http://httpbin.org/user-agent")
req.add_header("User-Agent", ua)
res = urllib.request.urlopen(req)
resp = json.load(res)
print("user-agent", resp["user-agent"])


Basic authentication

import urllib.request
import json

# A password manager with a default realm saves us from having to know the server's realm string
password_mgr = urllib.request.HTTPPasswordMgrWithDefaultRealm()
password_mgr.add_password(None, "http://httpbin.org/basic-auth/admin/123456",
                          "admin", "123456")
auth_handler = urllib.request.HTTPBasicAuthHandler(password_mgr)
opener = urllib.request.build_opener(auth_handler)
urllib.request.install_opener(opener)
res = urllib.request.urlopen("http://httpbin.org/basic-auth/admin/123456")
print(res.read().decode("utf-8"))

Requests with parameters (GET, POST)


import urllib.request
import urllib.parse
import json

# get
params = urllib.parse.urlencode({"param": 1, "eggs": 2, "bacon": 2})
url = f"http://httpbin.org/get?{params}"
res = urllib.request.urlopen(url)
print(json.load(res))

# post
data = urllib.parse.urlencode({"name": "小明", "age": 20})
data = data.encode()
res = urllib.request.urlopen("http://httpbin.org/post", data)
print(json.load(res))


requests

GET, POST, and parameterized requests

import requests

# GET request
res = requests.get("http://httpbin.org/get")
print(res.status_code, res.reason)
print(res.text)

# GET request with query parameters
res = requests.get("http://httpbin.org/get", params={"a": 1, "b": "2"})
print(res.json())

# POST request
res = requests.post("http://httpbin.org/post", data={"a": 1})
print(res.json())

 Output


Custom headers and requests with cookies

# Request with custom headers
ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36"
headers = {"User-Agent": ua}
res = requests.get("http://httpbin.org/headers", headers=headers)
print("Request with custom headers:\n", res.json())

# Request with cookies
cookies = dict(userid="123456", token="xxxxxxxxxxxxxxxxx")
res = requests.get("http://httpbin.org/cookies", cookies=cookies)
print("Request with cookies:\n", res.json())

 Output


Basic authentication

res = requests.get("http://httpbin.org/basic-auth/admin/123456", auth=("admin", 123456))
print("Basic-auth认证请求:\n", res.json())

 Output


Raising an exception for error status codes

bad_r = requests.get("http://httpbin.org/status/404")
print(bad_r.status_code)
bad_r.raise_for_status()

 Output


Using a Session (with cookies)

# Use a requests.Session() object
s = requests.Session()
# The session object stores the contents of the Set-Cookie response headers
s.get("http://httpbin.org/cookies/set/userid/123456789")
# The next request automatically sends all locally stored cookies in its headers
r = s.get("http://httpbin.org/cookies")
print("Cookies stored in the session:", r.json())

 Output


Timeouts

res = requests.get("http://httpbin.org/delay/4", timeout=5)
print(res.text)

 Output



bs4

Basic usage

from bs4 import BeautifulSoup
import doc_html

soup = BeautifulSoup(doc_html.html_doc, "html.parser")

# The <input> tag
print("-----input-----")
print(soup.input.attrs)
print(soup.input.attrs['style'])
print(soup.input.has_attr('type'))

# The <p> tag
print("-----p-----")
print(soup.p)
# All child nodes of the first <p> tag
print(type(soup.p.children))
print(list(soup.p.children)[0])
print(list(soup.p.children)[0].text)


# Targeted lookups
print("-----find-----")
print(soup.find_all("p"))
print(soup.find(type="text"))
print(soup.find(id="btt"))
# Find all nodes that match (returns a list)
print(soup.select(".first-2"))
print(soup.select("#btt"))
print(soup.select(".first-2 p"))

 Output (starting from the input section)

-----input-----
{'type': 'text', 'style': 'background-color: transparent; border:0px;'}

background-color: transparent; border:0px;

True


-----p-----
<p><span>这是个盒子模型,运用了HTML和CSS的知识创建,里面含有很多其他知识,比如对图像的理解,页面设计等。
            这些知识想要掌握好,需要经过很久的训练,对各种标签和属性都了解</span>
<span>第二个了</span>
</p>

<class 'list_iterator'>

<span>这是个盒子模型,运用了HTML和CSS的知识创建,里面含有很多其他知识,比如对图像的理解,页面设计等。
            这些知识想要掌握好,需要经过很久的训练,对各种标签和属性都了解</span>

这是个盒子模型,运用了HTML和CSS的知识创建,里面含有很多其他知识,比如对图像的理解,页面设计等。
            这些知识想要掌握好,需要经过很久的训练,对各种标签和属性都了解


-----find-----
[<p><span>这是个盒子模型,运用了HTML和CSS的知识创建,里面含有很多其他知识,比如对图像的理解,页面设计等。
            这些知识想要掌握好,需要经过很久的训练,对各种标签和属性都了解</span>
<span>第二个了</span>
</p>, <p>这是一个用来判断是否是透明色的盒子</p>]

<input style="background-color: transparent; border:0px;" type="text"/>

<h2 id="btt">这是一个标题</h2>


[<div class="first-2">
<h2>这里可能用来有透明色</h2>
<p>这是一个用来判断是否是透明色的盒子</p>
</div>, <div class="first-2">
<button onclick="btn()">点击!</button>
<h2 id="btt">这是一个标题</h2>
</div>]

[<h2 id="btt">这是一个标题</h2>]

[<p>这是一个用来判断是否是透明色的盒子</p>]

Using the lxml parser

from bs4 import BeautifulSoup
from lxml import etree
import doc_html

soup = BeautifulSoup(doc_html.html_doc, "lxml")
print(soup.input)

lxml and XPath

XPath syntax

Expression                  Description
nodename                    Selects all child nodes of the named node.
/                           Selects from the root node.
//                          Selects matching nodes anywhere in the document, regardless of position.
.                           Selects the current node.
..                          Selects the parent of the current node.
@                           Selects attributes.
following-sibling::span[1]  Selects the nearest following sibling <span> element.
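
 A tiny self-contained example of these expressions (the HTML snippet below is made up purely for illustration):

from lxml import etree

# a throwaway snippet, just to exercise the expressions from the table above
html = "<div class='first-2'><h2>title</h2><a href='http://baidu.com'>link</a></div>"
root = etree.HTML(html)

print(root.xpath("//a/@href"))                            # '@'  selects an attribute
print(root.xpath("//div/h2/text()"))                      # '/'  steps from parent to child
a = root.xpath("//a")[0]
print(a.xpath("../h2/text()"))                            # '..' selects the parent node
print(root.xpath("//h2/following-sibling::a[1]/text()"))  # the nearest following sibling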

Using lxml

from bs4 import BeautifulSoup
from lxml import etree
import doc_html

soup = BeautifulSoup(doc_html.html_doc, "lxml")
print(soup.input)

print("----------------------------------华丽的分割线--------------------------------------------")

selector = etree.HTML(doc_html.html_doc)
# Extract all the links
links = selector.xpath("//div[@class='first-2']/a/@href")
for link in links:
    print(link)

a_ = selector.xpath("//div[@class='first-2']/a")
print(a_)
text = a_[0].xpath("../h2/text()")
print(text)


# First option
print(selector.xpath("//select/option[1]/text()"))
# Last option
print(selector.xpath("//select/option[last()]/text()"))
# Second-to-last option
print(selector.xpath("//select/option[last()-1]/text()"))
# First two options
print(selector.xpath("//select/option[position()<3]/text()"))
# Filter rows by a cell's value and select a cell by its class name
print(selector.xpath("//table/tr[td>21]/td[@class='age info']/@class"))
# Multiple class names
print(selector.xpath("//table/tr/td[contains(@class, 'age info') and contains(@id, 'age')]/text()"))

 Output

<input style="background-color: transparent; border:0px;" type="text"/>
----------------------------------separator--------------------------------------------
http://baidu.com
https://blog.csdn.net/
[<Element a at 0x14c3fb8e480>, <Element a at 0x14c3fb8e500>]
['这里可能用来有透明色']
['信息学院']
['药学院']
['体育学院']
['信息学院', '护理学院']
['age info', 'age info', 'age info']
['21']

Scraping all the images from xiachufang.com

requests+bs4

from urllib.parse import urlparse
import os
import requests
from bs4 import BeautifulSoup



ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36"
headers = {"User-Agent": ua}
r = requests.get('http://www.xiachufang.com', headers=headers)
soup = BeautifulSoup(r.text, "lxml")

img_list = []
for img in soup.select("img"):
    if img.has_attr("data-src"):
        img_list.append(img.attrs["data-src"])
    else:
        img_list.append(img.attrs["src"])


# Save the images
img_dir = "E:\\知识学习\\2019版-千锋爬虫-源码+笔记+作业\\爬虫\\下厨房"
if not os.path.isdir(img_dir):
    os.mkdir(img_dir)

for img in img_list:
    if img.strip() != '':
        o = urlparse(img)
        filename = o.path[1:].split("@")[0]
        filepath = os.path.join(img_dir, filename)

        # The path may contain nested directories; create them if they do not exist
        if not os.path.exists(os.path.dirname(filepath)):
            os.makedirs(os.path.dirname(filepath))

        # Fetch the image
        url = f"{o.scheme}://{o.netloc}/{filename}"
        resp = requests.get(url)

        # Save it to disk (the with-block closes the file automatically)
        with open(filepath, "wb") as file:
            for chunk in resp.iter_content(1024):
                file.write(chunk)

pycurl+re

import re
from pycurl import Curl
from urllib.parse import urlparse
from io import BytesIO
import os

buffer = BytesIO()
c = Curl()
c.setopt(c.URL, 'http://www.xiachufang.com')
c.setopt(c.WRITEDATA, buffer)
c.perform()
c.close()

body = buffer.getvalue()
text = body.decode("utf-8")

img_list = re.findall(r"src=\"(http://i2\.chuimg\.com/\w+\.jpg)", text)

# Save the images
img_dir = "E:\\知识学习\\2019版-千锋爬虫-源码+笔记+作业\\爬虫\\下厨房"
if not os.path.isdir(img_dir):
    os.mkdir(img_dir)

for img in img_list:
    print(img)
    if img.strip() != '':
        o = urlparse(img)
        filename = o.path[1:]
        filepath = os.path.join(img_dir, filename)

        # The path may contain nested directories; create them if they do not exist
        if not os.path.exists(os.path.dirname(filepath)):
            os.makedirs(os.path.dirname(filepath))

        # Build the image URL
        url = f"{o.scheme}://{o.netloc}/{filename}"

        # Download and save the image with pycurl
        with open(filepath, "wb") as file:
            c = Curl()
            c.setopt(c.URL, url)
            c.setopt(c.WRITEDATA, file)
            c.perform()
            c.close()

qianmu.org (multithreaded)

Basic version (with a queue)

import requests
from lxml import etree
import os
from queue import Queue
import threading
import time


start_url = "http://www.qianmu.org/ranking/1528.htm"
link_queue = Queue()
threads_num = 10
threads = []
download_pages = 0

def fetch(url):
    """链接请求"""
    resp = requests.get(url)
    if resp.status_code != 200:
        resp.raise_for_status()
    return resp.text.replace("\t", "")



def parse_university(link):
    """处理大学详情页面"""
    resp = fetch(link)
    selector = etree.HTML(resp)
    data = {}

    # School name
    data["name"] = selector.xpath("//div[@id='wikiContent']/h1/text()")[0]

    # Info table
    try:
        table = selector.xpath("//div[@class='infobox']//table")[0]
    except IndexError as e:
        print("no info table found")
        return None

    keys = table.xpath(".//td[1]/p/text()")
    values = table.xpath(".//td[2]/p//text()")

    print(len(keys), len(values))

    if len(keys) > len(values):
        return None

    data.update(zip(keys, values))

    return data

def download():
    while True:
        # Block until a message can be taken from the queue
        link = link_queue.get()
        if link is None:
            break

        # Extract the data from the detail page
        data = parse_university(link)
        global download_pages
        download_pages += 1
        if data:
            print(data)
        link_queue.task_done()
        print(f'remaining queue is {link_queue.qsize()}')



if __name__ == "__main__":
    # Start time
    start_time = time.time()

    # Request the entry page
    ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36"
    headers = {"User-Agent": ua}
    resp = requests.get(start_url, headers=headers)
    selector = etree.HTML(resp.text)

    # Extract the links from the ranking list page
    links = selector.xpath("//div[@class='rankItem']//td[2]/a/@href")
    for link in links:
        if not link.startswith("http://www.qianmu.org"):
            link = "http://www.qianmu.org/" + link
        link_queue.put(link)

    # Start the worker threads
    for i in range(threads_num):
        t = threading.Thread(target=download)
        t.start()
        threads.append(t)
    # Block until every task in the queue has been processed
    link_queue.join()

    for i in range(threads_num):
        link_queue.put(None)

    # Wait for the threads to exit
    for t in threads:
        t.join()

    finished_time = time.time()
    cost_seconds = finished_time-start_time
    print(f"download finished!!!耗时:{cost_seconds}s,抓取界面:{download_pages}个")

Upgraded version (with Redis)

import requests
from lxml import etree
import os
import signal
from queue import Queue
import threading
import time
import redis

start_url = "http://www.qianmu.org/ranking/1528.htm"
link_queue = Queue()
threads_num = 10
threads = []
thread_on = True
download_pages = 0
my_redis = redis.Redis(host="host", password="password")


def fetch(url):
    """链接请求"""
    resp = requests.get(url)
    if resp.status_code != 200:
        resp.raise_for_status()
    return resp.text.replace("\t", "")


def parse_university(link):
    """处理大学详情页面"""
    resp = fetch(link)
    selector = etree.HTML(resp)
    data = {}

    # School name
    data["name"] = selector.xpath("//div[@id='wikiContent']/h1/text()")[0]

    # Info table
    try:
        table = selector.xpath("//div[@class='infobox']//table")[0]
    except IndexError as e:
        print("no info table found")
        return None

    keys = table.xpath(".//td[1]/p/text()")
    values = table.xpath(".//td[2]/p//text()")

    print(len(keys), len(values))

    if len(keys) > len(values):
        return None

    data.update(zip(keys, values))

    return data


def download(i):
    while thread_on:
        # Pop a link from the Redis queue
        link = my_redis.lpop("qianmu.queue")
        if link:
            # Extract the data from the detail page
            data = parse_university(link)
            global download_pages
            download_pages += 1
            if data:
                print(data)
            print(f'remaining queue is {my_redis.llen("qianmu.queue")}')
    print(f"Thread-{i} exit now")


def signal_handler(signum, frame):
    print("received Ctrl+C, wait for exit gracefully")
    global thread_on
    thread_on = False


def exit_handler(i):
    global thread_on
    while thread_on:
        if input("") == "exit":
            thread_on = False
    print(f"Thread-{i} exit now")


if __name__ == "__main__":
    # Start time
    start_time = time.time()

    # Request the entry page
    ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36"
    headers = {"User-Agent": ua}
    resp = requests.get(start_url, headers=headers)
    selector = etree.HTML(resp.text)

    # Extract the links from the ranking list page
    links = selector.xpath("//div[@class='rankItem']//td[2]/a/@href")
    for link in links:
        if not link.startswith("http://www.qianmu.org"):
            link = "http://www.qianmu.org/" + link
        if my_redis.sadd("qianmu.seen", link):
            my_redis.rpush("qianmu.queue", link)

    # Start the worker threads
    for i in range(threads_num):
        t = threading.Thread(target=download, args=(i + 1,))
        t.start()
        threads.append(t)
    # Has no effect on Windows
    signal.signal(signal.SIGINT, signal_handler)
    # Shutdown listener thread
    t = threading.Thread(target=exit_handler, args=(threads_num + 1,))
    t.start()
    threads.append(t)

    # Block until the queue has been drained
    link_queue.join()

    # Wait for the threads to exit
    for t in threads:
        t.join()

    finished_time = time.time()
    cost_seconds = finished_time - start_time
    print(f"download finished!!!耗时:{cost_seconds}s,抓取界面:{download_pages}个")

Note: signal handling has no effect on Windows here, so I added an extra thread that listens for the input "exit"; when it is typed, the program shuts down.


Installing and getting started with Scrapy

Installation

  :point_right: https://blog.csdn.net/qq_44766883/article/details/107790504

Basic usage

import scrapy


class QuoteSpider(scrapy.Spider):
    name = "quote"
    start_urls = ["http://quotes.toscrape.com/"]

    def parse(self, response):
        # quotes = response.css("div.quote")
        quotes = response.xpath("//div[@class='quote']")
        for quote in quotes:
            yield {
                "text": quote.css("span.text::text").extract_first(),
                "author": quote.xpath("./span/small/text()").extract_first(),
            }
        next_page = response.xpath("//li[@class='next']/a/@href").extract_first()
        if next_page:
            yield response.follow(next_page, self.parse)

 Commands to run it

  • Print results to the console

    scrapy runspider quotes_spider.py
  • Save results to a file

    scrapy runspider scrapy_learn/quotes_spider.py -o ./scrapy_learn/quotes.json
  • Specify the output format

    scrapy runspider scrapy_learn/quotes_spider.py -o ./scrapy_learn/quotes.csv -t csv

Common commands

  • Create a project

    scrapy startproject qianmu
  • Generate a spider file

    # scrapy genspider [spider name] [target domain]
    scrapy genspider qianmu_new qianmu.iguye.com
  • Run a spider

    # Run the spider named qianmu_new
    scrapy crawl qianmu_new
    scrapy crawl qianmu_new -o qianmu_new.json
    scrapy crawl qianmu_new -o qianmu_new.csv -t csv
    
    # Run a spider file on its own
    scrapy runspider quotes_spider.py
    scrapy runspider scrapy_learn/quotes_spider.py -o ./scrapy_learn/quotes.json
    scrapy runspider scrapy_learn/quotes_spider.py -o ./scrapy_learn/quotes.csv -t csv
  • Create the following file so the spider can be run directly from Python; a sketch follows
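
    A sketch of such a file (it mirrors the entrypoint section later in this post; assumes it sits in the project root next to scrapy.cfg):

    # run.py - lets you start the crawl with "python run.py" instead of the scrapy CLI
    from scrapy import cmdline

    cmdline.execute(["scrapy", "crawl", "qianmu_new"])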


Debugging spiders

# Enter the scrapy shell, using the project's settings
scrapy shell
# With a URL argument, the shell fetches that URL first and drops you into the console once the request succeeds
scrapy shell http://www.qianmu.org/ranking/1528.html

# Call the spider's parse method
result = spider.parse(response)
# result is a generator
type(result)   # <generator object QianmuNewSpider.parse at 0x0000025096AEF200>

# one is simply a Request object
one = next(result)
one            # <GET http://www.qianmu.org/%E9%BA%BB%E7%9C%81%E7%90%86%E5%B7%A5%E5%AD%A6%E9%99%A2>
type(one)      # <class 'scrapy.http.request.Request'>

# callback is the parse_university passed in yield response.follow(link, self.parse_university)
one.callback   # <bound method QianmuNewSpider.parse_university of <QianmuNewSpider 'qianmu_new' at 0x25096aa3640>>

# Fetch it
fetch(one)  # prints: 2020-08-04 20:54:46 [scrapy.core.engine] DEBUG: Crawled (200) <GET http://www.qianmu.org/%E9%BA%BB%E7%9C%81%E7%90%86%E5%B7%A5%E5%AD%A6%E9%99%A2> (referer: None) ['cached']
data = next(spider.parse_university(response))  # prints: 18 26
data  # the data scraped from this one request

# You can also crawl in a loop
for req in result:
    fetch(req)

Once inside the console, the following functions and objects are available:

Name      Description
fetch     Fetch a URL or a Request object. Note: on success it rebinds the request and response objects in the current scope.
view      Open the page contained in a response object in the browser.
shelp     Print help information.
spider    The instance of the relevant Spider class.
settings  The Settings object holding all configuration.
crawler   The current Crawler object.
scrapy    The scrapy module.

# Download a page with the project settings and open it in the browser
scrapy view url
# Download a page with the project settings and print it to the console
scrapy fetch url



qianmu.org (Scrapy)

Main spider code

import scrapy

from items import UniversityItem


class QianmuNewSpider(scrapy.Spider):
    name = 'qianmu_new'
    # Only URLs under these domains may be crawled; with qianmu.org, both www.qianmu.org and mall.qianmu.org are allowed
    allowed_domains = ['qianmu.org']
    # Entry URLs for the spider; you can list more than one
    start_urls = ['http://www.qianmu.org/ranking/1528.html']

    # Called when the framework has successfully fetched a URL in start_urls
    def parse(self, response):
        # Extract the links; extract() returns a list, extract_first() returns the first element
        links = response.xpath("//div[@class='rankItem']//td[2]/a/@href").extract()
        for link in links:
            if not link.startswith("http://www.qianmu.org"):
                link = "http://www.qianmu.org/" + link
            # Tell the framework to follow this link, i.e. issue another request;
            # the specified callback is invoked once the request succeeds
            yield response.follow(link, self.parse_university)


    def parse_university(self, response):
        """处理大学详情页面"""
        response = response.replace(body=response.text.replace("\t", "").replace("\r\n", ""))
        item = UniversityItem()
        data = {}

        # School name
        item["name"] = response.xpath("//div[@id='wikiContent']/h1/text()").extract_first()

        # Info table

        table = response.xpath("//div[@id='wikiContent']/div[@class='infobox']/table")

        if table:
            table = table[0]
            keys = table.xpath(".//td[1]/p/text()").extract()
            cols = table.xpath('.//td[2]')
            # values = table.xpath(".//td[2]/p//text()").extract_first()
            values = [' '.join(col.xpath('.//text()').extract()) for col in cols]

            print(len(keys), len(values))

            if len(keys) == len(values):
                data.update(zip(keys, values))
        print(data)
        item["rank"] = data.get("排名")
        item["country"] = data.get("国家")
        item["state"] = data.get("州省")
        item["city"] = data.get("城市")
        item["undergraduate_num"] = data.get("本科生人数")
        item["postgraduate_num"] = data.get("研究生人数")
        item["website"] = data.get("网址")
        yield item

items

import scrapy


class UniversityItem(scrapy.Item):
    # define the fields for your item here like:
    # name = scrapy.Field()
    name = scrapy.Field()
    rank = scrapy.Field()
    country = scrapy.Field()
    state = scrapy.Field()
    city = scrapy.Field()
    undergraduate_num = scrapy.Field()
    postgraduate_num = scrapy.Field()
    website = scrapy.Field()

pipelines

import pymysql
import redis
from scrapy.exceptions import DropItem

# Deduplicate and store item names in Redis
class RedisPipeline:
    # called once when the spider opens
    def open_spider(self, spider):
        self.redis = redis.Redis(host="host", password="password")

    # called once when the spider closes
    def close_spider(self, spider):
        self.redis.close()

    # called once for every item produced
    def process_item(self, item, spider):
        if self.redis.sadd(spider.name, item['name']):
            return item
        raise DropItem



# Store items in MySQL
class MysqlPipeline:
    # called once when the spider opens
    def open_spider(self, spider):
        self.conn = pymysql.connect(
            host="127.0.0.1",
            port=3306,
            db="spider",
            user="jiang",
            password="jiang",
            charset="utf8"
        )
        self.cur = self.conn.cursor()

    # called once when the spider closes
    def close_spider(self, spider):
        self.cur.close()
        self.conn.close()

    # called once for every item produced
    def process_item(self, item, spider):
        # keys = item.keys()
        # values = list(item.values())   # a tuple of the values
        keys, values = zip(*item.items())
        sql = "insert into universities({0}) values({1})".format(
            ','.join(keys),
            ','.join(['%s']*len(keys))
        )
        self.cur.execute(sql, values)
        self.conn.commit()
        # print the executed SQL statement
        print(self.cur._last_executed)
        return item

Configure settings

ITEM_PIPELINES = {
   'qianmu.pipelines.MysqlPipeline': 301,
   'qianmu.pipelines.RedisPipeline': 300,
}

entrypoint

 Program entry point

from scrapy import cmdline

cmdline.execute(["scrapy", "crawl", "qianmu_new"])

Database table

 create table `universities`(
     `name` varchar(256) NOT NULL COMMENT 'school name',
     `rank` varchar(32) DEFAULT NULL COMMENT 'school ranking',
     `country` varchar(128) DEFAULT NULL COMMENT 'country',
     `state` varchar(128) DEFAULT NULL COMMENT 'state/province',
     `city` varchar(128) DEFAULT NULL COMMENT 'city',
     `undergraduate_num` varchar(128) DEFAULT NULL COMMENT 'number of undergraduates',
     `postgraduate_num` varchar(128) DEFAULT NULL COMMENT 'number of postgraduates',
     `website` varchar(128) DEFAULT NULL COMMENT 'website',
     primary key(`name`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 comment="university info";

The architecture diagram

You really should know this diagram by heart!

The data flow in Scrapy is controlled by the execution engine, and goes like this:

  1. The Engine gets the initial Requests to crawl from the Spider.
  2. The Engine schedules the Requests in the Scheduler and asks for the next Requests to crawl.
  3. The Scheduler returns the next Requests to the Engine.
  4. The Engine sends the Requests to the Downloader, passing through the Downloader Middlewares (see process_request()).
  5. Once the page finishes downloading the Downloader generates a Response (with that page) and sends it to the Engine, passing through the Downloader Middlewares (see process_response()).
  6. The Engine receives the Response from the Downloader and sends it to the Spider for processing, passing through the Spider Middleware (see process_spider_input()).
  7. The Spider processes the Response and returns scraped items and new Requests (to follow) to the Engine, passing through the Spider Middleware (see process_spider_output()).
  8. The Engine sends processed items to Item Pipelines, then send processed Requests to the Scheduler and asks for possible next Requests to crawl.
  9. The process repeats (from step 1) until there are no more requests from the Scheduler.



Middleware

process_request

Called while a Request object is on its way to the downloader. The behavior depends on the type of the value returned:

Return value   Behavior
None           Everything is fine; continue through the rest of the middleware chain.
Response       Stop calling the remaining process_request and process_exception functions and skip the download itself; go straight into the process_response chain.
Request        Stop calling the remaining process_request functions; hand the request back to the scheduler to be downloaded again.
IgnoreRequest  process_exception is called; if it is not defined, request.errback is called; if there is no errback either, the exception is silently ignored, without even a log entry.

process_response

Called while the download result is being passed back to the engine.

Return value   Behavior
Response       Continue calling the other middlewares' process_response.
Request        Stop calling the remaining process_request functions; hand the request back to the scheduler to be downloaded again.
IgnoreRequest  request.errback is called; if there is no errback, the exception is silently ignored, without even a log entry.

process_exception

Called when an exception is raised during the download, or when process_request raises IgnoreRequest.

Return value   Behavior
Response       Start the middleware chain's process_response processing.
Request        Stop calling the remaining process_request functions; hand the request back to the scheduler to be downloaded again.
None           Continue calling the other middlewares' process_exception functions.

from_crawler(cls, crawler)

If this method exists, it is called to create the middleware instance. If you implement it, it must return a middleware object.
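
 As a rough skeleton (the class name and the MYMW_ENABLED settings key below are made up for illustration), a downloader middleware implementing the hooks above looks like this:

from scrapy.exceptions import NotConfigured


class MyDownloaderMiddleware:

    def __init__(self, settings):
        self.enabled = settings.getbool("MYMW_ENABLED")  # hypothetical settings key

    @classmethod
    def from_crawler(cls, crawler):
        # must return a middleware instance, or raise NotConfigured to disable it
        if not crawler.settings.getbool("MYMW_ENABLED"):
            raise NotConfigured
        return cls(crawler.settings)

    def process_request(self, request, spider):
        return None          # None: keep going down the middleware chain

    def process_response(self, request, response, spider):
        return response      # Response: keep going through the process_response chain

    def process_exception(self, request, exception, spider):
        return None          # None: let other middlewares handle the exception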


Rotating proxies

settings

 A pool of proxy IPs

PROXIES = [
   # "http://54.243.170.209:8080",
   "http://165.225.210.96:10605"
]

# 749 is deliberate: the built-in proxy middleware runs at 750, and our custom one must run before it (Scrapy runs middlewares with smaller numbers first);
# to avoid interfering with other middlewares, a value right next to 750 is enough
DOWNLOADER_MIDDLEWARES = {
   'qianmu.middlewares.RandomProxyMiddleware': 749,
}


middlewares

 Define a class that creates the middleware instance

import random
from collections import defaultdict

from scrapy.exceptions import NotConfigured
from twisted.internet.error import ConnectionRefusedError, TimeoutError


class RandomProxyMiddleware(object):

    def __init__(self, settings):
        # initialize variables and settings
        self.proxies = settings.getlist("PROXIES")
        self.state = defaultdict(int)
        self.max_failed = 3

    @classmethod
    def from_crawler(cls, crawler):
        # 1. create the middleware instance
        if not crawler.settings.getbool("HTTPPROXY_ENABLED"):
            raise NotConfigured
        return cls(crawler.settings)

    def process_request(self, request, spider):
        # 2. assign a random proxy to every request
        if self.proxies and not request.meta.get("proxy"):
            request.meta["proxy"] = random.choice(self.proxies)

    def process_response(self, request, response, spider):
        # 3. called when the request succeeded
        cur_proxy = request.meta.get("proxy")
        # check whether the target site has banned us
        if response.status in (401, 403):
            print(f"{cur_proxy} got wrong code {self.state[cur_proxy]} times")
            # increase the failure count of this proxy
            self.state[cur_proxy] += 1
        # once a proxy has accumulated enough failures
        if self.state[cur_proxy] >= self.max_failed:
            print("got wrong http code {%s} when use %s" % (response.status, cur_proxy))
            # assume the proxy has been banned and remove it from the pool
            self.remove_proxy(cur_proxy)
            del request.meta["proxy"]
            # return the request so the scheduler downloads it again
            return request
        return response

    def process_exception(self, request, exception, spider):
        # 4. called when the request failed with an exception
        cur_proxy = request.meta.get("proxy")

        # if this request used a proxy and the network call errored, assume the proxy is broken
        if cur_proxy and isinstance(exception, (ConnectionRefusedError, TimeoutError)):
            print(f"error occur when use proxy {exception} {cur_proxy}")
            self.remove_proxy(cur_proxy)
            del request.meta["proxy"]
            return request

    def remove_proxy(self, proxy):
        """Remove the given proxy from the proxy list."""
        if proxy in self.proxies:
            self.proxies.remove(proxy)
            print(f"remove {proxy} from proxy list")

Built-in downloader middlewares


  1. scrapy.downloadermiddlewares.robotstxt.RobotsTxtMiddleware

    Requests the robots.txt file and parses the rules in it.

  2. scrapy.downloadermiddlewares.httpauth.HttpAuthMiddleware

    Performs requests that need Basic-auth authentication.

  3. scrapy.downloadermiddlewares.downloadtimeout.DownloadTimeoutMiddleware

    Sets the maximum download timeout for requests.

  4. scrapy.downloadermiddlewares.defaultheaders.DefaultHeadersMiddleware

    Sets the default request headers.

  5. scrapy.downloadermiddlewares.useragent.UserAgentMiddleware

    Sets the User-Agent request header.

  6. scrapy.downloadermiddlewares.retry.RetryMiddleware

    Decides whether and how many times failed downloads are retried.

  7. scrapy.downloadermiddlewares.redirect.MetaRefreshMiddleware

    Handles redirection based on the meta-refresh tag.

  8. scrapy.downloadermiddlewares.httpcompression.HttpCompressionMiddleware

    Decodes compressed responses (e.g. gzip).

  9. scrapy.downloadermiddlewares.redirect.RedirectMiddleware

    Handles 30x HTTP redirects.

  10. scrapy.downloadermiddlewares.cookies.CookiesMiddleware

    Manages cookies.

  11. scrapy.downloadermiddlewares.httpproxy.HttpProxyMiddleware

    Implements IP proxy support.

  12. scrapy.downloadermiddlewares.stats.DownloaderStats

    Collects download statistics.

  13. scrapy.downloadermiddlewares.httpcache.HttpCacheMiddleware

    Caches download results.


Extensions

Introduction

 The extension framework provides a mechanism for hooking your own functionality into Scrapy.

 Extensions are just ordinary classes; they are instantiated and initialized when Scrapy starts.

Extensions have three states: available, enabled, and disabled. Some extensions depend on particular settings; for example, the HTTP Cache extension is available but disabled by default unless the HTTPCACHE_ENABLED setting is set. An extension can be disabled by setting its order to None.
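
 For example, in settings.py (HTTPCACHE_ENABLED is the setting mentioned above; setting an entry's order to None disables it):

# enable the HTTP cache extension, which is available but disabled by default
HTTPCACHE_ENABLED = True

EXTENSIONS = {
    # disable a default-enabled extension by giving it the order None
    'scrapy.extensions.telnet.TelnetConsole': None,
}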

telnet

 In a terminal (cmd)

telnet 127.0.0.1 6023

 You will be prompted for a username and password; the username is scrapy and the password is printed in the crawler's log output, as shown below

 We can then inspect the running crawler from this console

# List the available names
dir()

from pprint import pprint
# Inspect the crawl statistics
pprint(stats.get_stats())

There is plenty of other information you can inspect as well.


Logging extension

Create the file


Code

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author     : codekiller
# @Time       : 2020/8/5 23:36
# @Email      : jcl1345414527@163.com
# @File       : extensions.py
# @Description: logging extension

import logging
from collections import defaultdict
import datetime
from scrapy import signals
from scrapy.exceptions import NotConfigured

logger = logging.getLogger(__name__)


class SpiderOpenCloseLogging:

    def __init__(self, item_count):
        self.item_count = item_count
        self.items_scraped = 0
        self.items_dropped = 0

        self.stats = defaultdict(int)
        self.error_stats = defaultdict(int)

    @classmethod
    def from_crawler(cls, crawler):
        # first check if the extension should be enabled and raise
        # NotConfigured otherwise
        if not crawler.settings.getbool('MYEXT_ENABLED'):
            raise NotConfigured

        # get the number of items from settings
        item_count = crawler.settings.getint('MYEXT_ITEMCOUNT', 1000)

        # instantiate the extension object
        ext = cls(item_count)

        # connect the extension object to signals
        # bind the methods of this object to the corresponding signals
        crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
        crawler.signals.connect(ext.item_scraped, signal=signals.item_scraped)
        crawler.signals.connect(ext.item_dropped, signal=signals.item_dropped)
        crawler.signals.connect(ext.response_received, signal=signals.response_received)

        # return the extension object
        return ext

    # Sent when a spider starts crawling; typically used to allocate resources for the spider, though it can do anything
    def spider_opened(self, spider):
        print("====" * 20, "opened spider %s" % spider.name)

    # Sent when a spider is closed; can be used to release the resources acquired in spider_opened
    def spider_closed(self, spider):
        print("====" * 20, "closed spider %s" % spider.name)

    # Sent when an item has been scraped and has passed through all item pipelines without being dropped
    def item_scraped(self, item, spider):
        self.items_scraped += 1
        if self.items_scraped % self.item_count == 0:
            print("====" * 20, "scraped %d items" % self.items_scraped)

    # Sent when an item is dropped because some pipeline raised DropItem
    def item_dropped(self, item, spider, response, exception):
        self.items_dropped += 1
        if self.items_dropped % self.item_count == 0:
            print("====" * 20, "dropped %d items" % self.items_dropped)

    # Sent when the engine receives a new Response from the downloader
    def response_received(self, response, request, spider):
        now = datetime.datetime.now().strftime("%Y%m%d%H%M")
        self.stats[now] += 1
        # count error responses
        if response.status in [401, 403, 404, 500, 501, 502]:
            self.error_stats[now] += 1
        # log a warning when more than 20% of the responses in this minute are errors
        if float(self.error_stats[now]) / self.stats[now] > 0.2:
            logger.warning("received %s response, "
                           "and %s of item is none 200 in %s" % \
                           (self.stats[now], self.error_stats[now], now))

settings

 Add a setting; in our code there is the check if not crawler.settings.getbool('MYEXT_ENABLED'):

# Enable the custom extension
MYEXT_ENABLED = True

 Add our custom extension to the EXTENSIONS dict

EXTENSIONS = {
   #'scrapy.extensions.telnet.TelnetConsole': None,
   'qianmu.extensions.SpiderOpenCloseLogging': 1
}

To disable an extension that is enabled by default (for example, one included in EXTENSIONS_BASE), set its order to None.


Built-in extensions

 Extensions are loaded and activated when the extension class is instantiated, so the initialization code must run in the class constructor (__init__).

  1. 'scrapy.extensions.corestats.CoreStats': 0

    • Name: core stats extension

    • Description: enables collection of core statistics, provided the stats collector is enabled (see Stats Collection)

  2. 'scrapy.telnet.TelnetConsole': 0

    • Name: telnet console extension
    • Description: provides a telnet console, enabled via the TELNETCONSOLE_ENABLED setting; the server listens on the port given by TELNETCONSOLE_PORT
  3. 'scrapy.extensions.memusage.MemoryUsage': 0

    • Name: memory usage extension
    • Description: monitors the memory used by the Scrapy process; sends a warning email when usage exceeds one threshold and closes the spider when it exceeds another
  4. 'scrapy.extensions.memdebug.MemoryDebugger': 0

    • Name: memory debugger extension
    • Description: used to debug memory usage; enable it with the MEMDEBUG_ENABLED setting
  5. 'scrapy.extensions.closespider.CloseSpider': 0

    • Name: close spider extension; automatically closes the spider when certain conditions occur, with configurable closing conditions
  6. 'scrapy.extensions.feedexport.FeedExporter': 0

  7. 'scrapy.extensions.logstats.LogStats': 0

    • Name: log stats extension
    • Description: logs basic statistics such as crawled pages and scraped items
  8. 'scrapy.extensions.spiderstate.SpiderState': 0

  9. 'scrapy.extensions.throttle.AutoThrottle': 0

  10. 'scrapy.extensions.statsmailer.StatsMailer': 0

    • Name: StatsMailer extension
    • Description: a simple extension that sends a notification email when a domain has finished being crawled, including the statistics collected by Scrapy; recipients are configured via STATSMAILER_RCPTS

xinpianchang (新片场)

Create the project

scrapy startproject xpc

Create requirements.txt

 Create a requirements.txt file under the xpc directory

scrapy
redis
requests
pymysql

 After creating it, install the dependencies with

pip install -r requirements.txt

Generate the spider

scrapy genspider discovery xinpianchang.com

Crawling logic

import scrapy
from scrapy import Request
import json
import random
import re
from items import PostItem, CommentItem, ComposerItem, CopyrightItem
from scrapy_redis.spiders import RedisSpider



def my_strip(info):
    if info:
        return info.strip()
    return ""


cookies = {
    'Authorization': 'A26F51084B88500BF4B885427B4B8858B394B885B7E7169365C9'
}


def gen_session_id():
    return "".join(random.sample([chr(i) for i in range(97, 97 + 26)], 26))


def convert_int(s):
    if type(s) is str:
        return int(s.replace(",", ""))
    return 0


class DiscoverySpider(scrapy.Spider):
    name = 'discovery'
    allowed_domains = ['xinpianchang.com', 'openapi-vtom.vmovier.com', 'app.xinpianchang.com']
    start_urls = ['https://www.xinpianchang.com/channel/index/type-/sort-like/duration_type-0/resolution_type-/page-1']
    # page counter
    page_count = 0

    # def start_requests(self):
    #     for url in self.start_urls:
    #         c = cookies.copy()
    #         c.update(PHPSESSID=gen_session_id(),
    #                  channel_page="apU%3D")  # page 20
    #         request = Request(url, cookies=c, dont_filter=True)
    #         yield request

    def parse(self, response):
        self.page_count += 1
        if self.page_count >= 50:
            self.page_count = 0
            cookies.update(PHPSESSID=gen_session_id())

        post_list = response.xpath("//ul[@class='video-list']/li")
        url = "https://www.xinpianchang.com/a%s?from=ArticleList"

        for post in post_list:
            pid = post.xpath("./@data-articleid").get()
            request = response.follow(url % pid, self.parse_post)
            request.meta["pid"] = pid
            request.meta["thumbnail"] = post.xpath("./a/img/@_src").get()
            yield request
        pages = response.xpath("//div[@class='page']/a/@href").extract()
        for page in pages[1:]:
            next_page = f"https://www.xinpianchang.com{page}"
            yield response.follow(next_page, self.parse, cookies=cookies)

    # Parse a single video (post) page
    def parse_post(self, response):
        pid = response.meta["pid"]

        post = PostItem()
        post["pid"] = pid
        post["thumbnail"] = response.meta["thumbnail"]
        post["title"] = response.xpath("//div[@class='title-wrap']/h3/text()").get()
        cates = response.xpath("//span[contains(@class, 'cate')]//text()").extract()
        post["category"] = "".join([cate.strip() for cate in cates])
        post["created_at"] = response.xpath("//span[contains(@class, 'update-time')]/i/text()").get()
        post["play_counts"] = response.xpath("//i[contains(@class, 'play-counts')]/@data-curplaycounts").get()
        post["like_counts"] = response.xpath("//span[contains(@class, 'like-counts')]/@data-counts").get()
        tags = response.xpath("//div[contains(@class, 'tag-wrapper')]/a/text()").extract()
        post["tag"] = "-".join([tag.strip() for tag in tags])
        desc = response.xpath("//p[contains(@class, 'desc')]/text()").get()
        post["description"] = my_strip(desc)

        # Video resource
        vid = response.xpath(
            "//div[@class='filmplay-data-btn fs_12']//a[@class='collection-star hollow-star']/@data-vid").get()
        video_url = f"https://openapi-vtom.vmovier.com/v3/video/{vid}?expand=resource&usage=xpc_web&appKey=61a2f329348b3bf77"
        request = Request(video_url, callback=self.parse_video)
        request.meta["post"] = post
        yield request

        # Comments
        comment_url = f"https://app.xinpianchang.com/comments?resource_id={pid}&type=article&page=1&per_page=24"
        request = Request(comment_url, callback=self.parse_comment)
        request.meta["pid"] = pid
        yield request

        # Creators
        creator_url = response.xpath("//ul[@class='creator-list']/li/a/@href").extract()
        for url in creator_url:
            if url.startswith("/article"):
                continue

            cid = url[2:url.index("?")]
            url = f"https://www.xinpianchang.com{url}"
            request = response.follow(url, callback=self.parse_composer)
            request.meta["dont_merge_cookies"] = True
            request.meta["cid"] = cid
            yield request

    # Parse the video-resource API response
    def parse_video(self, response):
        post = response.meta["post"]
        result = json.loads(response.text)
        post["video"] = result["data"]["resource"]["progressive"][0]["url"]
        post["preview"] = result["data"]["video"]["cover"]
        yield post

    # Parse the comments API response
    def parse_comment(self, response):
        result = json.loads(response.text)

        for c in result["data"]["list"]:
            comment = CommentItem()
            comment["uname"] = c["userInfo"]["username"]
            comment["avatar"] = c["userInfo"]["avatar"]
            comment["uid"] = c["userInfo"]["id"]
            comment["comment_id"] = c["id"]
            comment["pid"] = c["resource_id"]
            comment["content"] = c["content"]
            comment["created_at"] = c["addtime"]
            comment["like_counts"] = c["count_approve"]
            if c["referid"]:
                comment["referid"] = c["referid"]
            yield comment

        next_page = result["data"]["next_page_url"]
        if next_page:
            next_page = f"https://app.xinpianchang.com{next_page}"
            yield response.follow(next_page, self.parse_comment)

    # Parse a creator's profile page
    def parse_composer(self, response):
        banner, = re.findall(r"background-image:url\((.+?)\)",
                             response.xpath("//div[@class='banner-wrap']/@style").get())
        composer = ComposerItem()
        composer["banner"] = banner
        composer["cid"] = response.meta["cid"]
        composer["name"] = my_strip(response.xpath("//p[contains(@class,'creator-name')]/text()").get())
        composer["intro"] = my_strip(response.xpath("//p[contains(@class,'creator-desc')]/text()").get())
        composer["like_counts"] = convert_int(response.xpath("//span[contains(@class,'like-counts')]/text()").get())
        composer["fans_counts"] = convert_int(response.xpath("//span[contains(@class,'fans-counts')]/text()").get())
        composer["follow_counts"] = convert_int(
            response.xpath("//span[@class='follow-wrap']/span[contains(@class,'fw_600')]/text()").get())
        location = response.xpath("//span[contains(@class, 'icon-location')]/following-sibling::span[1]/text()").get()
        if location:
            composer["location"] = location.replace("\xa0", "")
        else:
            composer["location"] = ""
        composer["career"] = response.xpath(
            "//span[contains(@class, 'icon-career')]/following-sibling::span[1]/text()").get()
        yield composer

item

import scrapy
from scrapy import Field


class PostItem(scrapy.Item):
    """保存视频信息的item"""
    table_name = 'posts'
    pid = Field()
    title = Field()
    thumbnail = Field()
    preview = Field()
    video = Field()
    video_format = Field()
    duration = Field()
    category = Field()
    created_at = Field()
    play_counts = Field()
    like_counts = Field()
    description = Field()
    tag = Field()


class CommentItem(scrapy.Item):
    table_name = 'comments'
    comment_id = Field()
    pid = Field()
    uid = Field()
    avatar = Field()
    uname = Field()
    created_at = Field()
    content = Field()
    like_counts = Field()
    referid = Field()


class ComposerItem(scrapy.Item):
    table_name = 'composers'
    cid = Field()
    banner = Field()
    avatar = Field()
    verified = Field()
    name = Field()
    intro = Field()
    like_counts = Field()
    fans_counts = Field()
    follow_counts = Field()
    location = Field()
    career = Field()


class CopyrightItem(scrapy.Item):
    table_name = 'copyrights'
    pcid = Field()
    pid = Field()
    cid = Field()
    roles = Field()

pipelines

 Store items in MySQL

import pymysql

# Store items in MySQL
class MysqlPipeline:
    # called once when the spider opens
    def open_spider(self, spider):
        self.conn = pymysql.connect(
            host="127.0.0.1",
            port=3306,
            db="spider",
            user="jiang",
            password="jiang",
            charset="utf8"
        )
        self.cur = self.conn.cursor()

    # called once when the spider closes
    def close_spider(self, spider):
        self.cur.close()
        self.conn.close()

    # called once for every item produced
    def process_item(self, item, spider):
        # keys = item.keys()
        # values = list(item.values())   # a tuple of the values
        keys, values = zip(*item.items())
        sql = "insert into {}({}) values({}) ON DUPLICATE KEY UPDATE {}".format(
            item.table_name,
            ','.join(keys),
            ','.join(['%s']*len(keys)),
            ",".join(["`{}`=%s".format(key) for key in keys])
        )
        self.cur.execute(sql, values*2)
        self.conn.commit()
        # print the executed SQL statement
        print(self.cur._last_executed)
        return item

 settings

ITEM_PIPELINES = {
   'xpc.pipelines.MysqlPipeline': 300,
}

middleware

 Rotating IP proxies (backed by Redis)

from collections import defaultdict
from scrapy import signals
from scrapy.exceptions import NotConfigured
import random
import redis
from twisted.internet.error import ConnectionRefusedError, TimeoutError

class RandomProxyMiddleware(object):

    def __init__(self, settings):
        # initialize variables and settings
        self.r = redis.Redis(host="host", password="password")
        self.proxy_key = settings.get("PROXY_REDIS_KEY")
        self.proxy_stats_key = self.proxy_key + "_stats"
        self.state = defaultdict(int)
        self.max_failed = 3

    @property
    def proxies(self):
        proxies_b = self.r.lrange(self.proxy_key, 0, -1)
        proxies = []
        for proxy_b in proxies_b:
            proxies.append(bytes.decode(proxy_b))
        print("proxies:", proxies)
        return proxies

    @classmethod
    def from_crawler(cls, crawler):
        # 1. create the middleware instance
        if not crawler.settings.getbool("HTTPPROXY_ENABLED"):
            raise NotConfigured
        return cls(crawler.settings)

    def process_request(self, request, spider):
        # 2. assign a random proxy to every request
        if self.proxies and not request.meta.get("proxy"):
            request.meta["proxy"] = random.choice(self.proxies)

    def process_response(self, request, response, spider):
        # 3. called when the request succeeded
        cur_proxy = request.meta.get("proxy")
        # check whether the target site has banned us
        if response.status in (401, 403):
            print(f"{cur_proxy} got wrong code {self.state[cur_proxy]} times")
            # increase the failure count of this proxy
            # self.state[cur_proxy] += 1
            self.r.hincrby(self.proxy_stats_key, cur_proxy, 1)
        # once a proxy has accumulated enough failures
        failed_times = self.r.hget(self.proxy_stats_key, cur_proxy) or 0
        if int(failed_times) >= self.max_failed:
            print("got wrong http code {%s} when use %s" % (response.status, cur_proxy))
            # assume the proxy has been banned and remove it from the pool
            self.remove_proxy(cur_proxy)
            del request.meta["proxy"]
            # return the request so the scheduler downloads it again
            return request
        return response

    def process_exception(self, request, exception, spider):
        # 4. called when the request failed with an exception
        cur_proxy = request.meta.get("proxy")

        # if this request used a proxy and the network call errored, assume the proxy is broken
        if cur_proxy and isinstance(exception, (ConnectionRefusedError, TimeoutError)):
            print(f"error occur when use proxy {exception} {cur_proxy}")
            self.remove_proxy(cur_proxy)
            del request.meta["proxy"]
            return request

    def remove_proxy(self, proxy):
        """Remove the given proxy from the proxy list."""
        if proxy in self.proxies:
            self.r.lrem(self.proxy_key, 1, proxy)
            print("remove %s from proxy list" % proxy)

 settings

DOWNLOADER_MIDDLEWARES = {
   'xpc.middlewares.RandomProxyMiddleware': 749,
}

# Redis key holding the proxy IP pool
PROXY_REDIS_KEY = "discovery:proxy"

Using scrapy-redis

pip install scrapy-redis

 Configuration in settings

# Enables scheduling storing requests queue in redis.
SCHEDULER = "scrapy_redis.scheduler.Scheduler"

# Ensure all spiders share same duplicates filter through redis.
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
# REDIS_URL = 'redis://ip:6379'
REDIS_HOST = 'host'
REDIS_PORT = 6379
REDIS_PARAMS = {
   'password': 'pass',
}

With this configuration, the crawl continues from the request queue and dupefilter data stored in Redis.


Using RedisSpider

from scrapy_redis.spiders import RedisSpider

The spider pulls its URLs from Redis automatically, so start_urls is not needed; we only have to push a URL onto discovery:start_urls in Redis (lpush discovery:start_urls url).

This lets us run several crawler processes at once: as soon as a URL appears, they all crawl it together. A minimal sketch follows.
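
 A minimal sketch of the RedisSpider version of the spider (only the parts that differ from a normal spider; the parsing logic stays the same):

from scrapy_redis.spiders import RedisSpider


class DiscoverySpider(RedisSpider):
    name = "discovery"
    # instead of start_urls, start URLs are popped from this Redis list
    redis_key = "discovery:start_urls"

    def parse(self, response):
        # ... same parsing logic as before ...
        pass

 Start as many crawler processes as you like with scrapy crawl discovery, then feed them with lpush discovery:start_urls <url>.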


selenium

Basic usage

from selenium import webdriver
driver = webdriver.Chrome()
driver.get("http://baidu.com")

Search for a keyword

kw = driver.find_element_by_id("kw")
kw.send_keys("Python")
su = driver.find_element_by_id("su")
su.click()

Get the result titles

h3_list = driver.find_elements_by_tag_name("h3")
for h3 in h3_list:
     print(h3.text)

 Output

python官方下载_飞桨PaddlePaddle-开源深度学习平台
2020新版python_免费下载
python-python下载免费
python_万和-打造Python全栈开发工程师
python编程_在家就能让孩子学习编程的教育平台
Welcome to Python.org官方
Python(计算机程序设计语言)_百度百科
python官网 - Download Python | Python.org
Python 基础教程 | 菜鸟教程
Python还能火多久?
Python教程 - 廖雪峰的官方网站
你都用 Python 来做什么? - 知乎
Python3 *** 运算符_极客点儿-CSDN博客_python **
Python基础教程,Python入门教程(非常详细)
Python-薯条编程-在线教程-小班授课高薪就业培训

Running JavaScript

driver.execute_script("alert('123')")
driver.execute_script("window.scrollTo(300, document.body.scrollHeight)")

Launching a browser

from selenium import webdriver
# Start a Chrome browser
driver = webdriver.Chrome()
# Specify the chromedriver path explicitly and start Chrome
driver = webdriver.Chrome(executable_path='/home/user/chromedriver')
# Start headless Chrome
from selenium.webdriver.chrome.options import Options
option = Options()
option.add_argument('--headless')
driver = webdriver.Chrome(chrome_options=option)

# Start PhantomJS
driver = webdriver.PhantomJS()

chrome-headless is the GUI-less version of Chrome; it has replaced PhantomJS, which is no longer maintained.


Controlling the browser

# Visit a URL
driver.get('https://www.baidu.com')
# Refresh the page
driver.refresh()
# Go forward
driver.forward()
# Go back
driver.back()
# Quit the browser
driver.quit()
# Current URL
driver.current_url
# Take a screenshot
driver.save_screenshot('/tmp/test.png')

Finding elements

 The 18 find_* functions

# Find by the element's class attribute
driver.find_element_by_class_name
# Find with a CSS selector
driver.find_element_by_css_selector
# Find by the element's id
driver.find_element_by_id
# Find by a link's exact text
driver.find_element_by_link_text
# Find by the element's name attribute
driver.find_element_by_name
# Find links whose text contains the given string
driver.find_element_by_partial_link_text
# Find by tag name
driver.find_element_by_tag_name
# Find with an XPath expression
driver.find_element_by_xpath

Executing JavaScript

# Scroll the page all the way to the bottom
driver.execute_script('window.scrollTo(0, document.body.scrollHeight)')
# Execute an asynchronous JS function
driver.execute_async_script('send_xml_request()')

Waits

  • Implicit wait

    # When an element is not found immediately, keep retrying for up to 10 seconds
    driver.implicitly_wait(10)
  • Explicit wait

    from selenium.webdriver.support.ui import WebDriverWait
    from selenium.webdriver.support import expected_conditions as EC
    from selenium.webdriver.common.by import By
    # Find a button:
    # wait at most 10 seconds until the element described by the locator is present
    sort_btn = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.XPATH, './/div[@class="f-sort"]/a[2]'))
    )

    Of the two, an explicit wait usually fits the program logic better; an implicit wait is also fine when you are not yet sure how the page loads.

Scraping JD.com

import sys
import time
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.chrome.options import Options
import pyexcel


if __name__ == '__main__':
    keyword = '小米mix2s'
    if len(sys.argv) > 1:
        keyword = sys.argv[1]

    # Run headless (no browser window)
    options = Options()
    options.add_argument("--headless")
    driver = webdriver.Chrome(chrome_options=options)
    driver.get("https://www.jd.com/")

    # Type the keyword into the search box
    kw = driver.find_element_by_id("key")
    kw.send_keys(keyword)
    kw.send_keys(Keys.ENTER)
    # Take a screenshot
    driver.save_screenshot("1.png")

    # Sort by sales volume
    time.sleep(2)
    sort_btn = driver.find_element_by_xpath(".//div[@class='f-sort']/a[2]")
    sort_btn.click()
    driver.save_screenshot("2.png")

    has_next = True
    rows = []
    while has_next:
        time.sleep(3)
        curr_page = driver.find_element_by_xpath("//div[contains(@class,'page')]//a[@class='curr']").text
        print("----------------current page is %s----------------" % curr_page)
        # First get the size and position of the whole goods-list area
        goods_list = driver.find_element_by_id("J_goodsList")
        # Decide how far to scroll down based on the size of that area
        y = goods_list.rect["y"] + goods_list.rect["height"]
        driver.execute_script("window.scrollTo(0, %s)" % y)

        # Get all product nodes
        products = driver.find_elements_by_class_name("gl-item")
        for product in products:
            row = {}
            sku = product.get_attribute("data-sku")
            row["price"] = product.find_element_by_css_selector(f"strong.J_{sku}").text
            row["name"] = product.find_element_by_css_selector("div.p-name>a>em").text
            row["comments"] = product.find_element_by_id(f"J_comment_{sku}").text
            try:
                row["shop"] = product.find_element_by_css_selector("div.p-shop>span>a").text
            except NoSuchElementException as e:
                row["shop"] = ""
            rows.append(row)
            print(row)
        next_page = driver.find_element_by_css_selector("a.pn-next")

        if "disabled" in next_page.get_attribute("class"):
            has_next = False
        else:
            next_page.click()

    pyexcel.save_as(records=rows, dest_file_name=f"{keyword}.xls")
    # Quit the browser
    driver.quit()

 Result: two PNG screenshots and one Excel spreadsheet


Scraping qunar.com

import sys
import time
from selenium import webdriver
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys


if __name__ == "__main__":
    start_name = "北京"
    dest_name = "青岛"
    if len(sys.argv) > 1:
        start_name = sys.argv[1]
        dest_name = sys.argv[2]
    driver = webdriver.Chrome()
    driver.get("https://www.qunar.com/?ex_track=auto_4e0d874a")

    # Departure city
    start = WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.XPATH, "//input[@name='fromCity']"))
    )
    start.clear()
    start.send_keys(start_name)
    time.sleep(0.5)
    start.send_keys(Keys.ENTER)

    # Destination city
    dest = WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.XPATH, "//input[@name='toCity']"))
    )
    # dest = driver.find_element_by_xpath("//input[@name='toCity']")
    dest.send_keys(dest_name)
    time.sleep(0.5)
    dest.send_keys(Keys.ENTER)

    search = driver.find_element_by_css_selector("button.button-search")
    search.click()

    # Get the flight list
    flights = WebDriverWait(driver, 10).until(
        EC.presence_of_all_elements_located((By.XPATH, "//div[@class='m-airfly-lst']/div[@class='b-airfly']"))
    )

    for flight in flights:
        f_data = {}
        airlines = flight.find_elements_by_xpath(".//div[@class='d-air']")
        f_data["airlines"] = [airlines.text for airlines in airlines]
        f_data["depart"] = flight.find_element_by_xpath(".//div[@class='sep-lf']").text
        f_data["duration"] = flight.find_element_by_xpath(".//div[@class='sep-ct']").text
        f_data["dest"] = flight.find_element_by_xpath(".//div[@class='sep-rt']").text
        # Price handling: the rendered price is a fake base value plus positioned overlays that replace some of its digits
        fake_price = list(flight.find_element_by_xpath(".//span[@class='prc_wp']/em/b[1]").text)
        covers = flight.find_elements_by_xpath(".//span[@class='prc_wp']/em/b[position()>1]")
        for c in covers:
            index = int(c.value_of_css_property('left')[:-2]) // c.size['width']
            fake_price[index] = c.text
        f_data["price"] = "".join(fake_price)

 Computing the real price
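
 A made-up example that mirrors the index computation above: the base digits come from the first <b>, and each later <b> covers one digit at position left // width.

fake_price = list("1234")                  # digits of the fake base price
covers = [(0, 20, "8"), (40, 20, "6")]     # (left in px, width in px, covering digit)
for left, width, digit in covers:
    fake_price[left // width] = digit
print("".join(fake_price))                 # -> "8264", the price actually displayed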


splash

Documentation

  :point_right: Official documentation

Installation

docker pull scrapinghub/splash
docker run -it -d  -p 8050:8050 --rm scrapinghub/splash

Usage

  1. Open ip:port in a browser and submit the JD.com search URL

  2. You will see the rendered result

  3. Enter http://localhost:8050/render.html?url=https://search.jd.com/Search?keyword=%E5%B0%8F%E7%B1%B310&enc=utf-8&suggest=1.def.0.V08--38s0&wq=%E5%B0%8F%E7%B1%B3&pvid=c18d37ab55764cc4ac71e124bc496035


Command-line usage

  1. curl "http://codekiller.top:8050/render.html?url=https://search.jd.com/Search?keyword=%E5%B0%8F%E7%B1%B310&enc=utf-8&suggest=1.def.0.V08--38s0&wq=%E5%B0%8F%E7%B1%B3&pvid=c18d37ab55764cc4ac71e124bc496035" -o 小米.html
  2. Open the saved HTML file

  3. Process it (extract all the prices)

    from lxml import etree
    
    file = open('C:\\Users\\MyPC\\小米.html', "r", encoding="UTF-8")
    text = file.read()
    selector = etree.HTML(text)
    prices = selector.xpath("//div[@class='p-price']/strong/i/text()")
    print(prices)

Scraping JD.com (requests + Splash)

from urllib.parse import urlparse, urlencode, quote
from lxml import etree
import requests

ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36"
headers = {"User-Agent": ua}

keyword = "小米"
params = dict(
    keyword=keyword,
    enc="utf-8",
    wq=keyword,
    pvid="57486a4adb40455dbba829de75133672"
)
query_string = "&".join(("%s=%s" % (K, V)) for K, V in params.items())
jd_url = "https://search.jd.com/Search?" + query_string
url = "http://codekiller.top:8050/render.html?url=" + quote(jd_url)

r = requests.get(url, headers=headers)

selector = etree.HTML(r.text)

price_list = selector.xpath("//div[@class='p-price']/strong/i/text()")
name_list = selector.xpath("//div[contains(@class,'p-name')]/a/em/text()")

for name, price in zip(name_list, price_list):
    print(name, price)

Anti-scraping countermeasures

User-Agent detection

Change the User-Agent in the request headers.

Request-header checks

For example Referer, Content-Type, and the request method (POST, GET).

Construct the matching request headers. For Referer, when we extract a URL we should also store the URL of the page it was found on, and put it into request.headers. A small sketch follows.
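
 A minimal requests-based sketch (the URLs are placeholders): when following a link found on a listing page, send that page's URL along as the Referer header.

import requests

page_url = "http://www.example.com/list"        # page the link was extracted from
target_url = "http://www.example.com/detail/1"  # link we are about to fetch

headers = {
    "User-Agent": "Mozilla/5.0",
    "Referer": page_url,  # pretend we navigated here from the listing page
}
resp = requests.get(target_url, headers=headers)
print(resp.status_code)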

Asynchronously loaded content

We need to analyze the page's network requests, find the ones related to the data we want, and study their headers, parameters, and cookies so that we can build the same requests ourselves. Usually these are Ajax requests, but there are also image requests, for example lazy-loaded images whose src attribute is set by JS after the page loads; there is usually some other custom attribute such as "_src". In short, you can generally find an id or a link; pay attention to unusual attributes on the relevant DOM nodes. A small sketch of the lazy-load case follows.
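
 A minimal sketch of the lazy-load case (attribute names such as "data-src"/"_src" vary from site to site; the HTML below is made up):

from bs4 import BeautifulSoup

html = '<img src="placeholder.gif" data-src="http://img.example.com/real.jpg">'
soup = BeautifulSoup(html, "html.parser")

urls = []
for img in soup.select("img"):
    # prefer the lazy-load attribute and fall back to src only when it is missing
    real = img.get("data-src") or img.get("_src") or img.get("src")
    urls.append(real)
print(urls)  # ['http://img.example.com/real.jpg']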

Encrypted request parameters

The front end usually computes some hash value from the parameters. You have to read the front-end code, find the exact computation, and re-implement it in Python. If the front-end code is obfuscated and very large, you can fall back to an engine such as selenium or splash to make the requests. However, if you need a large volume of data, calling the target's API directly is still the way to go. A purely hypothetical sketch follows.
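
 A purely hypothetical sketch: many sites sign requests with something like md5(sorted query string + secret). The parameter names and the "secret" below are invented; the real logic has to be recovered from the site's JS.

import hashlib
import time

def sign(params, secret="SECRET_FROM_JS"):
    query = "&".join(f"{k}={params[k]}" for k in sorted(params))
    return hashlib.md5((query + secret).encode("utf-8")).hexdigest()

params = {"keyword": "python", "ts": int(time.time())}
params["sign"] = sign(params)
print(params)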


Encrypted responses

Encryption inside JSON data

For example Ctrip's hotel room-type list API: either run the site's own JS to decrypt the data, or analyze the front-end logic and re-implement it in Python.

CSS obfuscation

For example dianping.com replaces certain characters with CSS styles. We have to fetch the CSS file as well, analyze the styles inside it, locate the SVG file, extract its contents, and perform the substitution.

Font obfuscation

For example Maoyan: every response comes with a randomly generated font file, and the character codes are randomized too. You have to download the font file each time and work out the mapping between its glyphs and real characters. A small sketch follows.
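
 A small sketch of reading such a font with fontTools (assumes the package is installed and a font file has already been downloaded as "maoyan.woff"; the glyph-naming convention is site-specific):

from fontTools.ttLib import TTFont

font = TTFont("maoyan.woff")
# mapping from unicode code point to glyph name, e.g. 0xe123 -> "uniE123"
cmap = font["cmap"].getBestCmap()
for code, glyph_name in cmap.items():
    print(hex(code), glyph_name)
# The remaining, site-specific step is to match each glyph (by name or by its
# outline) against a reference font whose glyph-to-character mapping is known.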

Cookie restrictions

Login/session restrictions, as with xinpianchang: take the cookie from a logged-in session and set it in the request headers, so every request is made as the logged-in user. A small sketch follows.
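
 A minimal sketch: reuse a cookie captured from a logged-in browser session (the cookie name and value below are placeholders).

import requests

session = requests.Session()
session.headers["User-Agent"] = "Mozilla/5.0"
session.cookies.set("Authorization", "PASTE_LOGGED_IN_COOKIE_HERE")

resp = session.get("https://www.xinpianchang.com/")  # sent with the login cookie attached
print(resp.status_code)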

IP rate limiting

This requires a large pool of proxy IPs. Ways to obtain them:

  1. Run your own proxy servers (tinyproxy, squid + dial-up with dynamic IPs, DDNS)
  2. Buy a commercial proxy service
  3. Scrape publicly listed proxies (availability is low)

Also control the crawl rate: find the highest concurrency that still keeps you from being banned. A Scrapy settings sketch follows.
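
 If the crawler is built on Scrapy, the rate can be throttled from settings.py (the numbers below are only illustrative and need tuning per site):

DOWNLOAD_DELAY = 1                   # seconds between requests to the same site
CONCURRENT_REQUESTS_PER_DOMAIN = 4   # cap the per-domain concurrency
AUTOTHROTTLE_ENABLED = True          # let Scrapy adapt the delay to server load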

CAPTCHAs

  1. Check whether the front-end check can be bypassed by calling the underlying API directly, avoiding the CAPTCHA.
  2. Use an image-recognition library to solve simple CAPTCHAs.
  3. Plug into a commercial CAPTCHA-solving platform.
  4. Train a model on a corpus of CAPTCHA images with machine learning, then use it for recognition.

Author: 小莫の咕哒君
Copyright: unless otherwise stated, all posts on this blog are licensed under CC BY 4.0. Please credit 小莫の咕哒君 when reposting!