urllib
get请求
import urllib.request
import json
res = urllib.request.urlopen("http://httpbin.org/get")
# 读取response的内容
text = res.read()
# http返回状态码
print(res.status, res.reason)
obj = json.loads(text)
print(obj)
for k, v in res.headers.items():
print(f"({k}:{v})")
输出
自定义头信息
import urllib.request
import json
# 添加自定义的头信息
ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36"
req = urllib.request.Request("http://httpbin.org/user-agent")
req.add_header("User-Agent", ua)
res = urllib.request.urlopen(req)
resp = json.load(res)
print("user-agent", resp["user-agent"])
身份认证
import urllib.request
import json
# 使用带默认realm的密码管理器,避免realm不匹配导致认证失败
password_mgr = urllib.request.HTTPPasswordMgrWithDefaultRealm()
password_mgr.add_password(None, "http://httpbin.org/basic-auth/admin/123456",
                          "admin", "123456")
auth_handler = urllib.request.HTTPBasicAuthHandler(password_mgr)
opener = urllib.request.build_opener(auth_handler)
urllib.request.install_opener(opener)
res = urllib.request.urlopen("http://httpbin.org/basic-auth/admin/123456")
print(res.read().decode("utf-8"))
带参数(get,post)
import urllib.request
import urllib.parse
import json
# get
params = urllib.parse.urlencode({"param": 1, "eggs": 2, "bacon": 2})
url = f"http://httpbin.org/get?{params}"
res = urllib.request.urlopen(url)
print(json.load(res))
# post
data = urllib.parse.urlencode({"name": "小明", "age": 20})
data = data.encode()
res = urllib.request.urlopen("http://httpbin.org/post", data)
print(json.load(res))
requests
get,post和带参数请求
import requests
# get请求
res = requests.get("http://httpbin.org/get")
print(res.status_code, res.reason)
print(res.text)
# 带参数的get请求
res = requests.get("http://httpbin.org/get", params={"a": 1, "b": "2"})
print(res.json())
# post请求
res = requests.post("http://httpbin.org/post", data={"a": 1})
print(res.json())
结果
自定义headers和带cookie请求
# 自定义headers请求
ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36"
headers = {"User-Agent": ua}
res = requests.get("http://httpbin.org/headers", headers=headers)
print("自定义headers请求:\n", res.json())
# 带cookie的请求
cookies = dict(userid = "123456", token="xxxxxxxxxxxxxxxxx")
res = requests.get("http://httpbin.org/cookies", cookies=cookies)
print("带cookie的请求:\n", res.json())
输出
身份认证
res = requests.get("http://httpbin.org/basic-auth/admin/123456", auth=("admin", 123456))
print("Basic-auth认证请求:\n", res.json())
输出
主动抛出状态异常
bad_r = requests.get("http://httpbin.org/status/404")
print(bad_r.status_code)
bad_r.raise_for_status()
输出
session使用(带cookie)
# 使用requests.Session()对象请求
s = requests.Session()
# session对象会保存服务器返回的set-cookies头信息里面的内容
s.get("http://httpbin.org/cookies/set/userid/123456789")
# 下一次请求会自动将本地所有的cookies信息自定添加到头信息里面
r = s.get("http://httpbin.org/cookies")
print("检查session中的cookies", r.json())
输出
延迟
res = requests.get("http://httpbin.org/delay/4", timeout=5)
print(res.text)
输出
bs4
简单使用
from bs4 import BeautifulSoup
import doc_html
soup = BeautifulSoup(doc_html.html_doc, "html.parser")
# input标签
print("-----input-----")
print(soup.input.attrs)
print(soup.input.attrs['style'])
print(soup.input.has_attr('type'))
# p标签
print("-----p-----")
print(soup.p)
# 获取第一个p标签下的所有的子结点
print(type(soup.p.children))
print(list(soup.p.children)[0])
print(list(soup.p.children)[0].text)
# 指定查找
print("-----指定查找-----")
print(soup.find_all("p"))
print(soup.find(type="text"))
print(soup.find(id="btt"))
# 找到所有满足条件的(集合)
print(soup.select(".first-2"))
print(soup.select("#btt"))
print(soup.select(".first-2 p"))
输出(从input开始)
-----input-----
{'type': 'text', 'style': 'background-color: transparent; border:0px;'}
background-color: transparent; border:0px;
True
-----p-----
<p><span>这是个盒子模型,运用了HTML和CSS的知识创建,里面含有很多其他知识,比如对图像的理解,页面设计等。
这些知识想要掌握好,需要经过很久的训练,对各种标签和属性都了解</span>
<span>第二个了</span>
</p>
<class 'list_iterator'>
<span>这是个盒子模型,运用了HTML和CSS的知识创建,里面含有很多其他知识,比如对图像的理解,页面设计等。
这些知识想要掌握好,需要经过很久的训练,对各种标签和属性都了解</span>
这是个盒子模型,运用了HTML和CSS的知识创建,里面含有很多其他知识,比如对图像的理解,页面设计等。
这些知识想要掌握好,需要经过很久的训练,对各种标签和属性都了解
-----指定查找-----
[<p><span>这是个盒子模型,运用了HTML和CSS的知识创建,里面含有很多其他知识,比如对图像的理解,页面设计等。
这些知识想要掌握好,需要经过很久的训练,对各种标签和属性都了解</span>
<span>第二个了</span>
</p>, <p>这是一个用来判断是否是透明色的盒子</p>]
<input style="background-color: transparent; border:0px;" type="text"/>
<h2 id="btt">这是一个标题</h2>
[<div class="first-2">
<h2>这里可能用来有透明色</h2>
<p>这是一个用来判断是否是透明色的盒子</p>
</div>, <div class="first-2">
<button onclick="btn()">点击!</button>
<h2 id="btt">这是一个标题</h2>
</div>]
[<h2 id="btt">这是一个标题</h2>]
[<p>这是一个用来判断是否是透明色的盒子</p>]
使用lxml解析器
from bs4 import BeautifulSoup
from lxml import etree
import doc_html
soup = BeautifulSoup(doc_html.html_doc, "lxml")
print(soup.input)
lxml和xpath
xpath语法
表达式 | 描述 |
---|---|
nodename | 选取此节点的所有子节点。 |
/ | 从根节点选取。 |
// | 从匹配选择的当前节点选择文档中的节点,而不考虑它们的位置。 |
. | 选取当前节点。 |
.. | 选取当前节点的父节点。 |
@ | 选取属性。 |
following-sibling::span[1] | 选取当前节点之后的第一个同级span节点 |
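下面用lxml对一个假设的HTML片段做个小演示,直观感受上表中几种表达式的效果:
from lxml import etree

# 假设的HTML片段,仅用于演示上表中的几种表达式
html = """
<div class="box">
    <h2>标题</h2>
    <span class="icon-location"></span><span>北京</span>
    <p id="intro">简介</p>
</div>
"""
selector = etree.HTML(html)
# // 不考虑位置,选取文档中所有h2的文本
print(selector.xpath("//h2/text()"))       # ['标题']
# @ 选取属性
print(selector.xpath("//p/@id"))           # ['intro']
# .. 选取父节点,再取父节点的class属性
print(selector.xpath("//p/../@class"))     # ['box']
# following-sibling::span[1] 选取后面的第一个同级span
print(selector.xpath("//span[@class='icon-location']/following-sibling::span[1]/text()"))  # ['北京']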
lxml使用
from bs4 import BeautifulSoup
from lxml import etree
import doc_html
soup = BeautifulSoup(doc_html.html_doc, "lxml")
print(soup.input)
print("----------------------------------华丽的分割线--------------------------------------------")
selector = etree.HTML(doc_html.html_doc)
# 取出所有的链接
links = selector.xpath("//div[@class='first-2']/a/@href")
for link in links:
print(link)
a_ = selector.xpath("//div[@class='first-2']/a")
print(a_)
text = a_[0].xpath("../h2/text()")
print(text)
# 第一个
print(selector.xpath("//select/option[1]/text()"))
# 倒数第一个
print(selector.xpath("//select/option[last()]/text()"))
# 倒数第二个
print(selector.xpath("//select/option[last()-1]/text()"))
# 前两个
print(selector.xpath("//select/option[position()<3]/text()"))
# 指定标签内部情况和样式名
print(selector.xpath("//table/tr[td>21]/td[@class='age info']/@class"))
# 多样式名
print(selector.xpath("//table/tr/td[contains(@class, 'age info') and contains(@id, 'age')]/text()"))
输出
<input style="background-color: transparent; border:0px;" type="text"/>
----------------------------------华丽的分割线--------------------------------------------
http://baidu.com
https://blog.csdn.net/
[<Element a at 0x14c3fb8e480>, <Element a at 0x14c3fb8e500>]
['这里可能用来有透明色']
['信息学院']
['药学院']
['体育学院']
['信息学院', '护理学院']
['age info', 'age info', 'age info']
['21']
爬取下厨房的所有图片
requests+bs4
from urllib.parse import urlparse
import os
import requests
from bs4 import BeautifulSoup
ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36"
headers = {"User-Agent": ua}
r = requests.get('http://www.xiachufang.com', headers=headers)
soup = BeautifulSoup(r.text, "lxml")
img_list = []
for img in soup.select("img"):
if img.has_attr("data-src"):
img_list.append(img.attrs["data-src"])
else:
img_list.append(img.attrs["src"])
# 保存图片
img_dir = "E:\\知识学习\\2019版-千锋爬虫-源码+笔记+作业\\爬虫\\下厨房"
if not os.path.isdir(img_dir):
os.mkdir(img_dir)
for img in img_list:
if img.strip() != '':
o = urlparse(img)
filename = o.path[1:].split("@")[0]
filepath = os.path.join(img_dir, filename)
# 有可能有多重目录,不存在的话就创建
if not os.path.exists(os.path.dirname(filepath)):
print(os.path.exists(os.path.dirname(filepath)))
os.makedirs(os.path.dirname(filepath))
# 获取图片
url = f"{o.scheme}://{o.netloc}/{filename}"
resp = requests.get(url)
# 保存图片
with open(filepath, "wb") as file:
for chunk in resp.iter_content(1024):
file.write(chunk)
file.close()
pycurl+re
import re
from pycurl import Curl
from urllib.parse import urlparse
from io import BytesIO
import os
buffer = BytesIO()
c = Curl()
c.setopt(c.URL, 'http://www.xiachufang.com')
c.setopt(c.WRITEDATA, buffer)
c.perform()
c.close()
body = buffer.getvalue()
text = body.decode("utf-8")
img_list = re.findall(r"src=\"(http://i2\.chuimg\.com/\w+\.jpg)", text)
# 保存图片
img_dir = "E:\\知识学习\\2019版-千锋爬虫-源码+笔记+作业\\爬虫\\下厨房"
if not os.path.isdir(img_dir):
os.mkdir(img_dir)
for img in img_list:
print(img)
if img.strip() != '':
o = urlparse(img)
filename = o.path[1:]
filepath = os.path.join(img_dir, filename)
# 有可能有多重目录,不存在的话就创建
if not os.path.exists(os.path.dirname(filepath)):
print(os.path.exists(os.path.dirname(filepath)))
os.makedirs(os.path.dirname(filepath))
# 获取图片
url = f"{o.scheme}://{o.netloc}/{filename}"
# 保存图片
with open(filepath, "wb") as file:
c = Curl()
c.setopt(c.URL, url)
c.setopt(c.WRITEDATA, file)
c.perform()
c.close()
迁木网(多线程)
普通版本(借用队列)
import requests
from lxml import etree
import os
from queue import Queue
import threading
import time
start_url = "http://www.qianmu.org/ranking/1528.htm"
link_queue = Queue()
threads_num = 10
threads = []
download_pages = 0
def fetch(url):
"""链接请求"""
resp = requests.get(url)
if resp.status_code != 200:
resp.raise_for_status()
return resp.text.replace("\t", "")
def parse_university(link):
"""处理大学详情页面"""
resp = fetch(link)
selector = etree.HTML(resp)
data = {}
# 学校名
data["name"] = selector.xpath("//div[@id='wikiContent']/h1/text()")[0]
# 信息
try:
table = selector.xpath("//div[@class='infobox']//table")[0]
except IndexError as e:
print("无表格信息")
return None
keys = table.xpath(".//td[1]/p/text()")
values = table.xpath(".//td[2]/p//text()")
print(len(keys), len(values))
if len(keys) > len(values):
return None
data.update(zip(keys, values))
return data
def download():
while True:
# 阻塞,直到从队列里获取一条消息
link = link_queue.get()
if link is None:
break
# 提取详情页的信息
data = parse_university(link)
global download_pages
download_pages += 1
if data:
print(data)
link_queue.task_done()
print(f'remaining queue is {link_queue.qsize()}')
if __name__ == "__main__":
# 开始时间
start_time = time.time()
# 请求入口页面
ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36"
headers = {"User-Agent": ua}
resp = requests.get(start_url, headers=headers)
selector = etree.HTML(resp.text)
# 提取列表页面的链接
links = selector.xpath("//div[@class='rankItem']//td[2]/a/@href")
for link in links:
if not link.startswith("http://www.qianmu.org"):
link = "http://www.qianmu.org/" + link
link_queue.put(link)
# 多线程执行
for i in range(threads_num):
t = threading.Thread(target=download)
t.start()
threads.append(t)
# 阻塞队列,直到队列被清空
link_queue.join()
for i in range(threads_num):
link_queue.put(None)
# 退出线程
for t in threads:
t.join()
finished_time = time.time()
cost_seconds = finished_time-start_time
print(f"download finished!!!耗时:{cost_seconds}s,抓取界面:{download_pages}个")
升级版本(借用redis)
import requests
from lxml import etree
import os
import signal
from queue import Queue
import threading
import time
import redis
start_url = "http://www.qianmu.org/ranking/1528.htm"
link_queue = Queue()
threads_num = 10
threads = []
thread_on = True
download_pages = 0
my_redis = redis.Redis(host="host", password="password")
def fetch(url):
"""链接请求"""
resp = requests.get(url)
if resp.status_code != 200:
resp.raise_for_status()
return resp.text.replace("\t", "")
def parse_university(link):
"""处理大学详情页面"""
resp = fetch(link)
selector = etree.HTML(resp)
data = {}
# 学校名
data["name"] = selector.xpath("//div[@id='wikiContent']/h1/text()")[0]
# 信息
try:
table = selector.xpath("//div[@class='infobox']//table")[0]
except IndexError as e:
print("无表格信息")
return None
keys = table.xpath(".//td[1]/p/text()")
values = table.xpath(".//td[2]/p//text()")
print(len(keys), len(values))
if len(keys) > len(values):
return None
data.update(zip(keys, values))
return data
def download(i):
while thread_on:
# 阻塞,直到从队列里获取一条消息
link = my_redis.lpop("qianmu.queue")
if link:
# 提取详情页的信息
data = parse_university(link)
global download_pages
download_pages += 1
if data:
print(data)
print(f'remaining queue is {my_redis.llen("qianmu.queue")}')
print(f"Thread-{i} exit now")
def signal_handler(signum, frame):
print("received Ctrl+C, wait for exit gracefully")
global thread_on
thread_on = False
def exit_handler(i):
global thread_on
while thread_on:
if input("") == "exit":
thread_on = False
print(f"Thread-{i} exit now")
if __name__ == "__main__":
# 开始时间
start_time = time.time()
# 请求入口页面
ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36"
headers = {"User-Agent": ua}
resp = requests.get(start_url, headers=headers)
selector = etree.HTML(resp.text)
# 提取列表页面的链接
links = selector.xpath("//div[@class='rankItem']//td[2]/a/@href")
for link in links:
if not link.startswith("http://www.qianmu.org"):
link = "http://www.qianmu.org/" + link
if my_redis.sadd("qianmu.seen", link):
my_redis.rpush("qianmu.queue", link)
# 多线程执行
for i in range(threads_num):
t = threading.Thread(target=download, args=(i + 1,))
t.start()
threads.append(t)
# win系统下没用
signal.signal(signal.SIGINT, signal_handler)
# 关闭
t = threading.Thread(target=exit_handler, args=(threads_num + 1,))
t.start()
threads.append(t)
# 阻塞队列,直到队列被清空
link_queue.join()
# 退出线程
for t in threads:
t.join()
finished_time = time.time()
cost_seconds = finished_time - start_time
print(f"download finished!!!耗时:{cost_seconds}s,抓取界面:{download_pages}个")
这里注意一下,signal在win下没用,我就多加了一个线程用于监听输入,当输入"exit"时,退出程序。
scrapy的安装和简单实用
安装
:point_right: https://blog.csdn.net/qq_44766883/article/details/107790504
基本使用
import scrapy
class QuoteSpider(scrapy.Spider):
name = "quote"
start_urls = ["http://quotes.toscrape.com/"]
def parse(self, response):
# quotes = response.css("div.quote")
quotes = response.xpath("//div[@class='quote']")
for quote in quotes:
yield {
"text": quote.css("span.text::text").extract_first(),
"author": quote.xpath("./span/small/text()").extract_first(),
}
next_page = response.xpath("//li[@class='next']/a/@href").extract_first()
if next_page:
yield response.follow(next_page, self.parse)
运行命令
控制台输出
scrapy runspider quotes_spider.py
保存到指定文件
scrapy runspider scrapy_learn/quotes_spider.py -o ./scrapy_learn/quotes.json
指定文件类型
scrapy runspider scrapy_learn/quotes_spider.py -o ./scrapy_learn/quotes.csv -t csv
常用命令
创建一个项目
scrapy startproject qianmu
初始化一个爬虫文件
# scrapy genspider [爬虫名字] [目标网站域名]
scrapy genspider qianmu_new qianmu.iguye.com
运行爬虫
# 运行名为qianmu_new的爬虫
scrapy crawl qianmu_new
scrapy crawl qianmu_new -o qianmu_new.json
scrapy crawl qianmu_new -o qianmu_new.csv -t csv
# 单独运行爬虫文件
scrapy runspider quotes_spider.py
scrapy runspider scrapy_learn/quotes_spider.py -o ./scrapy_learn/quotes.json
scrapy runspider scrapy_learn/quotes_spider.py -o ./scrapy_learn/quotes.csv -t csv
创建以下文件,便于直接运行
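比如可以在项目根目录下创建一个入口文件(文件名这里假设为entrypoint.py,内容与后文entrypoint一节相同),之后直接运行该文件即可:
# entrypoint.py:等价于在命令行执行 scrapy crawl qianmu_new
from scrapy import cmdline
cmdline.execute(["scrapy", "crawl", "qianmu_new"])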
调试爬虫
# 进入到scrapy控制台,使用的是项目的环境
scrapy shell
# 带一个URL参数,将会自动请求这个url,并在请求成功后进入控制台
scrapy shell http://www.qianmu.org/ranking/1528.html
# 调用parse方法
result = spider.parse(response)
# result是一个生成器
type(result):<generator object QianmuNewSpider.parse at 0x0000025096AEF200>
# one其实就是一个Request对象
one = next(result)
one:<GET http://www.qianmu.org/%E9%BA%BB%E7%9C%81%E7%90%86%E5%B7%A5%E5%AD%A6%E9%99%A2>
type(one):<class 'scrapy.http.request.Request'>
# callback其实就是yield response.follow(link, self.parse_university)中的 parse_university
one.callback:<bound method QianmuNewSpider.parse_university of <QianmuNewSpider 'qianmu_new' at 0x25096aa3640>>
# 继续请求
fetch(one) # 输出:2020-08-04 20:54:46 [scrapy.core.engine] DEBUG: Crawled (200) <GET http://www.qianmu.org/%E9%BA%BB%E7%9C%81%E7%90%86%E5%B7%A5%E5%AD%A6%E9%99%A2> (referer: None) ['cached']
data = next(spider.parse_university(response)) # 输出了:18 26
data # 输出一个请求抓取的数据
# 可以进行循环爬取
for req in result:
... fetch(req)
进入到控制台以后,可以使用以下函数和对象
对象/函数 | 说明 |
---|---|
fetch | 请求url或者Request对象,注意:请求成功以后会自动将当前作用域内的request和response对象重新赋值 |
view | 用浏览器打开response对象内的网页 |
shelp | 打印帮助信息 |
spider | 相应的Spider类的实例 |
settings | 保存所有配置信息的Settings对象 |
crawler | 当前Crawler对象 |
scrapy | scrapy模块 |
# 用项目配置下载网页,然后用浏览器打开网页
scrapy view url
# 用项目配置下载网页,然后输出至控制台
scrapy fetch url
迁木网(scrapy)
主要代码
import scrapy
from items import UniversityItem
class QianmuNewSpider(scrapy.Spider):
name = 'qianmu_new'
# 允许爬的域名内的url,比如qianmu.org,那么www.qianmu.org,mall.qianmu.org都能爬
allowed_domains = ['qianmu.org']
# 爬虫的入口地址,可以多写几个
start_urls = ['http://www.qianmu.org/ranking/1528.html']
# 当框架请求start_urls内的链接成功以后,就会调用该方法
def parse(self, response):
# 解析链接,并提取,extract返回的是一个列表,extract_first返回的是列表中的第一个
links = response.xpath("//div[@class='rankItem']//td[2]/a/@href").extract()
for link in links:
if not link.startswith("http://www.qianmu.org"):
link = "http://www.qianmu.org/" + link
# 让框架继续跟着这个链接,也就是说会再次发起请求
# 请求成功以后会调用指定的callback函数
yield response.follow(link, self.parse_university)
def parse_university(self, response):
"""处理大学详情页面"""
response = response.replace(body=response.text.replace("\t", "").replace("\r\n", ""))
item = UniversityItem()
data = {}
# 学校名
item["name"] = response.xpath("//div[@id='wikiContent']/h1/text()").extract_first()
# 信息
table = response.xpath("//div[@id='wikiContent']/div[@class='infobox']/table")
if table:
table = table[0]
keys = table.xpath(".//td[1]/p/text()").extract()
cols = table.xpath('.//td[2]')
# values = table.xpath(".//td[2]/p//text()").extract_first()
values = [' '.join(col.xpath('.//text()').extract()) for col in cols]
print(len(keys), len(values))
if len(keys) == len(values):
data.update(zip(keys, values))
print(data)
item["rank"] = data.get("排名")
item["country"] = data.get("国家")
item["state"] = data.get("州省")
item["city"] = data.get("城市")
item["undergraduate_num"] = data.get("本科生人数")
item["postgraduate_num"] = data.get("研究生人数")
item["website"] = data.get("网址")
yield item
items
import scrapy
class UniversityItem(scrapy.Item):
# define the fields for your item here like:
# name = scrapy.Field()
name = scrapy.Field()
rank = scrapy.Field()
country = scrapy.Field()
state = scrapy.Field()
city = scrapy.Field()
undergraduate_num = scrapy.Field()
postgraduate_num = scrapy.Field()
website = scrapy.Field()
pipelines
import pymysql
import redis
from scrapy.exceptions import DropItem
# 保存在redis中
class RedisPipeline:
# 开始调用一次
def open_spider(self, spider):
self.redis = redis.Redis(host="host", password="password")
# 关闭调用一次
def close_spider(self, spider):
self.redis.close()
# 每产生一个调用一次
def process_item(self, item, spider):
if self.redis.sadd(spider.name, item['name']):
return item
raise DropItem
# 保存在mysql中
class MysqlPipeline:
# 开始调用一次
def open_spider(self, spider):
self.conn = pymysql.connect(
host="127.0.0.1",
port=3306,
db="spider",
user="jiang",
password="jiang",
charset="utf8"
)
self.cur = self.conn.cursor()
# 关闭调用一次
def close_spider(self, spider):
self.cur.close()
self.conn.close()
# 每产生一个调用一次
def process_item(self, item, spider):
# keys = item.keys()
# values = list(item.values) # 是一个元组-->集合
keys, values = zip(*item.items())
sql = "insert into universities({0}) values({1})".format(
','.join(keys),
','.join(['%s']*len(keys))
)
self.cur.execute(sql, values)
self.conn.commit()
# 输出语句
print(self.cur._last_executed)
return item
配置settings
ITEM_PIPELINES = {
'qianmu.pipelines.MysqlPipeline': 301,
'qianmu.pipelines.RedisPipeline': 300,
}
entrypoint
程序入口
from scrapy import cmdline
cmdline.execute(["scrapy", "crawl", "qianmu_new"])
数据库表
create table `universities`(
`name` varchar(256) NOT NULL COMMENT '学校名称',
`rank` varchar(32) DEFAULT NULL COMMENT '学校排名',
`country` varchar(128) DEFAULT NULL COMMENT '国家',
`state` varchar(128) DEFAULT NULL COMMENT '州省',
`city` varchar(128) DEFAULT NULL COMMENT '城市',
`undergraduate_num` varchar(128) DEFAULT NULL COMMENT '本科生人数',
`postgraduate_num` varchar(128) DEFAULT NULL COMMENT '研究生人数',
`website` varchar(128) DEFAULT NULL COMMENT '网站地址',
primary key(`name`)
)ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 comment="大学信息表";
一张图
这张图一定要熟记呀!!!
The data flow in Scrapy is controlled by the execution engine, and goes like this:
- The Engine gets the initial Requests to crawl from the Spider.
- The Engine schedules the Requests in the Scheduler and asks for the next Requests to crawl.
- The Scheduler returns the next Requests to the Engine.
- The Engine sends the Requests to the Downloader, passing through the Downloader Middlewares (see
process_request()
). - Once the page finishes downloading the Downloader generates a Response (with that page) and sends it to the Engine, passing through the Downloader Middlewares (see
process_response()
). - The Engine receives the Response from the Downloader and sends it to the Spider for processing, passing through the Spider Middleware (see
process_spider_input()
). - The Spider processes the Response and returns scraped items and new Requests (to follow) to the Engine, passing through the Spider Middleware (see
process_spider_output()
). - The Engine sends processed items to Item Pipelines, then send processed Requests to the Scheduler and asks for possible next Requests to crawl.
- The process repeats (from step 1) until there are no more requests from the Scheduler.
中间件
process_request
在request对象传往downloader的过程中调用。当返回不同类型的值的时候,行为也不一样:
返回值 | 行为 |
---|---|
None | 一切正常,继续执行其他的中间件链 |
Response | 停止调用其他process_request和process_exception函数,也不再继续下载该请求,然后走调用process_response的流程 |
Request | 不再继续调用其他process_request函数,交由调度器重新安排下载。 |
IgnoreRequest | process_exception函数会被调用,如果没有此方法,则request.errback会被调用,如果errback也没有,则此异常会被忽略,甚至连日志都没有。 |
process_response
在将下载结果返回给engine过程中被调用
返回值 | 行为 |
---|---|
Response | 继续调用其他中间件的process_response |
Request | 不再继续调用其他process_request函数,交由调度器重新安排下载。 |
IgnoreRequest | 则request.errback会被调用,如果errback也没有,则此异常会被忽略,甚至连日志都没有。 |
process_exception
在下载过程中出现异常,或者在process_request中抛出IgnoreRequest异常的时候调用。
返回值 | 行为 |
---|---|
Response | 开始中间件链的process_response处理流程 |
Request | 不再继续调用其他process_request函数,交由调度器重新安排下载。 |
None | 继续调用其他中间件里的process_exception函数 |
from_crawler(cls, crawler)
如果存在该函数,则调用该函数创建中间件的实例。如果要写这个函数,一定要返回一个中间件的对象。
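结合上面几个函数的说明,一个下载中间件的最小骨架大致如下(仅为示意草稿,DEMO_MIDDLEWARE_ENABLED是假设的配置项):
from scrapy.exceptions import NotConfigured


class DemoDownloaderMiddleware:
    def __init__(self, settings):
        self.settings = settings

    @classmethod
    def from_crawler(cls, crawler):
        # 必须返回一个中间件实例,否则该中间件不会生效
        if not crawler.settings.getbool("DEMO_MIDDLEWARE_ENABLED"):
            raise NotConfigured
        return cls(crawler.settings)

    def process_request(self, request, spider):
        # 返回None:一切正常,继续执行后续中间件
        return None

    def process_response(self, request, response, spider):
        # 返回Response:继续交给其他中间件的process_response处理
        return response

    def process_exception(self, request, exception, spider):
        # 返回None:继续调用其他中间件里的process_exception
        return None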
循环动态代理
settings
这是一个代理IP集合
PROXIES = [
# "http://54.243.170.209:8080",
"http://165.225.210.96:10605"
]
# 749是有讲究的,因为系统的默认代理中间件是750,我们自定义的要运行在默认的前面,而系统会先运行数字小的中间件
# 而为了防止影响其他的中间件,所以紧挨着750就可以了
DOWNLOADER_MIDDLEWARES = {
'qianmu.middlewares.RandomProxyMiddleware': 749,
}
middlewares
定义一个类创建一个中间件实例
class RandomProxyMiddleware(object):
def __init__(self, settings):
# 初始化变量和配置
self.proxies = settings.getlist("PROXIES")
self.state = defaultdict(int)
self.max_failed = 3
@classmethod
def from_crawler(cls, crawler):
# 1.创建中间件对象
if not crawler.settings.getbool("HTTPPROXY_ENABLED"):
raise NotConfigured
return cls(crawler.settings)
def process_request(self, request, spider):
# 3.为每个request对象分配一个随机的ip代理
if self.proxies and not request.meta.get("proxy"):
request.meta["proxy"] = random.choice(self.proxies)
def process_response(self, request, response, spider):
# 4.请求成功,调用process_response
cur_proxy = request.meta.get("proxy")
# 判断是否被对方封禁
if response.status in (401, 403):
print(f"{cur_proxy} got wrong code {self.state[cur_proxy]} times")
# 给相应的IP失败次数+1
self.state[cur_proxy] += 1
# 当某个IP的失败次数累计到一定数量
if self.state[cur_proxy] >= self.max_failed:
print("got wrong http code {%s} when use %s" % (response.status, request.get("proxy")))
# 可以认为该IP已经被对方封禁了,从代理池中将该IP删除
self.remove_proxy(cur_proxy)
del request.meta["proxy"]
# 重新请求重新安排调度下载
return request
return response
def process_exception(self, request, exception, spider):
# 4.请求失败,调用process_exception
cur_proxy = request.meta.get("proxy")
# 如果本次请求使用了代理,并且网络请求报错,认为该IP出现问题了
if cur_proxy and isinstance(exception, (ConnectionRefusedError, TimeoutError)):
print(f"error occur where use proxy {exception} {cur_proxy}")
self.remove_proxy(cur_proxy)
del request.meta["proxy"]
return request
def remove_proxy(self, proxy):
"""在代理IP列表中删除指定代理"""
if proxy in self.proxies:
self.proxies.remove(proxy)
print(f"remove {proxy} from proxy list")
内置中间件
scrapy.downloadermiddlewares.robotstxt.RobotsTxtMiddleware
请求robots.txt文件,并解析其中的规则。
scrapy.downloadermiddlewares.httpauth.HttpAuthMiddleware
执行带Basic-auth验证的请求
scrapy.downloadermiddlewares.downloadtimeout.DownloadTimeoutMiddleware
下载请求超时最大时长
scrapy.downloadermiddlewares.defaultheaders.DefaultHeadersMiddleware
设置默认的请求头信息
scrapy.downloadermiddlewares.useragent.UserAgentMiddleware
设置请求头信息里的User-Agent
scrapy.downloadermiddlewares.retry.RetryMiddleware
如果下载失败,是否重试,重试几次
scrapy.downloadermiddlewares.redirect.MetaRefreshMiddleware
实现Meta标签重定向
scrapy.downloadermiddlewares.httpcompression.HttpCompressionMiddleware
实现压缩内容的解析(比如gzip)
scrapy.downloadermiddlewares.redirect.RedirectMiddleware
实现30x的HTTP code的重定向
scrapy.downloadermiddlewares.cookies.CookiesMiddleware
实现对cookies的设置管理
scrapy.downloadermiddlewares.httpproxy.HttpProxyMiddleware
实现IP代理
scrapy.downloadermiddlewares.stats.DownloaderStats
下载信息的统计
scrapy.downloadermiddlewares.httpcache.HttpCacheMiddleware
下载结果的缓存
扩展
介绍
扩展框架提供一个机制,使得你能将自定义功能绑定到Scrapy。
扩展只是正常的类,它们在Scrapy启动时被实例化、初始化。
扩展一般分为三种状态:可用的(Available)、开启的(enabled)和禁用的(disabled)。一些扩展经常需要依赖一些特别的配置,比如HTTP Cache扩展是可用的但默认是禁用的,除非设置了HTTPCACHE_ENABLED配置项。通过将其顺序设置为None,即可禁用。
telnet
在cmd中
telnet 127.0.0.1 6023
此时会让你输入用户名和密码,用户名就是scrapy,密码在我们的日志中输出了,如下
此时我们可以通过黑窗口查看爬虫的运行情况
# 查看方法和属性
dir()
from pprint import pprint
# 查看爬取情况
pprint(stats.get_stats())
当然还有很多的信息可以查看
日志插件
创建文件
代码
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : codekiller
# @Time : 2020/8/5 23:36
# @Email : jcl1345414527@163.com
# @File : extensions.py
# @Description: 日志扩展
import logging
from collections import defaultdict
import datetime
from scrapy import signals
from scrapy.exceptions import NotConfigured
logger = logging.getLogger(__name__)
class SpiderOpenCloseLogging:
def __init__(self, item_count):
self.item_count = item_count
self.items_scraped = 0
self.items_dropped = 0
self.stats = defaultdict(int)
self.error_stats = defaultdict(int)
@classmethod
def from_crawler(cls, crawler):
# first check if the extension should be enabled and raise
# NotConfigured otherwise
if not crawler.settings.getbool('MYEXT_ENABLED'):
raise NotConfigured
# get the number of items from settings
item_count = crawler.settings.getint('MYEXT_ITEMCOUNT', 1000)
# instantiate the extension object
ext = cls(item_count)
# connect the extension object to signals
# 将对象中的指定方法绑定一个监听事件
crawler.signals.connect(ext.spider_opened, signal=signals.spider_opened)
crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
crawler.signals.connect(ext.item_scraped, signal=signals.item_scraped)
crawler.signals.connect(ext.item_dropped, signal=signals.item_dropped)
crawler.signals.connect(ext.response_received, signal=signals.response_received)
# return the extension object
return ext
# 当spider开始爬取时发送该信号。该信号一般用来分配spider的资源,不过其也能做任何事
def spider_opened(self, spider):
print("====" * 20, "opened spider %s" % spider.name)
# 当某个spider被关闭时,该信号被发送。该信号可以用来释放每个spider在 spider_opened 时占用的资源
def spider_closed(self, spider):
print("====" * 20, "closed spider %s" % spider.name)
# 当item被爬取,并通过所有 Item Pipeline 后(没有被丢弃(dropped),发送该信号
def item_scraped(self, item, spider):
self.items_scraped += 1
if self.items_scraped % self.item_count == 0:
print("====" * 20, "scraped %d items" % self.items_scraped)
# 当item通过 Item Pipeline ,有些pipeline抛出 DropItem 异常,丢弃item时,该信号被发送
def item_dropped(self, item, spider, response, exception):
self.items_dropped += 1
if self.items_dropped % self.item_count == 0:
print("====" * 20, "dropped %d items" % self.items_dropped)
# 当引擎从downloader获取到一个新的 Response 时发送该信号
def response_received(self, response, request, spider):
now = datetime.datetime.now().strftime("%Y%m%d%H%M")
self.stats[now] += 1
# 记录响应异常的个数
if response.status in [401, 403, 404, 500, 501, 502]:
self.error_stats[now] += 1
# 当响应异常的比例大于0.2输出日志信息
if float(self.error_stats[now]) / self.stats[now] > 0.2:
logger.warning("received %s response, "
"and %s of item is none 200 in %s" % \
(self.stats[now], self.error_stats[now], now))
settings
增加一个配置,对应我们代码里的判断逻辑 if not crawler.settings.getbool('MYEXT_ENABLED'):
# 使用自定义的插件
MYEXT_ENABLED = True
在EXTENSIONS数组中增加我们的自定义扩展
EXTENSIONS = {
#'scrapy.extensions.telnet.TelnetConsole': None,
'qianmu.extensions.SpiderOpenCloseLogging': 1
}
为了禁用一个默认开启的扩展(比如包含在EXTENSIONS_BASE中的扩展),需要将其顺序(order)设置为None
内置扩展
扩展在扩展类被实例化时加载和激活,实例化代码必须在类的构造函数(init)中执行。
‘scrapy.extensions.corestats.CoreStats’:0
- 名称:核心统计扩展
- 说明:如果统计收集器(stats collection)启用了,该扩展开启核心统计收集(参考 数据收集(Stats Collection))
‘scrapy.telnet.TelnetConsole’:0
- 名称:Telnet控制台扩展
- 说明:提供了一个telnet控制台,telnet控制台通过TELNETCONSOLE_ENABLED配置项开启,服务器会监听TELNETCONSOLE_PORT指定的端口
‘scrapy.extensions.memusage.MemoryUsage’:0
- 名称:内存使用扩展
- 说明:监听Scrapy进程内存使用量,如果使用内存量超过某个指定值,发送提醒邮件,如果超过某个指定值,关闭spider
‘scrapy.extensions.memdebug.MemoryDebugger’:0
- 名称:内存调试扩展
- 说明:该扩展用于调试内存使用量,开启该扩展,需要打开MEMDEBUG_ENABLED配置项
‘scrapy.extensions.closespider.CloseSpider’:0
- 名称:当某些状况发生,spider会自动关闭,用来为状况指定关闭方式
‘scrapy.extensions.feedexport.FeedExporter’:0
‘scrapy.extensions.logstats.LogStats’:0
- 名称:记录统计扩展
- 说明:记录基本的统计信息,比如爬取的页面和条目(items)
‘scrapy.extensions.spiderstate.SpiderState’:0
‘scrapy.extensions.throttle.AutoThrottle’:0
‘scrapy.extensions.statsmailer.StatsMailer’:0
- 名称:StatsMailer扩展
- 说明:这个简单的扩展可用来在一个域名爬取完毕时发送提醒邮件,包含Scrapy收集的统计信息。邮件会发送给通过STATSMAILER_RCPTS指定的所有接收人
新片场
创建项目
scrapy startproject xpc
创建requirements.txt
在xpc目录下创建requirements.txt文件
scrapy
redis
requests
pymysql
创建完成后,执行以下命令安装依赖
pip install -r requirements.txt
初始化爬虫
scrapy genspider discovery xinpianchang.com
爬取逻辑
import scrapy
from scrapy import Request
import json
import random
import re
from items import PostItem, CommentItem, ComposerItem, CopyrightItem
from scrapy_redis.spiders import RedisSpider
def my_strip(info):
if info:
return info.strip()
return ""
cookies = {
'Authorization': 'A26F51084B88500BF4B885427B4B8858B394B885B7E7169365C9'
}
def gen_session_id():
return "".join(random.sample([chr(i) for i in range(97, 97 + 26)], 26))
def convert_int(s):
if type(s) is str:
return int(s.replace(",", ""))
return 0
class DiscoverySpider(scrapy.Spider):
name = 'discovery'
allowed_domains = ['xinpianchang.com', 'openapi-vtom.vmovier.com', 'app.xinpianchang.com']
start_urls = ['https://www.xinpianchang.com/channel/index/type-/sort-like/duration_type-0/resolution_type-/page-1']
# 翻页计数器
page_count = 0
# def start_requests(self):
# for url in self.start_urls:
# c = cookies.copy()
# c.update(PHPSESSID=gen_session_id(),
# channel_page="apU%3D") # 第20页
# request = Request(url, cookies=c, dont_filter=True)
# yield request
def parse(self, response):
self.page_count += 1
if self.page_count >= 50:
self.page_count = 0
cookies.update(PHPSESSID=gen_session_id())
post_list = response.xpath("//ul[@class='video-list']/li")
url = "https://www.xinpianchang.com/a%s?from=ArticleList"
for post in post_list:
pid = post.xpath("./@data-articleid").get()
request = response.follow(url % pid, self.parse_post)
request.meta["pid"] = pid
request.meta["thumbnail"] = post.xpath("./a/img/@_src").get()
yield request
pages = response.xpath("//div[@class='page']/a/@href").extract()
for page in pages[1:]:
next_page = f"https://www.xinpianchang.com{page}"
yield response.follow(next_page, self.parse, cookies=cookies)
# 解析每个视频的信息
def parse_post(self, response):
pid = response.meta["pid"]
post = PostItem()
post["pid"] = pid
post["thumbnail"] = response.meta["thumbnail"]
post["title"] = response.xpath("//div[@class='title-wrap']/h3/text()").get()
cates = response.xpath("//span[contains(@class, 'cate')]//text()").extract()
post["category"] = "".join([cate.strip() for cate in cates])
post["created_at"] = response.xpath("//span[contains(@class, 'update-time')]/i/text()").get()
post["play_counts"] = response.xpath("//i[contains(@class, 'play-counts')]/@data-curplaycounts").get()
post["like_counts"] = response.xpath("//span[contains(@class, 'like-counts')]/@data-counts").get()
tags = response.xpath("//div[contains(@class, 'tag-wrapper')]/a/text()").extract()
post["tag"] = "-".join([tag.strip() for tag in tags])
desc = response.xpath("//p[contains(@class, 'desc')]/text()").get()
post["description"] = my_strip(desc)
# 视频
vid = response.xpath(
"//div[@class='filmplay-data-btn fs_12']//a[@class='collection-star hollow-star']/@data-vid").get()
video_url = f"https://openapi-vtom.vmovier.com/v3/video/{vid}?expand=resource&usage=xpc_web&appKey=61a2f329348b3bf77"
request = Request(video_url, callback=self.parse_video)
request.meta["post"] = post
yield request
# 评论
comment_url = f"https://app.xinpianchang.com/comments?resource_id={pid}&type=article&page=1&per_page=24"
request = Request(comment_url, callback=self.parse_comment)
request.meta["pid"] = pid
yield request
# 创作者
creator_url = response.xpath("//ul[@class='creator-list']/li/a/@href").extract()
for url in creator_url:
if url.startswith("/article"):
continue
cid = url[2:url.index("?")]
url = f"https://www.xinpianchang.com{url}"
request = response.follow(url, callback=self.parse_composer)
request.meta["dont_merge_cookies"] = True
request.meta["cid"] = cid
yield request
# 解析视频信息请求
def parse_video(self, response):
post = response.meta["post"]
result = json.loads(response.text)
post["video"] = result["data"]["resource"]["progressive"][0]["url"]
post["preview"] = result["data"]["video"]["cover"]
yield post
# 解析评论信息请求
def parse_comment(self, response):
result = json.loads(response.text)
for c in result["data"]["list"]:
comment = CommentItem()
comment["uname"] = c["userInfo"]["username"]
comment["avatar"] = c["userInfo"]["avatar"]
comment["uid"] = c["userInfo"]["id"]
comment["comment_id"] = c["id"]
comment["pid"] = c["resource_id"]
comment["content"] = c["content"]
comment["created_at"] = c["addtime"]
comment["like_counts"] = c["count_approve"]
if c["referid"]:
comment["referid"] = c["referid"]
yield comment
next_page = result["data"]["next_page_url"]
if next_page:
next_page = f"https://app.xinpianchang.com{next_page}"
yield response.follow(next_page, self.parse_comment)
# 解析创作者请求
def parse_composer(self, response):
banner, = re.findall("background-image:url\((.+?)\)",
response.xpath("//div[@class='banner-wrap']/@style").get())
composer = ComposerItem()
composer["banner"] = banner
composer["cid"] = response.meta["cid"]
composer["name"] = my_strip(response.xpath("//p[contains(@class,'creator-name')]/text()").get())
composer["intro"] = my_strip(response.xpath("//p[contains(@class,'creator-desc')]/text()").get())
composer["like_counts"] = convert_int(response.xpath("//span[contains(@class,'like-counts')]/text()").get())
composer["fans_counts"] = convert_int(response.xpath("//span[contains(@class,'fans-counts')]/text()").get())
composer["follow_counts"] = convert_int(
response.xpath("//span[@class='follow-wrap']/span[contains(@class,'fw_600')]/text()").get())
location = response.xpath("//span[contains(@class, 'icon-location')]/following-sibling::span[1]/text()").get()
if location:
composer["location"] = location.replace("\xa0", "")
else:
composer["location"] = ""
composer["career"] = response.xpath(
"//span[contains(@class, 'icon-career')]/following-sibling::span[1]/text()").get()
yield composer
item
import scrapy
from scrapy import Field
class PostItem(scrapy.Item):
"""保存视频信息的item"""
table_name = 'posts'
pid = Field()
title = Field()
thumbnail = Field()
preview = Field()
video = Field()
video_format = Field()
duration = Field()
category = Field()
created_at = Field()
play_counts = Field()
like_counts = Field()
description = Field()
tag = Field()
class CommentItem(scrapy.Item):
table_name = 'comments'
comment_id = Field()
pid = Field()
uid = Field()
avatar = Field()
uname = Field()
created_at = Field()
content = Field()
like_counts = Field()
referid = Field()
class ComposerItem(scrapy.Item):
table_name = 'composers'
cid = Field()
banner = Field()
avatar = Field()
verified = Field()
name = Field()
intro = Field()
like_counts = Field()
fans_counts = Field()
follow_counts = Field()
location = Field()
career = Field()
class CopyrightItem(scrapy.Item):
table_name = 'copyrights'
pcid = Field()
pid = Field()
cid = Field()
roles = Field()
pipelines
存储在mysql中
import pymysql
# 保存在mysql中
class MysqlPipeline:
# 开始调用一次
def open_spider(self, spider):
self.conn = pymysql.connect(
host="127.0.0.1",
port=3306,
db="spider",
user="jiang",
password="jiang",
charset="utf8"
)
self.cur = self.conn.cursor()
# 关闭调用一次
def close_spider(self, spider):
self.cur.close()
self.conn.close()
# 每产生一个调用一次
def process_item(self, item, spider):
# keys = item.keys()
# values = list(item.values) # 是一个元组-->集合
keys, values = zip(*item.items())
sql = "insert into {}({}) values({}) ON DUPLICATE KEY UPDATE {}".format(
item.table_name,
','.join(keys),
','.join(['%s']*len(keys)),
",".join(["`{}`=%s".format(key) for key in keys])
)
self.cur.execute(sql, values*2)
self.conn.commit()
# 输出语句
print(self.cur._last_executed)
return item
settings
ITEM_PIPELINES = {
'xpc.pipelines.MysqlPipeline': 300,
}
middleware
动态ip代理
from collections import defaultdict
from scrapy import signals
from scrapy.exceptions import NotConfigured
import random
import redis
from twisted.internet.error import ConnectionRefusedError, TimeoutError
class RandomProxyMiddleware(object):
def __init__(self, settings):
# 初始化变量和配置
self.r = redis.Redis(host="host", password="password")
self.proxy_key = settings.get("PROXY_REDIS_KEY")
self.proxy_stats_key = self.proxy_key+"_stats"
self.state = defaultdict(int)
self.max_failed = 3
@property
def proxies(self):
proxies_b = self.r.lrange(self.proxy_key, 0, -1)
proxies = []
for proxy_b in proxies_b:
proxies.append(bytes.decode(proxy_b))
print("proxy是:", proxies)
return proxies
@classmethod
def from_crawler(cls, crawler):
# 1.创建中间件对象
if not crawler.settings.getbool("HTTPPROXY_ENABLED"):
raise NotConfigured
return cls(crawler.settings)
def process_request(self, request, spider):
# 3.为每个request对象分配一个随机的ip代理
if self.proxies and not request.meta.get("proxy"):
request.meta["proxy"] = random.choice(self.proxies)
def process_response(self, request, response, spider):
# 4.请求成功,调用process_response
cur_proxy = request.meta.get("proxy")
# 判断是否被对方封禁
if response.status in (401, 403):
print(f"{cur_proxy} got wrong code {self.state[cur_proxy]} times")
# 给相应的IP失败次数+1
# self.state[cur_proxy] += 1
self.r.hincrby(self.proxy_stats_key, cur_proxy, 1)
# 当某个IP的失败次数累计到一定数量
failed_times = self.r.hget(self.proxy_stats_key, cur_proxy) or 0
if int(failed_times) >= self.max_failed:
print("got wrong http code {%s} when use %s" % (response.status, request.get("proxy")))
# 可以认为该IP已经被对方封禁了,从代理池中将该IP删除
self.remove_proxy(cur_proxy)
del request.meta["proxy"]
# 重新请求重新安排调度下载
return request
return response
def process_exception(self, request, exception, spider):
# 4.请求失败,调用process_exception
cur_proxy = request.meta.get("proxy")
# 如果本次请求使用了代理,并且网络请求报错,认为该IP出现问题了
if cur_proxy and isinstance(exception, (ConnectionRefusedError, TimeoutError)):
print(f"error occur where use proxy {exception} {cur_proxy}")
self.remove_proxy(cur_proxy)
del request.meta["proxy"]
return request
def remove_proxy(self, proxy):
"""在代理IP列表中删除指定代理"""
if proxy in self.proxies:
self.r.lrem(self.proxy_key, 1, proxy)
print("remove %s from proxy list" % proxy)
settings
DOWNLOADER_MIDDLEWARES = {
'xpc.middlewares.RandomProxyMiddleware': 749,
}
# 代理ip
PROXY_REDIS_KEY = "discovery:proxy"
scrapy-redis的使用
pip install scrapy-redis
settings中配置
# Enables scheduling storing requests queue in redis.
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
# Ensure all spiders share same duplicates filter through redis.
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
# REDIS_URL = 'redis://ip:6379'
REDIS_HOST = 'host'
REDIS_PORT = 6379
REDIS_PARAMS = {
'password': 'pass',
}
会根据redis中的数据进行继续爬取
使用RedisSpider
from scrapy_redis.spiders import RedisSpider
会根据redis中的url,自动进行抓取,是不需要start_urls的,我们只要在redis中的discovery:start_urls中增加一条url数据。(lpush discovery:start_urls url)。
这样我们就可以同时运行多个爬虫,只要redis中一有url,就会一起进行爬取。
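一个使用RedisSpider的最小示例大致如下(类名、爬虫名和解析逻辑仅为示意,redis_key对应上面提到的discovery:start_urls):
from scrapy_redis.spiders import RedisSpider


class DiscoveryRedisSpider(RedisSpider):
    name = "discovery_redis"
    # 不再需要start_urls,起始url从下面这个redis列表中读取
    redis_key = "discovery:start_urls"

    def parse(self, response):
        # 解析逻辑与普通Spider完全相同,这里仅作示意
        yield {"url": response.url}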
selenium
简单使用
from selenium import webdriver
driver = webdriver.Chrome()
driver.get("http://baidu.com")

进行关键字搜索
kw = driver.find_element_by_id("kw")
kw.send_keys("Python")
su = driver.find_element_by_id("su")
su.click()

获取标题
h3_list = driver.find_elements_by_tag_name("h3")
for h3 in h3_list:
print(h3.text)
输出
python官方下载_飞桨PaddlePaddle-开源深度学习平台
2020新版python_免费下载
python-python下载免费
python_万和-打造Python全栈开发工程师
python编程_在家就能让孩子学习编程的教育平台
Welcome to Python.org官方
Python(计算机程序设计语言)_百度百科
python官网 - Download Python | Python.org
Python 基础教程 | 菜鸟教程
Python还能火多久?
Python教程 - 廖雪峰的官方网站
你都用 Python 来做什么? - 知乎
Python3 * 和 ** 运算符_极客点儿-CSDN博客_python **
Python基础教程,Python入门教程(非常详细)
Python-薯条编程-在线教程-小班授课高薪就业培训
运行js
driver.execute_script("alert('123')")
driver.execute_script("window.scrollTo(300, document.body.scrollHeight)")
启动浏览器
from selenium import webdriver
# 启动chrome浏览器
driver = webdriver.Chrome()
# 指定chromedriver的路径并启动Chrome
driver = webdriver.Chrome(executable_path='/home/user/chromedriver')
#启动chrome-headless
from selenium.webdriver.chrome.options import Options
option = Options()
option.add_argument('--headless')
driver = webdriver.Chrome(chrome_options=option)
# 启动phantomjs
driver = webdriver.PhantomJS()
chrome-headless是无界面版的chrome,它替代了停止维护的phantomjs.
控制浏览器
# 访问某个url
driver.get('https://www.baidu.com')
# 刷新
driver.refresh()
# 前进
driver.forward()
# 后退
driver.back()
#退出
driver.quit()
# 当前的url
driver.current_url
# 截图
driver.save_screenshot('/tmp/test.png')
元素查找
18个find函数
# 根据元素的class属性的值查找
driver.find_element_by_class_name
# 用CSS选择器查找
driver.find_element_by_css_selector
# 根据元素的ID
driver.find_element_by_id
# 根据链接内的文本查找
find_element_by_link_text
# 根据元素的name属性查找
find_element_by_name
# 根据链接内的文本是否包含指定的查找文字
find_element_by_partial_link_text
# 根据标签名查找
find_element_by_tag_name
# 根据xpath表达示查找
find_element_by_xpath
执行js
# 将网页滚动到最底部
driver.execute_script('window.scrollTo(0, document.body.scrollHeight)')
# 执行异步的js函数
driver.execute_async_script('send_xml_request()')
等待,wait
隐式等待
# 查找某个(某些)元素,如果没有立即查找到,则最多等待10秒
driver.implicitly_wait(10)
显式等待
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
# 查找一个按钮
# 最长等待10秒,直到找到查找条件中指定的元素
sort_btn = WebDriverWait(driver, 10).until(
    EC.presence_of_element_located((By.XPATH, './/div[@class="f-sort"]/a[2]'))
)
这两种等待中,显式等待通常更符合我们的程序逻辑。当我们对页面的加载方式还不太确定的时候,也可以使用隐式等待。
爬取京东
import sys
import time
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.chrome.options import Options
import pyexcel
if __name__ == '__main__':
keyword = '小米mix2s'
if len(sys.argv) > 1:
keyword = sys.argv[1]
# 设置不打开浏览器
options = Options()
options.add_argument("--headless")
driver = webdriver.Chrome(chrome_options=options)
driver.get("https://www.jd.com/")
# 输入关键字
kw = driver.find_element_by_id("key")
kw.send_keys(keyword)
kw.send_keys(Keys.ENTER)
# 截个图
driver.save_screenshot("1.png")
# 销量进行排序
time.sleep(2)
sort_btn = driver.find_element_by_xpath(".//div[@class='f-sort']/a[2]")
sort_btn.click()
driver.save_screenshot("2.png")
has_next = True
rows = []
while has_next:
time.sleep(3)
curr_page = driver.find_element_by_xpath("//div[contains(@class,'page')]//a[@class='curr']").text
print("----------------current page is %s----------------" % curr_page)
# 先获取整个商品区域的尺寸坐标
goods_list = driver.find_element_by_id("J_goodsList")
# 根据区域的大小决定往下滑动多少
y = goods_list.rect["y"] + goods_list.rect["height"]
driver.execute_script("window.scrollTo(0, %s)" % y)
# 获取所有的商品节点
products = driver.find_elements_by_class_name("gl-item")
for product in products:
row = {}
sku = product.get_attribute("data-sku")
row["price"] = product.find_element_by_css_selector(f"strong.J_{sku}").text
row["name"] = product.find_element_by_css_selector("div.p-name>a>em").text
row["comments"] = product.find_element_by_id(f"J_comment_{sku}").text
try:
row["shop"] = product.find_element_by_css_selector("div.p-shop>span>a").text
except NoSuchElementException as e:
row["shop"] = ""
rows.append(row)
print(row)
next_page = driver.find_element_by_css_selector("a.pn-next")
if "disabled" in next_page.get_attribute("class"):
has_next = False
else:
next_page.click()
pyexcel.save_as(records=rows, dest_file_name=f"{keyword}.xls")
# 退出
driver.quit()
结果:两张png图片和一个excel表格

抓取去哪儿网
import sys
import time
from selenium import webdriver
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
if __name__ == "__main__":
start_name = "北京"
dest_name = "青岛"
if len(sys.argv) > 1:
start_name = sys.argv[1]
dest_name = sys.argv[2]
driver = webdriver.Chrome()
driver.get("https://www.qunar.com/?ex_track=auto_4e0d874a")
# 起始地
start = WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.XPATH, "//input[@name='fromCity']"))
)
start.clear()
start.send_keys(start_name)
time.sleep(0.5)
start.send_keys(Keys.ENTER)
# 目的地
dest = WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.XPATH, "//input[@name='toCity']"))
)
# dest = driver.find_element_by_xpath("//input[@name='toCity']")
dest.send_keys(dest_name)
time.sleep(0.5)
dest.send_keys(Keys.ENTER)
search = driver.find_element_by_css_selector("button.button-search")
search.click()
# 获取航班数据
flights = WebDriverWait(driver, 10).until(
EC.presence_of_all_elements_located((By.XPATH, "//div[@class='m-airfly-lst']/div[@class='b-airfly']"))
)
for flight in flights:
f_data = {}
airlines = flight.find_elements_by_xpath(".//div[@class='d-air']")
f_data["airlines"] = [airlines.text for airlines in airlines]
f_data["depart"] = flight.find_element_by_xpath(".//div[@class='sep-lf']").text
f_data["duration"] = flight.find_element_by_xpath(".//div[@class='sep-ct']").text
f_data["dest"] = flight.find_element_by_xpath(".//div[@class='sep-rt']").text
# 对价格的处理,价格有一个基础值和真实值的偏移
fake_price = list(flight.find_element_by_xpath(".//span[@class='prc_wp']/em/b[1]").text)
covers = flight.find_elements_by_xpath(".//span[@class='prc_wp']/em/b[position()>1]")
for c in covers:
index = int(c.value_of_css_property('left')[:-2]) // c.size['width']
fake_price[index] = c.text
f_data["price"] = "".join(fake_price)
真实价格的计算
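用一组假设的数值把上面的计算过程演示一遍:
# 示意性的计算过程(数值均为假设)
fake_price = list("1234")                           # 基础值b[1]的文本
covers = [{"left": 24, "width": 12, "text": "9"}]   # 覆盖在上面的<b>标签
for c in covers:
    index = c["left"] // c["width"]                 # 偏移量除以单个字符宽度得到下标
    fake_price[index] = c["text"]
print("".join(fake_price))                          # 输出 1294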
splash
文档
:point_right: 官方文档
安装
docker pull scrapinghub/splash
docker run -it -d -p 8050:8050 --rm scrapinghub/splash
使用
在浏览器中输入splash服务的IP和端口,并请求京东
可以看到
输入http://localhost:8050/render.html?url=https://search.jd.com/Search?keyword=%E5%B0%8F%E7%B1%B310&enc=utf-8&suggest=1.def.0.V08–38s0&wq=%E5%B0%8F%E7%B1%B3&pvid=c18d37ab55764cc4ac71e124bc496035

cmd使用
curl "http://codekiller.top:8050/render.html?url=https://search.jd.com/Search?keyword=%E5%B0%8F%E7%B1%B310&enc=utf-8&suggest=1.def.0.V08--38s0&wq=%E5%B0%8F%E7%B1%B3&pvid=c18d37ab55764cc4ac71e124bc496035" -o 小米.html
打开htm文件
操作(获取所有价格)
from lxml import etree
file = open('C:\\Users\\MyPC\\小米.html', "r", encoding="UTF-8")
text = file.read()
selector = etree.HTML(text)
prices = selector.xpath("//div[@class='p-price']/strong/i/text()")
print(prices)
爬取京东
from urllib.parse import urlparse, urlencode, quote
from lxml import etree
import requests
ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36"
headers = {"User-Agent": ua}
keyword = "小米"
params = dict(
keyword=keyword,
enc="utf-8",
wq=keyword,
pvid="57486a4adb40455dbba829de75133672"
)
query_string = "&".join(("%s=%s" % (K, V)) for K, V in params.items())
jd_url = "https://search.jd.com/Search?" + query_string
url = "http://codekiller.top:8050/render.html?url=" + quote(jd_url)
r = requests.get(url, headers=headers)
selector = etree.HTML(r.text)
price_list = selector.xpath("//div[@class='p-price']/strong/i/text()")
name_list = selector.xpath("//div[contains(@class,'p-name')]/a/em/text()")
for price, name in zip(price_list, name_list):
print(name, price)
反爬虫
User-Agent识别
修改请求头信息里的User-Agent
请求头信息识别
比如说referer, content-type,请求方法(POST, GET)
构造相应的请求头信息。比如说referer,我们在提取URL的时候,要把URL所在页面的URL也存储起来,并放到request.headers。
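比如用requests构造带Referer的请求(URL仅为示意):
import requests

headers = {
    "User-Agent": "Mozilla/5.0 ...",
    # 把提取该URL时所在列表页的地址作为Referer一起发送
    "Referer": "https://www.example.com/list?page=1",
}
res = requests.get("https://www.example.com/detail/123", headers=headers)
print(res.status_code)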
异步加载
我们需要分析页面的网络请求,从中找出和我们想要的数据相关的请求,并分析它的请求头信息、参数、cookie,然后根据这些信息构造我们的请求。通常来说都是ajax请求,也有图片请求,比如图片的lazy load,通过js在页面加载后修改图片的src属性。一般都会有其他的自定义属性存在,比如说”_src”。总之,可以找到一些ID或者链接。注意观察相关dom节点树上的特殊属性。
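以图片懒加载为例,一种常见的处理思路大致如下(站点和属性名以实际页面为准,这里仅作示意):
import requests
from bs4 import BeautifulSoup

res = requests.get("https://www.example.com", headers={"User-Agent": "Mozilla/5.0 ..."})
soup = BeautifulSoup(res.text, "lxml")
for img in soup.select("img"):
    # 懒加载图片的真实地址往往放在自定义属性里,比如_src或data-src
    src = img.get("_src") or img.get("data-src") or img.get("src")
    print(src)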
请求参数加密
一般是在前端通过一定的计算,构造出一个哈希值。需要分析前端的代码,找出具体计算的代码逻辑,并用python再实现。如果前端的代码经过混淆,并且代码量十分巨大,可以使用selenium或者splash等引擎去请求。但是,如果爬取的数据需求量比较大,我们还是要通过直接调用对方接口的形式去获取数据。
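举一个假设的例子:如果分析出前端是把关键字、时间戳和一个固定盐值拼接后取md5作为sign参数,用python复现大致是:
import hashlib
import time

def make_sign(keyword, secret="salt"):
    # secret和拼接顺序均为假设,实际要以分析出来的前端代码为准
    ts = str(int(time.time() * 1000))
    raw = keyword + ts + secret
    sign = hashlib.md5(raw.encode("utf-8")).hexdigest()
    return {"keyword": keyword, "ts": ts, "sign": sign}

print(make_sign("小米"))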
请求结果加密
json数据里面加密
比如携程酒店房型列表接口,用它自己的js解密,或者分析它的js前端逻辑,用python代码实现出来。
CSS加密
比如大众点评,通过CSS样式去代替某个字符。我们需要同时爬取CSS文件,并且分析CSS文件内的样式,最后定位到svg文件,并分析提取svg内的内容,完成替换。
字体加密
比如猫眼电影。每次随机返回一个字体文件,并且字符也是随机的。需要每次下载对应的字体文件,并解析字体文件和字符之间的对应关系。
Cookie限制
登录、session限制,比如新片场,拿到登录以后的cookie,然后set到头信息里面,这样请求的时候就相当于登录了。
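以requests为例,可以把登录后的cookie直接放进头信息(cookie内容仅为示意):
import requests

headers = {
    "User-Agent": "Mozilla/5.0 ...",
    # 浏览器登录后从开发者工具里复制出来的Cookie
    "Cookie": "Authorization=xxxx; PHPSESSID=yyyy",
}
res = requests.get("https://www.xinpianchang.com", headers=headers)
print(res.status_code)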
IP频率限制
需要准备大量的IP代理,获得IP代理的方式有:
- 自己搭建代理服务器(tinyproxy, squid+动态拨号,DDNS)
- 付费购买
- 爬取公开网络上代理(可用性比较低)
控制爬取频率,保持不被封的情况下的最合适的并发数量。
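在scrapy里可以通过settings控制并发和下载间隔,比如(数值仅为示意):
# settings.py(数值仅为示意)
DOWNLOAD_DELAY = 1                      # 每个请求之间的下载间隔(秒)
CONCURRENT_REQUESTS_PER_DOMAIN = 8      # 对单个域名的并发请求数
AUTOTHROTTLE_ENABLED = True             # 根据响应情况自动调节下载延迟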
验证码
- 尝试可否绕过前端验证,直接请求具体的接口,以绕开验证码.
- 可以用图片识别库去识别某些比较简单的验证码
- 接入云打码平台
- 用机器学习训练验证码的图片库,然后识别