对于比较大型的爬虫来说,URL管理的管理是个核心问题,管理不好,就可能重复下载,也可能遗漏下载。这里,我们设计一个URL Pool来管理URL。
这个URL Pool就是一个生产者-消费者模式:
依葫芦画瓢,URLPool就是这样的
我们从网址池的使用目的出发来设计网址池的接口,它应该具有以下功能:
- 往池子里面添加URL;
- 从池子里面取URL以下载;
- 池子内部要管理URL状态;
前面我提到URL的状态有以下4中:
- 已经下载成功
- 下载多次失败无需再下载
- 正在下载
- 下载失败要再次尝试
前两个是永久状态,也就是已经下载成功的不再下载,多次尝试后仍失败的也就不再下载,它们需要永久存储起来,以便爬虫重启后,这种永久状态记录不会消失,已经成功下载的URL不再被重复下载。永久存储的方法有很多种:
比如,直接写入文本文件,但它不利于查找某个URL是否已经存在文本中;
比如,直接写入MySQL等关系型数据库,它利用查找,但是速度又比较慢;
比如,使用key-value数据库,查找和速度都符合要求,是不错的选择!
我们这个URL Pool选用LevelDB来作为URL状态的永久存储。LevelDB是Google开源的一个key-value数据库,速度非常快,同时自动压缩数据。我们用它先来实现一个UrlDB作为永久存储数据库。
UrlDB 的实现
import leveldb
class UrlDB:
'''Use LevelDB to store URLs what have been done(succeed or faile)
'''
status_failure = b'0'
status_success = b'1'
def __init__(self, db_name):
self.name = db_name + '.urldb'
self.db = leveldb.LevelDB(self.name)
def set_success(self, url):
if isinstance(url, str):
url = url.encode('utf8')
try:
self.db.Put(url, self.status_success)
s = True
except:
s = False
return s
def set_failure(self, url):
if isinstance(url, str):
url = url.encode('utf8')
try:
self.db.Put(url, self.status_failure)
s = True
except:
s = False
return s
def has(self, url):
if isinstance(url, str):
url = url.encode('utf8')
try:
attr = self.db.Get(url)
return attr
except:
pass
return False
UrlDB将被UrlPool使用,主要有三个方法被使用:
- has(url) 查看是否已经存在某url
- set_success(url) 存储url状态为成功
- set_failure(url) 存储url状态为失败
UrlPool 的实现
而正在下载和下载失败次数这两个URL的状态只需暂时保存在内容即可,我们把它们放到UrlPool这个类中进行管理。接着我们来实现网址池:
#Author: veelion
import pickle
import leveldb
import time
import urllib.parse as urlparse
class UrlPool:
'''URL Pool for crawler to manage URLs
'''
def __init__(self, pool_name):
self.name = pool_name
self.db = UrlDB(pool_name)
self.waiting = {} # {host: set([urls]), } 按host分组,记录等待下载的URL
self.pending = {} # {url: pended_time, } 记录已被取出(self.pop())但还未被更新状态(正在下载)的URL
self.failure = {} # {url: times,} 记录失败的URL的次数
self.failure_threshold = 3
self.pending_threshold = 10 # pending的最大时间,过期要重新下载
self.waiting_count = 0 # self.waiting 字典里面的url的个数
self.max_hosts = ['', 0] # [host: url_count] 目前pool中url最多的host及其url数量
self.hub_pool = {} # {url: last_query_time, } 存放hub url
self.hub_refresh_span = 0
self.load_cache()
def __del__(self):
self.dump_cache()
def load_cache(self,):
path = self.name + '.pkl'
try:
with open(path, 'rb') as f:
self.waiting = pickle.load(f)
cc = [len(v) for k, v in self.waiting.items()]
print('saved pool loaded! urls:', sum(cc))
except:
pass
def dump_cache(self):
path = self.name + '.pkl'
try:
with open(path, 'wb') as f:
pickle.dump(self.waiting, f)
print('self.waiting saved!')
except:
pass
def set_hubs(self, urls, hub_refresh_span):
self.hub_refresh_span = hub_refresh_span
self.hub_pool = {}
for url in urls:
self.hub_pool[url] = 0
def set_status(self, url, status_code):
if url in self.pending:
self.pending.pop(url)
if status_code == 200:
self.db.set_success(url)
return
if status_code == 404:
self.db.set_failure(url)
return
if url in self.failure:
self.failure[url] += 1
if self.failure[url] > self.failure_threshold:
self.db.set_failure(url)
self.failure.pop(url)
else:
self.add(url)
else:
self.failure[url] = 1
self.add(url)
def push_to_pool(self, url):
host = urlparse.urlparse(url).netloc
if not host or '.' not in host:
print('try to push_to_pool with bad url:', url, ', len of ur:', len(url))
return False
if host in self.waiting:
if url in self.waiting[host]:
return True
self.waiting[host].add(url)
if len(self.waiting[host]) > self.max_hosts[1]:
self.max_hosts[1] = len(self.waiting[host])
self.max_hosts[0] = host
else:
self.waiting[host] = set([url])
self.waiting_count += 1
return True
def add(self, url, always=False):
if always:
return self.push_to_pool(url)
pended_time = self.pending.get(url, 0)
if time.time() - pended_time < self.pending_threshold:
print('being downloading:', url)
return
if self.db.has(url):
return
if pended_time:
self.pending.pop(url)
return self.push_to_pool(url)
def addmany(self, urls, always=False):
if isinstance(urls, str):
print('urls is a str !!!!', urls)
self.add(urls, always)
else:
for url in urls:
self.add(url, always)
def pop(self, count, hub_percent=50):
print('\n\tmax of host:', self.max_hosts)
# 取出的url有两种类型:hub=1, 普通=0
url_attr_url = 0
url_attr_hub = 1
# 1. 首先取出hub,保证获取hub里面的最新url.
hubs = {}
hub_count = count * hub_percent // 100
for hub in self.hub_pool:
span = time.time() - self.hub_pool[hub]
if span < self.hub_refresh_span:
continue
hubs[hub] = url_attr_hub # 1 means hub-url
self.hub_pool[hub] = time.time()
if len(hubs) >= hub_count:
break
# 2. 再取出普通url
left_count = count - len(hubs)
urls = {}
for host in self.waiting:
if not self.waiting[host]:
continue
url = self.waiting[host].pop()
urls[url] = url_attr_url
self.pending[url] = time.time()
if self.max_hosts[0] == host:
self.max_hosts[1] -= 1
if len(urls) >= left_count:
break
self.waiting_count -= len(urls)
print('To pop:%s, hubs: %s, urls: %s, hosts:%s' % (count, len(hubs), len(urls), len(self.waiting)))
urls.update(hubs)
return urls
def size(self,):
return self.waiting_count
def empty(self,):
return self.waiting_count == 0
UrlPool的实现有些复杂,且听我一一分解。
UrlPool 的使用
先看看它的主要成员及其用途:
- self.db 是一个UrlDB的示例,用来永久存储url的永久状态
- self.pool 是用来存放url的,它是一个字典(dict)结构,key是url的host,value是一个用来存储这个host的所有url的集合(set)。
- self.pending 用来管理正在下载的url状态。它是一个字典结构,key是url,value是它被pop的时间戳。当一个url被pop()时,就是它被下载的开始。当该url被set_status()时,就是下载结束的时刻。如果一个url被add() 入pool时,发现它已经被pended的时间超过pending_threshold时,就可以再次入库等待被下载。否则,暂不入池。
- self.failue 是一个字典,key是url,value是识别的次数,超过failure_threshold就会被永久记录为失败,不再尝试下载。
- hub_pool 是一个用来存储hub页面的字典,key是hub url,value是上次刷新该hub页面的时间.
以上成员就构成了我们这个网址池的数据结构,再通过以下成员方法对这个网址池进行操作:
1. load_cache() 和 dump_cache() 对网址池进行缓存
load_cache() 在 init()中调用,创建pool的时候,尝试去加载上次退出时缓存的URL pool;
dump_cache() 在 del() 中调用,也就是在网址池销毁前(比如爬虫意外退出),把内存中的URL pool缓存到硬盘。
这里使用了pickle 模块,这是一个把内存数据序列化到硬盘的工具。
** 2. set_hubs() 方法设置hub URL**
hub网页就是像百度新闻那样的页面,整个页面都是新闻的标题和链接,是我们真正需要的新闻的聚合页面,并且这样的页面会不断更新,把最新的新闻聚合到这样的页面,我们称它们为hub页面,其URL就是hub url。在新闻爬虫中添加大量的这样的url,有助于爬虫及时发现并抓取最新的新闻。
该方法就是将这样的hub url列表传给网址池,在爬虫从池中取URL时,根据时间间隔(self.hub_refresh_span)来取hub url。
** 3. add(), addmany(), push_to_pool() 对网址池进行入池操作**
把url放入网址池时,先检查内存中的self.pending是否存在该url,即是否正在下载该url。如果正在下载就不入池;如果正下载或已经超时,就进行到下一步;
接着检查该url是否已经在leveldb中存在,存在就表明之前已经成功下载或彻底失败,不再下载了也不入池。如果没有则进行到下一步;
最后通过push_to_pool() 把url放入self.pool中。存放的规则是,按照url的host进行分类,相同host的url放到一起,在取出时每个host取一个url,尽量保证每次取出的一批url都是指向不同的服务器的,这样做的目的也是为了尽量减少对抓取目标服务器的请求压力。力争做一个服务器友好的爬虫 O(∩_∩)O
** 4. pop() 对网址池进行出池操作**
爬虫通过该方法,从网址池中获取一批url去下载。取出url分两步:
第一步,先从self.hub_pool中获得,方法是遍历hub_pool,检查每个hub-url距上次被pop的时间间隔是否超过hub页面刷新间隔(self.hub_refresh_span),来决定hub-url是否应该被pop。
第二步,从self.pool中获取。前面push_to_pool中,介绍了pop的原则,就是每次取出的一批url都是指向不同服务器的,有了self.pool的特殊数据结构,安装这个原则获取url就简单了,按host(self.pool的key)遍历self.pool即可。
** 5. set_status() 方法设置网址池中url的状态**
其参数status_code 是http响应的状态码。爬虫在下载完URL后进行url状态设置。
首先,把该url成self.pending中删除,已经下载完毕,不再是pending状态;
接着,根据status_code来设置url状态,200和404的直接设置为永久状态;其它status就记录失败次数,并再次入池进行后续下载尝试。
通过以上成员变量和方法,我们把这个网址池(UrlPool)解析的清清楚楚。小猿们可以毫不客气的收藏起来,今后在写爬虫时可以用它方便的管理URL,并且这个实现只有一个py文件,方便加入到任何项目中。
爬虫知识点
1. 网址的管理
网址的管理,其目的就是为了:不重抓,不漏抓。
2. pickle 模块
把内存数据保存到硬盘,再把硬盘数据重新加载到内存,这是很多程序停止和启动的必要步骤。pickle就是实现数据在内存和硬盘之间转移的模块。
3. leveldb 模块
这是一个经典且强大的硬盘型key-value数据库,非常适合url-status这种结构的存储。
4. urllib.parse
解析网址的模块,在处理url时首先想到的模块就应该是它。
下一篇我们把mysql再封装一下。

我的公众号:猿人学 Python 上会分享更多心得体会,敬请关注。
***版权申明:若没有特殊说明,文章皆是猿人学 yuanrenxue.con 原创,没有猿人学授权,请勿以任何形式转载。***
老猿,我看你这个网站就做的不错,教教我们怎么爬你的网站?哈哈
hub_count = count * hubpercent // 100
大佬您好!上面这行是做何考虑的呢?
大佬们,有在windows上编译好的leveldb库吗
leveldb不会用……我来试下换成redis……
self.pool[host].add(url)
这条语句中add是字典的内置函数?(查了资料没有发现字典有add方法)还是我们自己实现的add(self, url, always)函数(但是这个函数提供两个参数), 不明白这条语句的执行,希望可以帮忙解释一下,感谢
老猿您好,请教您一个问题
我在anaconda里创建一个环境env1.
在这个环境下
我conda list显示的包比pip list要多。
比如snappy这个包,在conda list下是有的,但是在pip list下没有
import snappy时会报错ModuleNotFoundError: No module named ‘snappy’
困扰了我两天了,冒昧求教 希望能得到解答 谢谢
您好 ~
UrlDB 的实现里面有一行代码拼写错了:
try:
self.db.Put(url, self.state_success)
s = True
…
应为 : self.db.Put(url, self.status_success)