“等了好久终于等到今天,梦里好久终于把梦实现”,脑海里不禁响起来刘德华这首歌。是啊,终于可以写我最喜欢的异步爬虫了。前面那么多章节,一步一步、循序渐进的讲解,实在是“唠叨”了不少,可是为了小猿们能由浅入深的学习爬虫,老猿我又不得不说那么多“唠叨”,可把我给憋死了,今天就大书特书异步爬虫,说个痛快!
关于异步IO这个概念,可能有些小猿们不是非常明白,那就先来看看异步IO是怎么回事儿。
为了大家能够更形象得理解这个概念,我们拿放羊来打个比方:
- 下载请求开始,就是放羊出去吃草;
- 下载任务完成,就是羊吃饱回羊圈。
同步放羊的过程就是这样的:
羊倌儿小同要放100只羊,他就先放一只羊出去吃草,等羊吃饱了回来在放第二只羊,等第二只羊吃饱了回来再放第三只羊出去吃草……这样放羊的羊倌儿实在是……
再看看异步放羊的过程:
羊倌儿小异也要放100只羊,他观察后发现,小同放羊的方法比较笨,他觉得草地一下能容下10只羊(带宽)吃草,所以它就一次放出去10只羊等它们回来,然后他还可以给羊剪剪羊毛。有的羊吃得快回来的早,他就把羊关到羊圈接着就再放出去几只,尽量保证草地上都有10只羊在吃草。
很明显,异步放羊的效率高多了。同样的,网络世界里也是异步的效率高。
到了这里,可能有小猿要问,为什么不用多线程、多进程实现爬虫呢? 没错,多线程和多进程也可以提高前面那个同步爬虫的抓取效率,但是异步IO提高的更多,也更适合爬虫这个场景。后面机会我们可以对比一下三者抓取的效率。
1. 异步的downloader
还记得我们之前使用requests实现的那个downloader吗?同步情况下,它很好用,但不适合异步,所以我们要先改造它。幸运的是,已经有aiohttp模块来支持异步http请求了,那么我们就用aiohttp来实现异步downloader。
async def fetch(session, url, headers=None, timeout=9):
_headers = {
'User-Agent': ('Mozilla/5.0 (compatible; MSIE 9.0; '
'Windows NT 6.1; Win64; x64; Trident/5.0)'),
}
if headers:
_headers = headers
try:
async with session.get(url, headers=_headers, timeout=timeout) as response:
status = response.status
html = await response.read()
encoding = response.get_encoding()
if encoding == 'gb2312':
encoding = 'gbk'
html = html.decode(encoding, errors='ignore')
redirected_url = str(response.url)
except Exception as e:
msg = 'Failed download: {} | exception: {}, {}'.format(url, str(type(e)), str(e))
print(msg)
html = ''
status = 0
redirected_url = url
return status, html, redirected_url
这个异步的downloader,我们称之为fetch(),它有两个必须参数:
- seesion: 这是一个aiohttp.ClientSession的对象,这个对象的初始化在crawler里面完成,每次调用fetch()时,作为参数传递。
- url:这是需要下载的网址。
实现中使用了异步上下文管理器(async with),编码的判断我们还是用cchardet来实现。
有了异步下载器,我们的异步爬虫就可以写起来啦~
2. 异步新闻爬虫
跟同步爬虫一样,我们还是把整个爬虫定义为一个类,它的主要成员有:
- self.urlpool 网址池
- self.loop 异步的事件循环
- self.seesion aiohttp.ClientSession的对象,用于异步下载
- self.db 基于aiomysql的异步数据库连接
self._workers
当前并发下载(放出去的羊)的数量
通过这几个主要成员来达到异步控制、异步下载、异步存储(数据库)的目的,其它成员作为辅助。爬虫类的相关方法,参加下面的完整实现代码:
#!/usr/bin/env python3
# File: news-crawler-async.py
# Author: veelion
import traceback
import time
import asyncio
import aiohttp
import urllib.parse as urlparse
import farmhash
import lzma
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
import sanicdb
from urlpool import UrlPool
import functions as fn
import config
class NewsCrawlerAsync:
def __init__(self, name):
self._workers = 0
self._workers_max = 30
self.logger = fn.init_file_logger(name+ '.log')
self.urlpool = UrlPool(name)
self.loop = asyncio.get_event_loop()
self.session = aiohttp.ClientSession(loop=self.loop)
self.db = sanicdb.SanicDB(
config.db_host,
config.db_db,
config.db_user,
config.db_password,
loop=self.loop
)
async def load_hubs(self,):
sql = 'select url from crawler_hub'
data = await self.db.query(sql)
self.hub_hosts = set()
hubs = []
for d in data:
host = urlparse.urlparse(d['url']).netloc
self.hub_hosts.add(host)
hubs.append(d['url'])
self.urlpool.set_hubs(hubs, 300)
async def save_to_db(self, url, html):
urlhash = farmhash.hash64(url)
sql = 'select url from crawler_html where urlhash=%s'
d = await self.db.get(sql, urlhash)
if d:
if d['url'] != url:
msg = 'farmhash collision: %s <=> %s' % (url, d['url'])
self.logger.error(msg)
return True
if isinstance(html, str):
html = html.encode('utf8')
html_lzma = lzma.compress(html)
sql = ('insert into crawler_html(urlhash, url, html_lzma) '
'values(%s, %s, %s)')
good = False
try:
await self.db.execute(sql, urlhash, url, html_lzma)
good = True
except Exception as e:
if e.args[0] == 1062:
# Duplicate entry
good = True
pass
else:
traceback.print_exc()
raise e
return good
def filter_good(self, urls):
goodlinks = []
for url in urls:
host = urlparse.urlparse(url).netloc
if host in self.hub_hosts:
goodlinks.append(url)
return goodlinks
async def process(self, url, ishub):
status, html, redirected_url = await fn.fetch(self.session, url)
self.urlpool.set_status(url, status)
if redirected_url != url:
self.urlpool.set_status(redirected_url, status)
# 提取hub网页中的链接, 新闻网页中也有“相关新闻”的链接,按需提取
if status != 200:
return
if ishub:
newlinks = fn.extract_links_re(redirected_url, html)
goodlinks = self.filter_good(newlinks)
print("%s/%s, goodlinks/newlinks" % (len(goodlinks), len(newlinks)))
self.urlpool.addmany(goodlinks)
else:
await self.save_to_db(redirected_url, html)
self._workers -= 1
async def loop_crawl(self,):
await self.load_hubs()
last_rating_time = time.time()
counter = 0
while 1:
tasks = self.urlpool.pop(self._workers_max)
if not tasks:
print('no url to crawl, sleep')
await asyncio.sleep(3)
continue
for url, ishub in tasks.items():
self._workers += 1
counter += 1
print('crawl:', url)
asyncio.ensure_future(self.process(url, ishub))
gap = time.time() - last_rating_time
if gap > 5:
rate = counter / gap
print('\tloop_crawl() rate:%s, counter: %s, workers: %s' % (round(rate, 2), counter, self._workers))
last_rating_time = time.time()
counter = 0
if self._workers > self._workers_max:
print('====== got workers_max, sleep 3 sec to next worker =====')
await asyncio.sleep(3)
def run(self):
try:
self.loop.run_until_complete(self.loop_crawl())
except KeyboardInterrupt:
print('stopped by yourself!')
del self.urlpool
pass
if __name__ == '__main__':
nc = NewsCrawlerAsync('yrx-async')
nc.run()
爬虫的主流程是在方法loop_crawl()里面实现的。它的主体是一个while循环,每次从self.urlpool里面获取定量的爬虫作为下载任务(从羊圈里面选出一批羊),通过ensure_future()开始异步下载(把这些羊都放出去)。而process()这个方法的流程是下载网页并存储、提取新的url,这就类似羊吃草、下崽等。
通过self._workers
和self._workers_max
来控制并发量。不能一直并发,给本地CPU、网络带宽带来压力,同样也会给目标服务器带来压力。
至此,我们实现了同步和异步两个新闻爬虫,分别实现了NewsCrawlerSync和NewsCrawlerAsync两个爬虫类,他们的结构几乎完全一样,只是抓取流程一个是顺序的,一个是并发的。小猿们可以通过对比两个类的实现,来更好的理解异步的流程。
爬虫知识点
1. uvloop模块
uvloop这个模块是用Cython编写建立在libuv库之上,它是asyncio内置事件循环的替代,使用它仅仅是多两行代码而已:
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
uvloop使得asyncio很快,比odejs、gevent和其它Python异步框架的快至少2倍,接近于Go语言的性能。
这是uvloop作者的性能对比测试。
目前,uvloop不支持Windows系统和Python 3.5 及其以上版本,这在它源码的setup.py文件中可以看到:
if sys.platform in ('win32', 'cygwin', 'cli'):
raise RuntimeError('uvloop does not support Windows at the moment')
vi = sys.version_info
if vi < (3, 5):
raise RuntimeError('uvloop requires Python 3.5 or greater')
所以,使用Windows的小猿们要运行异步爬虫,就要把uvloop那两行注释掉哦。
思考题
1. 给同步的downloader()或异步的fetch()添加功能
或许有些小猿还没见过这样的html代码,它出现在<head>
里面:
<meta http-equiv="refresh" content="5; url=https://example.com/">
它的意思是,告诉浏览器在5秒之后跳转到另外一个url:https://example.com/
。
那么问题来了,请给downloader(fetch())添加代码,让它支持这个跳转。
2. 如何控制hub的刷新频率,及时发现最新新闻
这是我们写新闻爬虫要考虑的一个很重要的问题,我们实现的新闻爬虫中并没有实现这个机制,小猿们来思考一下,并对手实现实现。
到这老猿要讲的实现一个异步定向新闻爬虫已经讲完了,感谢你的阅读,有任何建议和问题请再下方留言,我会一一回复你,你也可以关注 猿人学 公众号,那里可以及时看到我新发的文章。
后面的章节,是介绍如何使用工具,比如如何使用charles抓包,如何管理浏览器cookie,如何使用selenium等等,也欢迎你的阅读。

我的公众号:猿人学 Python 上会分享更多心得体会,敬请关注。
***版权申明:若没有特殊说明,文章皆是猿人学 yuanrenxue.con 原创,没有猿人学授权,请勿以任何形式转载。***
请问asyncio.new_event_loop() 和asyncio.get_event_loop()啥区别啊