一百行代码实现异步爬虫


一个优雅的爬虫需要一下这些东西:

  • 请求器
  • 页面解析器
  • 链接生成器
  • 调度器

请求器

负责发送请求。

页面解析器

负责从页面上解析出继续爬的链接。

链接生成器

负责处理继续爬虫的链接并放入队列。

调度器

决定链接是否应该被爬去的核心部件。

异步

同时有多个请求在发送,即时异步爬虫。

代码

相关代码已上传到 Github[https://github.com/EINDEX/100-line-async-spider]

import aiohttp, asyncio, aiofiles, lxml, pathlib, sys, re, lxml.html
from urllib.parse import urlparse
from pathlib import Path
from hashlib import md5

MAX_GET, MAX_QUEUE_SIZE, MAX_WORKER = 10000, 100, 20
spider_url_set, spider_content_set = set(), set()
status = {"success": 0, "all": 0, "same": 0, "same_content":0}

async def fetch(url):
    global MAX_GET
    if MAX_GET < 0:
        return
    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False), timeout=aiohttp.ClientTimeout(total=1)) as session:
        async with session.get(url) as response:
            return await response.text()

async def savefile(path, data):
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    async with aiofiles.open(path, mode='w') as f:
        await f.write(data)

def data_analysis(data):
    try:
        html = lxml.html.fromstring(data)
        return html.xpath('//a/@href')
    except:
        return []

def url_analysis(endpoint, urls):
    for url in urls:
        if not url or url == '#' or 'javascript' in url:
            continue
        elif url.startswith('/'):
            if endpoint.endswith('/'):
                endpoint = endpoint[:-1]
            yield endpoint + url
        elif url.endswith('.html') or url.endswith('.htm') or url.endswith('.shtml') or url.endswith('/') or '?' in url:
            yield url

def path_gene(endpoint):
    result = urlparse(endpoint)
    return f'data/{"/".join(list(filter(lambda x:x, result.hostname.split(".")))[::-1])}{result.path+"/index.html" if result.path!="/" else "/index.html"}{result.query.replace("/","")}'

async def pushurl(url_iter, queue):
    global MAX_GET
    try:
        for url in url_iter:
            if MAX_GET > 0:
                await queue.put(url)
    except Exception as e:
        pass

async def spider(name,queue):
    global MAX_GET, spider_content_set, spider_url_set
    while MAX_GET > 0:
        try:
            endpoint = await queue.get()
            data = await fetch(endpoint)
            md5data = md5(data.encode()).hexdigest()
            if md5data in spider_content_set:
                status['same_content'] += 1
                continue
            MAX_GET -= 1
            status['all'] += 1
            spider_content_set.add(md5data)
            await savefile(path_gene(endpoint), data)
            asyncio.gather(asyncio.create_task(pushurl(url_analysis(endpoint, data_analysis(data)), queue)), return_exceptions=False)
            status['success'] += 1
        except Exception as e:
            pass

def exit():
    print('exit')
    for task in asyncio.Task.all_tasks():
        task.cancel()
    sys.exit(0)

async def main(first_endpoint):
    queue = asyncio.Queue(MAX_QUEUE_SIZE)
    await queue.put(first_endpoint)
    asyncio.gather(*[asyncio.create_task(spider(spider_id,queue)) for spider_id in range(MAX_WORKER)], return_exceptions=False)
    try:
        while True:
            await asyncio.sleep(1)
            print(queue.qsize(), MAX_GET, status)
            if MAX_GET <= 0 or not queue.qsize():
                exit()
    except:
        exit()

if __name__ == "__main__":
    asyncio.run(main('https://eindex.me/'))
    asyncio.get_event_loop().close()