博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
简单分布式爬虫
阅读量:6324 次
发布时间:2019-06-22

本文共 10147 字,大约阅读时间需要 33 分钟。

# url管理器
# url管理器import pickleimport hashlibclass UrlManager():    def __init__(self):        self.new_urls = self.load_progress('new_urls.txt')  # 未爬取url集合        self.old_urls = self.load_progress('old_urls.txt')  # 已爬取集合    def has_new_url(self):        '''        判断是否有未爬取的URL        :return:        '''        return self.new_url_size() != 0    def get_new_url(self):        '''        获取一个未爬取的URL        :return:        '''        new_url = self.new_urls.pop()        m = hashlib.md5()        m.update(new_url)        self.old_urls.add(m.hexdigest()[8:-8])        return new_url    def add_new_url(self, url):        '''        将新的URL添加到未爬取的集合中        :param url: 单个URL        :return:        '''        if url is None:            return        m = hashlib.md5()        m.update(url)        url_md5 = m.hexdigest()[8:-8]        if url not in self.new_urls and url_md5 not in self.old_urls:            self.new_urls.add(url)    def add_new_urls(self, urls):        '''        将新的URL添加到未爬取的URL集合中        :param urls: URL集合        :return:        '''        if urls is None or len(urls) == 0:            return        for url in urls:            self.add_new_url(url)    def new_url_size(self):        '''        获取未爬取URL集合的大小        :return:        '''        return len(self.new_urls)    def old_url_size(self):        '''        获取已经爬取的URL集合的大小        :return:        '''        return len(self.old_urls)    def save_progress(self, path, data):        '''        保存进度        :param path: 文件路径        :param data: 数据        :return:        '''        with open(path, 'wb') as f:            pickle.dump(data, f)    def load_progress(self, path):        '''        从本地文件加载进度        :param path: 文件路径        :return: 返回set集合        '''        print('[+] 从文件加载进度:%s' % path)        try:            with open(path, 'rb') as f:                tmp = pickle.load(f)                return tmp        except:            print('[!] 文件无效,创建:%s' % path)        return set()
#爬虫管理器from multiprocessing.managers import BaseManagerfrom .HTML_downloader import HtmlDownloaderfrom .HTML_parser import HtmlParserclass Spiderwork():    def __init__(self):        BaseManager.register('get_task_queue')        BaseManager.register('get_result_queue')        server_addr = '127.0.0.1'        print('connect to %s ....'% server_addr)        self.m = BaseManager(address=(server_addr,8001),authkey='baike'.encode('utf-8'))        self.m.connect()        self.task = self.m.get_task_queue()        self.result = self.m.get_result_queue()        self.downloader = HtmlDownloader()        self.parser = HtmlParser()        print('init finshed..')    def crawl(self):        while True:            try:                if not self.task.empty():                    url = self.task.get()                    if url =='end':                        print('控制节点通知爬虫节点停止工作。')                        self.result.put({
'new_urls':'end','data':'end'}) return print('爬虫节点正在解析:%s' % url.encode('utf-8')) content=self.downloader.download(url) new_urls,data=self.parser.parser(url,content) self.result.put({
'new_urls':url,'data':data}) except EOFError as e : print('链接工作节点失败') return except Exception as e : print(e) print('crawl fial')if __name__ =='__main__': spider = Spiderwork() spider.crawl()
# HTML解析器import refrom urllib import parsefrom bs4 import BeautifulSoupclass HtmlParser():    def parser(self,page_url,html_cont):        '''        用于解析网页内容,抽取URL和数据        :param page_url: 下载页面的URL        :param html_cont: 下载的网页内容        :return:        '''        if page_url is None or html_cont is None:            return        soup = BeautifulSoup(html_cont,'html.parser')        new_urls = self._get_new_urls(page_url,soup)        new_data = self._get_new_data(page_url,soup)        return new_urls,new_data    def _get_new_urls(self,page_url,soup):        '''        抽取新的URL集合        :param page_url: 下载页面的URL        :param soup: soup        :return:        '''        new_urls = set()        #抽取符合要求的a标签        links = soup.find_all('a',href = re.compile(r'/item/.'))        for link in links:            # 提取href属性            new_url = link['href']            # 拼接成完整的网址            new_full_url = parse.urljoin(page_url,new_url)            new_urls.add(new_full_url)        return new_urls    def _get_new_data(self,page_url,soup):        '''        抽取有效数据        :param page_url: 下载页面URL        :param soup:        :return: 返回有效数据        '''        data = {}        data['url'] = page_url        title = soup.find('dd',class_ = 'lemmaWgt-lemmaTitle-title').find('h1')        data['title'] = title.text        summary = soup.find('div',class_ = 'lemma-summary')        #获取tag中包含的所有文本内容        data['summary'] = summary.text        return data
# HTML下载器import requestsclass HtmlDownloader():    def download(self,url):        if url is None:            return None        headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel …) Gecko/20100101 Firefox/57.0'.encode('utf-8')} r = requests.get(url,headers=headers) if r.status_code ==200: r.encoding = 'utf-8' return r.text return None
# 数据存储器import codecsimport timeclass DataOutput():    def __init__(self):        self.filepath = 'baike_%s.html' % (time.strftime('%Y_%m_%d_%H_%M_%S', time.localtime()))        self.output_head(self.filepath)        self.datas = []    def store_data(self, data):        if data is None:            return        self.datas.append(data)        if len(self.datas) > 10:            self.output_html(self.filepath)    def output_head(self, path):        '''        将HTML头写进去        :param path:        :return:        '''        fout = codecs.open(path, 'w', encoding='utf-8')        fout.write('')        fout.write('
') fout.write('
') fout.close() def output_html(self, path): ''' 将数据写入HTML文件中 :return: ''' fout = codecs.open(path, 'a', encoding='utf-8') for data in self.datas: fout.write('
') fout.write('
' % data['url']) fout.write('
' % data['title']) fout.write('
' % data['summary']) fout.write('
') self.datas.remove(data) fout.write('
%s %s %s
') fout.write('') fout.write('') fout.close() def output_end(self, path): ''' 将HTML尾写进去 :param path: :return: ''' fout = codecs.open(path, 'a', encoding='utf-8') fout.write('') fout.write('') fout.write('') fout.close()
#控制调度器import random,time,queuefrom multiprocessing.managers import BaseManagerfrom multiprocessing import Processfrom .URLManager import UrlManagerfrom .Data_store import DataOutputclass NodeManager():    def start_Manager(self,url_q,result_q):        '''        创建一个分布式管理器        :param url_q: url队列        :param result_q: 结果队列        :return:        '''        BaseManager.register('get_task_queue', callable=lambda: url_q)        BaseManager.register('get_result_queue', callable=lambda: result_q)        manager = BaseManager(address=('', 8001), authkey='baike'.encode('utf-8'))        return manager    def url_manager_proc(self,url_q,conn_q,root_url):        url_manager = UrlManager()        url_manager.add_new_urls(root_url)        while True:            while (url_manager.has_new_url()):                #从URL管理器获取新的URL                new_url = url_manager.get_new_url()                # 将新URL发送给工作节点                url_q.put(new_url)                print('old_url=',url_manager.old_url_size())                # 判断,当爬取2000个链接后关闭并保存                if (url_manager.old_url_size()>2000):                    url_q.put('end')                    print('控制节点发起结束通知')                    # 关闭管理节点。同时存储set状态                    url_manager.save_progress('new_urls.txt',url_manager.new_urls)                    url_manager.save_progress('old_urls.txt',url_manager.old_urls)                    return            try:                if not conn_q.empty():                    urls = conn_q.get()                    url_manager.add_new_urls(urls)            except BaseException as e:                time.sleep(0.1)    def result_solve_proc(self,result_q,conn_q,store_q):        while True:            try:                if not result_q.empty():                    content= result_q.get(True)                    if content['new_urls'] =='end':                        #结果分析进程接受通知然后结束                        print('结果分析进程接收通知然后结束')                        store_q.put('end')                        return                    conn_q.put(content['new_urls']) # url为set类型                    store_q.put(content['data'])#解析出来的数据为dict类型                else:                    time.sleep(0.1)            except BaseException as e:                time.sleep(0.1)    def store_proc(self,store_q):        output = DataOutput()        while True:            if not store_q.empty():                data = store_q.get()                if data =='end':                    print('存储进程接受通知然后结束')                    output.output_end(output.filepath)                    return                output.store_data(data)            else:                time.sleep(0.1)if __name__ =='__main__':    # 初始化4个队列    url_q = queue.Queue()    result_q = queue.Queue()    store_q = queue.Queue()    conn_q = queue.Queue()    # 创建分布式管理器    node = NodeManager()    manager = node.start_Manager(url_q,result_q)    # 创建URL管理进程,数据提取进程和数据存储进程    url_manager_proc = Process(target=node.url_manager_proc,args=(url_q,conn_q,'http://baike.baidu.com/view/284853.htm'))    result_solve_proc = Process(target=node.result_solve_proc,args=(result_q,conn_q,store_q))    store_proc = Process(target=node.store_proc,args=(store_q,))    # 启动3个进程和分布式管理器    url_manager_proc.start()    result_solve_proc.start()    store_proc.start()    manager.get_server().serve_forever()

 

转载于:https://www.cnblogs.com/Erick-L/p/7719349.html

你可能感兴趣的文章