# url管理器
# url管理器import pickleimport hashlibclass UrlManager(): def __init__(self): self.new_urls = self.load_progress('new_urls.txt') # 未爬取url集合 self.old_urls = self.load_progress('old_urls.txt') # 已爬取集合 def has_new_url(self): ''' 判断是否有未爬取的URL :return: ''' return self.new_url_size() != 0 def get_new_url(self): ''' 获取一个未爬取的URL :return: ''' new_url = self.new_urls.pop() m = hashlib.md5() m.update(new_url) self.old_urls.add(m.hexdigest()[8:-8]) return new_url def add_new_url(self, url): ''' 将新的URL添加到未爬取的集合中 :param url: 单个URL :return: ''' if url is None: return m = hashlib.md5() m.update(url) url_md5 = m.hexdigest()[8:-8] if url not in self.new_urls and url_md5 not in self.old_urls: self.new_urls.add(url) def add_new_urls(self, urls): ''' 将新的URL添加到未爬取的URL集合中 :param urls: URL集合 :return: ''' if urls is None or len(urls) == 0: return for url in urls: self.add_new_url(url) def new_url_size(self): ''' 获取未爬取URL集合的大小 :return: ''' return len(self.new_urls) def old_url_size(self): ''' 获取已经爬取的URL集合的大小 :return: ''' return len(self.old_urls) def save_progress(self, path, data): ''' 保存进度 :param path: 文件路径 :param data: 数据 :return: ''' with open(path, 'wb') as f: pickle.dump(data, f) def load_progress(self, path): ''' 从本地文件加载进度 :param path: 文件路径 :return: 返回set集合 ''' print('[+] 从文件加载进度:%s' % path) try: with open(path, 'rb') as f: tmp = pickle.load(f) return tmp except: print('[!] 文件无效,创建:%s' % path) return set()
#爬虫管理器from multiprocessing.managers import BaseManagerfrom .HTML_downloader import HtmlDownloaderfrom .HTML_parser import HtmlParserclass Spiderwork(): def __init__(self): BaseManager.register('get_task_queue') BaseManager.register('get_result_queue') server_addr = '127.0.0.1' print('connect to %s ....'% server_addr) self.m = BaseManager(address=(server_addr,8001),authkey='baike'.encode('utf-8')) self.m.connect() self.task = self.m.get_task_queue() self.result = self.m.get_result_queue() self.downloader = HtmlDownloader() self.parser = HtmlParser() print('init finshed..') def crawl(self): while True: try: if not self.task.empty(): url = self.task.get() if url =='end': print('控制节点通知爬虫节点停止工作。') self.result.put({ 'new_urls':'end','data':'end'}) return print('爬虫节点正在解析:%s' % url.encode('utf-8')) content=self.downloader.download(url) new_urls,data=self.parser.parser(url,content) self.result.put({ 'new_urls':url,'data':data}) except EOFError as e : print('链接工作节点失败') return except Exception as e : print(e) print('crawl fial')if __name__ =='__main__': spider = Spiderwork() spider.crawl()
# HTML解析器import refrom urllib import parsefrom bs4 import BeautifulSoupclass HtmlParser(): def parser(self,page_url,html_cont): ''' 用于解析网页内容,抽取URL和数据 :param page_url: 下载页面的URL :param html_cont: 下载的网页内容 :return: ''' if page_url is None or html_cont is None: return soup = BeautifulSoup(html_cont,'html.parser') new_urls = self._get_new_urls(page_url,soup) new_data = self._get_new_data(page_url,soup) return new_urls,new_data def _get_new_urls(self,page_url,soup): ''' 抽取新的URL集合 :param page_url: 下载页面的URL :param soup: soup :return: ''' new_urls = set() #抽取符合要求的a标签 links = soup.find_all('a',href = re.compile(r'/item/.')) for link in links: # 提取href属性 new_url = link['href'] # 拼接成完整的网址 new_full_url = parse.urljoin(page_url,new_url) new_urls.add(new_full_url) return new_urls def _get_new_data(self,page_url,soup): ''' 抽取有效数据 :param page_url: 下载页面URL :param soup: :return: 返回有效数据 ''' data = {} data['url'] = page_url title = soup.find('dd',class_ = 'lemmaWgt-lemmaTitle-title').find('h1') data['title'] = title.text summary = soup.find('div',class_ = 'lemma-summary') #获取tag中包含的所有文本内容 data['summary'] = summary.text return data
# HTML下载器import requestsclass HtmlDownloader(): def download(self,url): if url is None: return None headers = { 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel …) Gecko/20100101 Firefox/57.0'.encode('utf-8')} r = requests.get(url,headers=headers) if r.status_code ==200: r.encoding = 'utf-8' return r.text return None
# 数据存储器import codecsimport timeclass DataOutput(): def __init__(self): self.filepath = 'baike_%s.html' % (time.strftime('%Y_%m_%d_%H_%M_%S', time.localtime())) self.output_head(self.filepath) self.datas = [] def store_data(self, data): if data is None: return self.datas.append(data) if len(self.datas) > 10: self.output_html(self.filepath) def output_head(self, path): ''' 将HTML头写进去 :param path: :return: ''' fout = codecs.open(path, 'w', encoding='utf-8') fout.write('') fout.write('
%s | ' % data['url']) fout.write('%s | ' % data['title']) fout.write('%s | ' % data['summary']) fout.write('
#控制调度器import random,time,queuefrom multiprocessing.managers import BaseManagerfrom multiprocessing import Processfrom .URLManager import UrlManagerfrom .Data_store import DataOutputclass NodeManager(): def start_Manager(self,url_q,result_q): ''' 创建一个分布式管理器 :param url_q: url队列 :param result_q: 结果队列 :return: ''' BaseManager.register('get_task_queue', callable=lambda: url_q) BaseManager.register('get_result_queue', callable=lambda: result_q) manager = BaseManager(address=('', 8001), authkey='baike'.encode('utf-8')) return manager def url_manager_proc(self,url_q,conn_q,root_url): url_manager = UrlManager() url_manager.add_new_urls(root_url) while True: while (url_manager.has_new_url()): #从URL管理器获取新的URL new_url = url_manager.get_new_url() # 将新URL发送给工作节点 url_q.put(new_url) print('old_url=',url_manager.old_url_size()) # 判断,当爬取2000个链接后关闭并保存 if (url_manager.old_url_size()>2000): url_q.put('end') print('控制节点发起结束通知') # 关闭管理节点。同时存储set状态 url_manager.save_progress('new_urls.txt',url_manager.new_urls) url_manager.save_progress('old_urls.txt',url_manager.old_urls) return try: if not conn_q.empty(): urls = conn_q.get() url_manager.add_new_urls(urls) except BaseException as e: time.sleep(0.1) def result_solve_proc(self,result_q,conn_q,store_q): while True: try: if not result_q.empty(): content= result_q.get(True) if content['new_urls'] =='end': #结果分析进程接受通知然后结束 print('结果分析进程接收通知然后结束') store_q.put('end') return conn_q.put(content['new_urls']) # url为set类型 store_q.put(content['data'])#解析出来的数据为dict类型 else: time.sleep(0.1) except BaseException as e: time.sleep(0.1) def store_proc(self,store_q): output = DataOutput() while True: if not store_q.empty(): data = store_q.get() if data =='end': print('存储进程接受通知然后结束') output.output_end(output.filepath) return output.store_data(data) else: time.sleep(0.1)if __name__ =='__main__': # 初始化4个队列 url_q = queue.Queue() result_q = queue.Queue() store_q = queue.Queue() conn_q = queue.Queue() # 创建分布式管理器 node = NodeManager() manager = node.start_Manager(url_q,result_q) # 创建URL管理进程,数据提取进程和数据存储进程 url_manager_proc = Process(target=node.url_manager_proc,args=(url_q,conn_q,'http://baike.baidu.com/view/284853.htm')) result_solve_proc = Process(target=node.result_solve_proc,args=(result_q,conn_q,store_q)) store_proc = Process(target=node.store_proc,args=(store_q,)) # 启动3个进程和分布式管理器 url_manager_proc.start() result_solve_proc.start() store_proc.start() manager.get_server().serve_forever()