使用Python爬取小说章节并存储到MongoDB数据库

1. 背景介绍

在本篇文章中,我们将展示如何使用Python爬取小说网站的章节内容并将其存储到MongoDB数据库中。我们会使用requests进行网页请求,lxml处理HTML,re进行正则表达式匹配,threading实现并发处理,以及pymongo连接MongoDB。

真实情况是写课设没数据,获取取一点

2. 环境和依赖

请确保你的环境已经安装了以下依赖包:

pip install requests lxml pymongo

3. 代码详解

下面是完整的代码实现。我们将按照功能模块进行解释。

3.1 引入必要的库

首先,我们需要引入所需的库:

import requests
from lxml import etree
import re
import threading
import time
from pymongo import MongoClient
3.2 设置请求头

为了防止被网站封禁,我们设置了请求头:

headers = {
    'User-Agent': '写你自己的'
}
3.3 初始化全局变量和信号量

为了控制并发和存储章节内容,我们设置了全局变量和信号量:

zhangjie_content = []  # 存储章节内容
semaphore = threading.Semaphore(20)  # 限制并发数量为20
3.4 MongoDB连接设置

连接到MongoDB数据库

client = MongoClient('mongodb://localhost:27017/')
db = client.novel_database
collection = db.novels
3.5 插入数据到MongoDB

定义一个辅助函数,将数据插入到MongoDB:

def insert_to_mongodb(title, novel_type, author, update_time, chapters):
    data = {
        "title": title,
        "novel_type": novel_type,
        "author": author,
        "update_time": update_time,
        "zhangjie": chapters
    }
    collection.insert_one(data)
    print(f"插入 {len(chapters)} 章成功:{title}")
3.6 爬取章节内容

定义爬取章节内容的函数:

def neirong(ur, url, s, retries=3):
    while retries > 0:
        try:
            reps = requests.get(rf'{ur}{url}', headers=headers)
            reps.raise_for_status()
            html = etree.HTML(reps.text)
            if html is None:
                print(f"解析 HTML 内容错误,URL: {ur}/{url}")
                return

            chapter = html.xpath('//*[@id="content"]/h1/text()')
            if not chapter:
                print(f"未找到章节标题,URL: {ur}/{url}")
                return

            chapter = chapter[0].strip()
            text = html.xpath('//*[@id="htmlContent"]/text()')
            if not text:
                print(f"未找到章节内容,URL: {ur}/{url}")
                return

            text = ''.join(text[1:])  # 连接文本内容
            zhangjie_content.append({"chapter": chapter, "text": text})

            return
        except requests.RequestException as e:
            print(f"请求错误,URL: {ur}{url}, 错误: {e}")
            retries -= 1
            time.sleep(1)  # 等待一段时间后重试
    print(f"重试次数过多,放弃 URL: {ur}{url}")
3.7 爬取章节列表

定义爬取章节列表的函数:

def zhangjie(url, retries=3):
    while retries > 0:
        try:
            reps = requests.get(url, headers=headers, timeout=10)
            reps.raise_for_status()
            html = etree.HTML(reps.text)
            if html is None:
                print(f"解析 HTML 内容错误,URL: {url}")
                return

            title = html.xpath('//*[@id="info"]/h1/text()')
            title = title[0].strip() if title else "未知书名"
            novel_type = html.xpath('//*[@id="maininfo"]/div[1]/a[2]/text()')
            novel_type = novel_type[0].strip() if novel_type else "未知类型"
            author = html.xpath('//*[@id="info"]/p[1]/a/text()')
            author = author[0].strip() if author else "未知作者"
            update_time = html.xpath('//*[@id="info"]/p[3]/text()')
            update_time = update_time[0].strip() if update_time else "未知时间"

            option_texts = html.xpath('/html/body/div[4]/div/div/select/option/text()')
            if not option_texts:
                print(f"未找到页码信息,URL: {url}")
                return

            zhang = re.findall(r'第\s*(\d+)\s*页\(末页\)', option_texts[-1])
            if not zhang:
                print(f"未找到页码匹配,URL: {url}")
                return
            zhang = int(zhang[0])
            print('开始爬取:', title)

            s = 0  # 设置爬取多少章
            for i in range(1, zhang + 1):
                if s >= 100:
                    break  # 已经爬取100章,跳出循环

                zhangjie_url = f'{url}/index_{i}.html'
                zhangjie_reps = requests.get(zhangjie_url, headers=headers, timeout=10)
                zhangjie_reps.raise_for_status()
                zhangjie_html = etree.HTML(zhangjie_reps.text)
                if zhangjie_html is None:
                    print(f"解析 HTML 内容错误,URL: {zhangjie_url}")
                    break

                zhangjieLis = zhangjie_html.xpath('/html/body/div[4]/div/ul/li/a/@href')
                if not zhangjieLis:
                    print(f"未找到章节列表,URL: {zhangjie_url}")
                    break

                threads = []

                for j in zhangjieLis:
                    if s >= 100:
                        break  # 已经爬取100章,跳出循环
                    thread = threading.Thread(target=crawl_with_semaphore, args=(neirong, url, j, s))
                    threads.append(thread)
                    thread.start()
                    time.sleep(0.1)
                    s += 1  # 统计章节数目

                for thread in threads:
                    thread.join()

            # 插入所有爬取的章节内容到MongoDB
            insert_to_mongodb(title, novel_type, author, update_time, list(zhangjie_content))
            zhangjie_content.clear()  # 清空章节内容列表

            print(f"已成功记录数据:{title}")

            return
        except requests.RequestException as e:
            print(f"请求错误,URL: {url}, 错误: {e}")
            retries -= 1
            time.sleep(1)  # 等待一段时间后重试
    print(f"重试次数过多,放弃 URL: {url}")
3.8 使用信号量控制并发

定义一个辅助函数,使用信号量控制并发数量:

def crawl_with_semaphore(target, *args):
    with semaphore:  # 使用信号量来控制并发数量
        target(*args)
3.9 主函数

定义主函数,从主页爬取小说列表并调用爬取章节的函数:

def main(i):
    main_url = rf'http://www.biqule.net/top/monthvisit/{i}.html'  # 月访问榜主页链接
    try:
        reps = requests.get(main_url, headers=headers, timeout=10)
        reps.raise_for_status()
        html = etree.HTML(reps.text)
        if html is None:
            print("解析 HTML 内容错误,主页 URL")
            return

        novels = html.xpath('//div/ul/li/span[@class="sp_2"]/a')
        novel_urls = [{"title": novel.text.strip(), "url": novel.attrib['href']} for novel in novels if
                      novel.text and 'href' in novel.attrib]
        for i in novel_urls:
            global lis
            lis = []
            zhangjie(i['url'])

    except requests.RequestException as e:
        print(f"请求错误,URL: {main_url}, 错误: {e}")
3.10 程序入口

定义程序入口,并调用主函数:

if __name__ == "__main__":
    for i in range(1, 51): # 1,51是从第一页爬到第五十页
        main(i)

4. 总结

通过本文的示例,我们展示了如何使用Python爬取小说网站的章节内容并将其存储到Mongo

5.完整代码

import requests
from lxml import etree
import re
import threading
import time
from pymongo import MongoClient

# 设置请求头部,防止被网站封禁
headers = {
    'User-Agent': '改成你自己的'
}

# 全局变量和信号量用于控制并发和存储章节内容
zhangjie_content = []  # 存储章节内容
semaphore = threading.Semaphore(20)  # 限制并发数量为20

# MongoDB连接设置
client = MongoClient('mongodb://localhost:27017/')
db = client.novel_database
collection = db.novels

def insert_to_mongodb(title, novel_type, author, update_time, chapters):
    """
    辅助函数,用于将数据插入到MongoDB中
    """
    data = {
        "title": title,
        "novel_type": novel_type,
        "author": author,
        "update_time": update_time,
        "zhangjie": chapters
    }
    collection.insert_one(data)
    print(f"插入 {len(chapters)} 章成功:{title}")

def neirong(ur, url, s, retries=3):
    while retries > 0:
        try:
            reps = requests.get(rf'{ur}{url}', headers=headers)
            reps.raise_for_status()
            html = etree.HTML(reps.text)
            if html is None:
                print(f"解析 HTML 内容错误,URL: {ur}/{url}")
                return

            chapter = html.xpath('//*[@id="content"]/h1/text()')
            if not chapter:
                print(f"未找到章节标题,URL: {ur}/{url}")
                return

            chapter = chapter[0].strip()
            text = html.xpath('//*[@id="htmlContent"]/text()')
            if not text:
                print(f"未找到章节内容,URL: {ur}/{url}")
                return

            text = ''.join(text[1:])  # 连接文本内容
            zhangjie_content.append({"chapter": chapter, "text": text})

            return
        except requests.RequestException as e:
            print(f"请求错误,URL: {ur}{url}, 错误: {e}")
            retries -= 1
            time.sleep(1)  # 等待一段时间后重试
    print(f"重试次数过多,放弃 URL: {ur}{url}")


def zhangjie(url, retries=3):
    while retries > 0:
        try:
            reps = requests.get(url, headers=headers, timeout=10)
            reps.raise_for_status()
            html = etree.HTML(reps.text)
            if html is None:
                print(f"解析 HTML 内容错误,URL: {url}")
                return

            title = html.xpath('//*[@id="info"]/h1/text()')
            title = title[0].strip() if title else "未知书名"
            novel_type = html.xpath('//*[@id="maininfo"]/div[1]/a[2]/text()')
            novel_type = novel_type[0].strip() if novel_type else "未知类型"
            author = html.xpath('//*[@id="info"]/p[1]/a/text()')
            author = author[0].strip() if author else "未知作者"
            update_time = html.xpath('//*[@id="info"]/p[3]/text()')
            update_time = update_time[0].strip() if update_time else "未知时间"

            option_texts = html.xpath('/html/body/div[4]/div/div/select/option/text()')
            if not option_texts:
                print(f"未找到页码信息,URL: {url}")
                return

            zhang = re.findall(r'第\s*(\d+)\s*页\(末页\)', option_texts[-1])
            if not zhang:
                print(f"未找到页码匹配,URL: {url}")
                return
            zhang = int(zhang[0])
            print('开始爬取:', title)

            s = 0  # 设置爬取多少章
            for i in range(1, zhang + 1):
                if s >= 100:
                    break  # 已经爬取100章,跳出循环

                zhangjie_url = f'{url}/index_{i}.html'
                zhangjie_reps = requests.get(zhangjie_url, headers=headers, timeout=10)
                zhangjie_reps.raise_for_status()
                zhangjie_html = etree.HTML(zhangjie_reps.text)
                if zhangjie_html is None:
                    print(f"解析 HTML 内容错误,URL: {zhangjie_url}")
                    break

                zhangjieLis = zhangjie_html.xpath('/html/body/div[4]/div/ul/li/a/@href')
                if not zhangjieLis:
                    print(f"未找到章节列表,URL: {zhangjie_url}")
                    break

                threads = []

                for j in zhangjieLis:
                    if s >= 100:
                        break  # 已经爬取100章,跳出循环
                    thread = threading.Thread(target=crawl_with_semaphore, args=(neirong, url, j, s))
                    threads.append(thread)
                    thread.start()
                    time.sleep(0.1)
                    s += 1  # 统计章节数目

                for thread in threads:
                    thread.join()

            # 插入所有爬取的章节内容到MongoDB
            insert_to_mongodb(title, novel_type, author, update_time, list(zhangjie_content))
            zhangjie_content.clear()  # 清空章节内容列表

            print(f"已成功记录数据:{title}")

            return
        except requests.RequestException as e:
            print(f"请求错误,URL: {url}, 错误: {e}")
            retries -= 1
            time.sleep(1)  # 等待一段时间后重试
    print(f"重试次数过多,放弃 URL: {url}")

def crawl_with_semaphore(target, *args):
    with semaphore:  # 使用信号量来控制并发数量
        target(*args)



# 主函数
def main(i):
    main_url = rf'http://www.biqule.net/top/monthvisit/{i}.html'  # 月访问榜主页链接
    try:
        reps = requests.get(main_url, headers=headers, timeout=10)
        reps.raise_for_status()
        html = etree.HTML(reps.text)
        if html is None:
            print("解析 HTML 内容错误,主页 URL")
            return

        novels = html.xpath('//div/ul/li/span[@class="sp_2"]/a')
        novel_urls = [{"title": novel.text.strip(), "url": novel.attrib['href']} for novel in novels if
                      novel.text and 'href' in novel.attrib]
        for i in novel_urls:
            global lis
            lis = []
            zhangjie(i['url'])

    except requests.RequestException as e:
        print(f"请求错误,URL: {main_url}, 错误: {e}")

if __name__ == "__main__":
    for i in range(1, 51):
        main(i)

5.2.升级版

  1. 使用异步 I/O
    使用异步库如 aiohttp 和 asyncio 可以显著提高网络请求的效率。

  2. 减少等待时间
    减少每次请求之间的等待时间。

  3. 使用连接池
    使用连接池可以重用连接,减少建立新连接的开销。

说明:

  1. 改用了 aiohttp 和 asyncio,以实现异步 I/O 操作。
  2. 使用信号量(semaphore)仍然控制并发数量,以避免过多请求导致的封禁。
  3. 将 fetch 函数封装请求逻辑,并且所有的网络请求都使用这个函数。
  4. 采用 asyncio.gather 同时处理多个任务。

这样,可以显著提升爬取速度,并且享受到异步 I/O 带来的性能提升。

import aiohttp
import asyncio
from lxml import etree
import re
from pymongo import MongoClient

# 全局变量和信号量用于控制并发和存储章节内容
zhangjie_content = []  # 存储章节内容
semaphore = asyncio.Semaphore(50)  # 限制并发数量为50

# MongoDB连接设置
client = MongoClient('mongodb://localhost:27017/')
db = client.novel_database
collection = db.novels


def insert_to_mongodb(title, novel_type, author, update_time, chapters, img_url, jianjie):
    """
    辅助函数,用于将数据插入到MongoDB中
    """
    data = {
        "title": title,
        "novel_type": novel_type,
        "author": author,
        "update_time": update_time,
        "zhangjie": chapters,
        'img_url': img_url,
        'jianjie': jianjie
    }
    collection.insert_one(data)
    print(f"插入 {len(chapters)} 章成功:{title}")


async def fetch(session, url):
    async with semaphore:  # 使用信号量来控制并发数量
        try:
            async with session.get(url) as response:
                return await response.text()
        except Exception as e:
            print(f"请求错误,URL: {url}, 错误: {e}")


async def neirong(session, base_url, url):
    try:
        html_str = await fetch(session, f'{base_url}{url}')
        html = etree.HTML(html_str)
        if html is None:
            print(f"解析 HTML 内容错误,URL: {base_url}{url}")
            return

        chapter = html.xpath('//*[@id="content"]/h1/text()')
        if not chapter:
            print(f"未找到章节标题,URL: {base_url}{url}")
            return

        chapter = chapter[0].strip()
        text = html.xpath('//*[@id="htmlContent"]/text()')
        if not text:
            print(f"未找到章节内容,URL: {base_url}{url}")
            return

        text = ''.join(text[1:])  # 连接文本内容
        zhangjie_content.append({"chapter": chapter, "text": text})

    except Exception as e:
        print(f"处理章节内容错误,URL: {base_url}{url}, 错误: {e}")


async def zhangjie(session, url):
    try:
        html_str = await fetch(session, url)
        html = etree.HTML(html_str)
        if html is None:
            print(f"解析 HTML 内容错误,URL: {url}")
            return

        title = html.xpath('//*[@id="info"]/h1/text()')
        title = title[0].strip() if title else "未知书名"
        novel_type = html.xpath('//*[@id="maininfo"]/div[1]/a[2]/text()')
        novel_type = novel_type[0].strip() if novel_type else "未知类型"
        author = html.xpath('//*[@id="info"]/p[1]/a/text()')
        author = author[0].strip() if author else "未知作者"
        update_time = html.xpath('//*[@id="info"]/p[3]/text()')
        update_time = update_time[0].strip() if update_time else "未知时间"
        img_url = html.xpath('//*[@id="fmimg"]/img/@src')
        img_url = img_url[0].strip() if img_url else "未知图片"
        jianjie = ''.join(html.xpath('//*[@id="intro"]//text()')).strip() if html.xpath(
            '//*[@id="intro"]//text()') else "未知简介"

        option_texts = html.xpath('/html/body/div[4]/div/div/select/option/text()')
        if not option_texts:
            print(f"未找到页码信息,URL: {url}")
            return

        zhang = re.findall(r'第\s*(\d+)\s*页\(末页\)', option_texts[-1])
        if not zhang:
            print(f"未找到页码匹配,URL: {url}")
            return
        zhang = int(zhang[0])
        print('开始爬取:', title)

        for i in range(1, zhang + 1):
            if len(zhangjie_content) >= 100:
                break  # 已经爬取100章,跳出循环

            zhangjie_url = f'{url}/index_{i}.html'
            zhangjie_html_str = await fetch(session, zhangjie_url)
            zhangjie_html = etree.HTML(zhangjie_html_str)
            if zhangjie_html is None:
                print(f"解析 HTML 内容错误,URL: {zhangjie_url}")
                break

            zhangjieLis = zhangjie_html.xpath('/html/body/div[4]/div/ul/li/a/@href')
            if not zhangjieLis:
                print(f"未找到章节列表,URL: {zhangjie_url}")
                break

            tasks = []
            for j in zhangjieLis:
                if len(zhangjie_content) >= 100:
                    break  # 已经爬取100章,跳出循环
                task = asyncio.create_task(neirong(session, url, j))
                tasks.append(task)

            await asyncio.gather(*tasks)

        # 插入所有爬取的章节内容到MongoDB
        insert_to_mongodb(title, novel_type, author, update_time, list(zhangjie_content), img_url, jianjie)
        zhangjie_content.clear()  # 清空章节内容列表

        print(f"已成功记录数据:{title}")

    except Exception as e:
        print(f"处理章节信息错误,URL: {url}, 错误: {e}")


async def main():
    async with aiohttp.ClientSession() as session:
        tasks = []
        for i in range(1, 51):
            main_url = f'http://www.biqule.net/top/monthvisit/{i}.html'  # 月访问榜主页链接
            task = asyncio.create_task(process_main_page(session, main_url))
            tasks.append(task)
        await asyncio.gather(*tasks)


async def process_main_page(session, main_url):
    try:
        html_str = await fetch(session, main_url)
        html = etree.HTML(html_str)
        if html is None:
            print("解析 HTML 内容错误,主页 URL")
            return

        novels = html.xpath('//div/ul/li/span[@class="sp_2"]/a')
        novel_urls = [{"title": novel.text.strip(), "url": novel.attrib['href']} for novel in novels if
                      novel.text and 'href' in novel.attrib]
        tasks = []
        for novel in novel_urls:
            task = asyncio.create_task(zhangjie(session, novel['url']))
            tasks.append(task)
        await asyncio.gather(*tasks)

    except Exception as e:
        print(f"处理主页面错误,URL: {main_url}, 错误: {e}")


if __name__ == "__main__":
    asyncio.run(main())

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部