马蜂窝爬虫:基于Scrapy的高性能方案

编程入门 行业动态 更新时间:2024-10-07 14:28:38

<a href=https://www.elefans.com/category/jswz/34/1702365.html style=马蜂窝爬虫:基于Scrapy的高性能方案"/>

马蜂窝爬虫:基于Scrapy的高性能方案

引言

马蜂窝是国内著名的旅游攻略网站,拥有大量优质的UGC内容。出于某些原因,我进行了游记和自由行两类数据的爬取,并愿意与大家分享整个爬取流程,内容分析、破解方法以及代码实例。

本方案采用了Python著名的爬虫框架Scrapy,和Mongo数据库。单实例能够每天处理数百万级的列表数据,以及大约3万左右的详情内容爬取。如果您希望获得更高性能,还可以考虑多进程结合高质量的IP代理。

网站分析

数据分类

在此次爬取中,我主要获取了目的地、景点、游记和自由行等数据。其他类型的数据获取方式与这些类似,因此您可以将其视作参考。

目的地

目的地数据主要包括城市、省份和国家等信息,这是后续数据结构爬取所需的基本数据,每个目的地都有其对应的mddId。

  1. 从 获取国家目的地址,页面上就能轻松拿到国家的mddId。

  1. 拼接mddid成城市列表地址-> .html ,这样就可以获取当前国家所有城市。

    点击翻页底部,可以发现页面发送了一个post请求 ,去获取翻页后的页面数据,通过这个请求就可以获取到全部数据。

其中list是列表内容,page是底部页数信息。

景点

一旦获取到城市级别的mddid,您就可以开始爬取当地的景点数据。这里我需要的是景点信息。


同样的点击翻页,发现页面 也是发送了一个POST 请求 .php 来获取翻页数据。

游记

游记主要由用户生成,可以从多个入口访问,但我将介绍从景点和目的地两个入口获取游记数据的方法。可能会有重复数据,但您只需设置好唯一约束即可。

自由行

自由行数据可以在目的地详情页内找到。

列表访问地址如下 /
通过翻页,发现调用了 一个GET接口 =10065&page=1
。这个接口没有校验,可以直接访问。

破解

马蜂窝网站采用两种主要反爬机制:cookie校验和参数摘要校验,有时候也会同时使用这两种方式。在爬取页面时,您可以首先尝试拷贝参数并进行尝试。如果这仍然无法通过,那么可能采用了cookie校验;如果参数中包含_sn,那么可能是参数摘要校验。

cookie 校验

当全新打开一个马蜂窝页面时,会发现进行了三次页面加载,两次返回状态码521,最终才能成功访问页面,如果直接请求页面是拿不到想要的数据的。接下来详细分析这个过程。

step1

第一次请求会返回一段JavaScript脚本,其目的是设置Cookie,然后刷新页面。

再加上response header的set-cookie的操作,所以第二次请求实际上就是带上这两个cookie,再进行一次请求。

step2

在设置好Cookie之后,会返回经过JavaScript混淆后的代码。混淆工具全称为obfuscator,它是一款免费、开源的JavaScript混淆工具,用于保护核心JavaScript代码不容易被破解。

代码format之后大致是这样的结构,最后是执行了go函数,入参是可能涉及的变量。进入看go函数涉及到很多复杂混淆,看代码直接看晕。

要破解这种反爬机制,有两种主要方法:

  1. 构建一个模拟浏览器环境,以执行这些JavaScript代码。
  2. 是把js代码转为抽象语法树(AST),然后分析混淆方式并进行还原。反混淆是一个复杂而繁琐的工作,需要处理大量细节。这就好比将一个破碎严重的古董瓷器修复成原始的模样。当然网上也有一些一键还原网站,但是如果比较复杂就会失败。

幸好往上已经有大神把这段代码给反混淆了,转为python大概就是这样,实际上就是按某种方式,又生成了一遍__jsl_clearance_s。

        for i in range(len(go['chars'])):for j in range(len(go['chars'])):values = go['bts'][0] + go['chars'][i] + \go['chars'][j] + go['bts'][1]if go['ha'] == 'md5':ha = hashlib.md5(values.encode()).hexdigest()elif go['ha'] == 'sha1':ha = hashlib.sha1(values.encode()).hexdigest()elif go['ha'] == 'sha256':ha = hashlib.sha256(values.encode()).hexdigest()if ha == go['ct']:__jsl_clearance_s = values

step3

重新设置了cookie中的__jsl_clearance_s之后,再进行一次请求就可以拿到想要的数据啦。

参数摘要校验

某些接口例如上面提到景点的翻页接口 .php 会有一个_sn参数,这就是参数摘要,如果您去掉这个参数会导致返回内容为空,修改参数值也会导致失败。

添加一个XHR断点,把带有router.php的请求拦住。

一步一步跟代码,可以在ajax里面做了大量运算的逻辑,然后在进入cA之前c为参数列表,l.data为参数的拼接字符串。进入cA以后l多了_sn的参数,肯定是cA内进行了_sn的运算。

在这个cA function里面是一段混淆,通过多次单步调试,发现在return 方法之后,再次进入cA方法时,会跳转到另一个js文件,index.js?1631273413。

打开以后发现是一个ob混淆的代码。

使用 /encry_decry/obfuscator.html 简单的进行反混淆,可以看出大致的逻辑。

因为没有复杂的对象,可以手动构造一下参数,然后进行hash,下面是一个python 的例子。

	
_ts = str(time.time_ns()//100000)
qdata = f'{{"_ts":"{_ts}","mddid":"21536","page":"1"}}c9d6618dbc657b41a66eb0af952906f1'.encode('utf-8')
hl = hashlib.md5()
hl.update(qdata)
_sn =  hl.hexdigest()[2:12]

拼出_sn就能对接口进行请求啦。

代码实现

简述

整个项目基于 scrapy ,省去了很多基础代码开发,这里不在对scrapy如何使用进行赘述。

针对不同类型的数据,我写了多个爬虫逻辑。可以进行分批爬取,因为经常会出现各种问题导致断档,之后根据实际情况,再进行补录。在数据准备流程上,先取目的地,再取景点,最后爬取自由行和游记的征文。在数据选择上,逼着使用的是xpath方式。

需要注意的是马蜂窝一旦爬取速度过快数量过多,就会导致ip被封,所以最好是使用代理。如果想有更快的爬取速度,那必须要使用多进程进行爬取,这时候可能会设计任务分配的问题,我使用的是mongo的find_one_and_update来保证并发下任务分配。并发参数配置需要根据实际情况,包括代理的ip量、网络带宽和不同数据类型等等。

环境准备

我所依赖的环境为centos8,python3.9、mongo5,并没有用一些高级特性,python3.6以上应该就可以,最好在linux或者mac上进行开发,windows环境不好配置。

项目一览

./spiders/mfw_mdd_home.py 目的地首页爬虫
./spiders/mfw_scenic.py 景点爬虫
./spiders/notes_detail_2.py 游记详情爬虫 基于scrapy requrst,速度快,但是并不稳定
./spiders/zyx_detail.py 自由行爬虫
./spiders/notes_by_mdd.py 目的地游记列表爬虫
./spiders/notes_detail_1.py 游记详情爬虫 基于requests,速度慢,但是稳定
./spiders/zyxs_by_mmd.py 自由行列表爬虫
./spiders/notes_by_scenic.py 景点游记列表爬虫
./spiders/mfw_mdd_v2.py 目的地爬虫./myextend.py 自定义扩展,包括代理和mongo链接
./page_decryptor.py 破解cookie校验工具
./middlewares.py 中间件,包括代理和cookie等
./settings.py 配置文件
./items.py orm
./pipelines.py 主要处理持久化逻辑
./requirements.txt 所需依赖

所需依赖:

attrs==23.1.0
Automat==22.10.0
certifi==2023.7.22
cffi==1.15.1
charset-normalizer==3.2.0
constantly==15.1.0
cryptography==41.0.3
cssselect==1.2.0
dnspython==2.4.2
filelock==3.12.3
hyperlink==21.0.0
idna==3.4
incremental==22.10.0
itemadapter==0.8.0
itemloaders==1.1.0
jmespath==1.0.1
lxml==4.9.3
packaging==23.1
parsel==1.8.1
Protego==0.3.0
pyasn1==0.5.0
pyasn1-modules==0.3.0
pycparser==2.21
PyDispatcher==2.0.7
PyExecJS==1.5.1
pymongo==4.5.0
pyOpenSSL==23.2.0
queuelib==1.6.2
requests==2.31.0
requests-file==1.5.1
retrying==1.3.4
Scrapy==2.10.1
service-identity==23.1.0
six==1.16.0
tldextract==3.5.0
Twisted==22.10.0
typing_extensions==4.7.1
urllib3==2.0.4
w3lib==2.1.2
zope.interface==6.0

代码

这里仅列出一些比较重要的代码

目的地爬虫

import scrapy
from scrapy.http import Responsefrom mfwscrapy.items import *class MfwSpider(scrapy.Spider):name = "mfw_mdd_home"handle_httpstatus_list = [200]allowed_domains = ["mafengwo"]def start_requests(self):base_url = ""yield scrapy.Request(url=base_url, callback=self.parse)def parse(self, response: Response):sel = scrapy.Selector(response)hot_list = sel.xpath('//div[@class="row row-hot"]')hot_city_links=hot_list.xpath('.//div[@class="col"]//dl//a')for x in hot_city_links:item = Mdd()link=x.xpath("./@href").get()item["mddId"]=link.split("/")[-1].split(".")[0]item["title"]=x.xpath("./text()").get()if item["mddId"] is None or item["title"] is None:continueyield item

景点爬虫

import hashlib
import json
import re
import timeimport scrapy
from scrapy.http import Responsefrom mfwscrapy.items import Scenic
from mfwscrapy.myextend import mongoclass MfwSpider(scrapy.Spider):name = "mfw_scenic"handle_httpstatus_list = [200, 521]base_url = ".php"# 生成表单数据def getFormData(self, mddid, page):_ts = str(time.time_ns()//100000)qdata = f'{{"_ts":"{_ts}","iMddid":"{mddid}","iPage":"{page}","iTagId":"0","sAct":"KMdd_StructWebAjax|GetPoisByTag"}}c9d6618dbc657b41a66eb0af952906f1'qdata = qdata.encode('utf-8')hl = hashlib.md5()hl.update(qdata)_sn = hl.hexdigest()[2:12]return {'sAct': 'KMdd_StructWebAjax|GetPoisByTag','iMddid': mddid,'iPage': str(page),'iTagId': '0','_ts': _ts,'_sn': _sn}def start_requests(self):while True:data = mongo.zyx.find_one_and_update({"status": 0}, {"$set": {"status": 1}})if not data:breakformdata = self.getFormData(data["_id"], 1)yield scrapy.FormRequest(url=self.base_url, formdata=formdata, callback=self.parsePage, cb_kwargs={"mddPo": data})# 解析页数def parsePage(self, response: Response, mddPo):scenic_html = json.loads(response.text)["data"]["page"]page_match = re.search(r"共<span>(\d+)</span>页", scenic_html, re.M)if page_match:totalPage = int(page_match.group(1))# 最多访问20页if totalPage > 20:totalPage = 20for page in range(1, totalPage+1):formdata = self.getFormData(mddPo["_id"], page)yield scrapy.FormRequest(url=self.base_url,  dont_filter=True, formdata=formdata, callback=self.parse)# 解析列表def parse(self, response: Response):scenic_html = json.loads(response.text)["data"]["list"]sel = scrapy.Selector(text=scenic_html)for a_tag in sel.xpath('//li/a'):o_url = a_tag.xpath("./@href").get()title = a_tag.xpath("./@title").get()scenic_id = o_url[o_url.rfind("/")+1:o_url.rfind(".")]target_url = f"{scenic_id}.html"item = Scenic()item["poi_id"] = scenic_iditem["title"] = titleitem["link"] = target_urlitem["status"] = 0yield item

游记爬虫

import scrapyfrom scrapy.http import Response
from mfwscrapy.items import NotesDetail 
from mfwscrapy.myextend import mongo
from mfwscrapy.page_decryptor import descryporclass MfwSpider(scrapy.Spider):name = "notes_detail_1"handle_httpstatus_list = [200,404, 521]def start_requests(self):filter = {"status": -2} update = {"$set": {"status": -3}}  while True:data = mongo.note.find_one_and_update(filter,update)if not data:breakyield scrapy.Request(url=""+data["_id"], callback=self.parse, cb_kwargs={"item": data})def parse(self, response: Response,  item):def not_find_content(entity):entity["status"] = 404yield entityentity = NotesDetail()entity["_id"] = item["_id"]if response.status == 521:# 解决cookie校验,并获取内容content = descrypor.getPostDetail(response.url)if content is None:yield from not_find_content(entity)returnsel = scrapy.Selector(text=content)elif response.status == 404:yield from not_find_content(entity)returnelif response.status == 200:sel = scrapy.Selector(response)else:returnj_content = sel.xpath('//div[@class="_j_content"] | //div[@class="_j_content_box"] | //div[@class="post_info"] | //div[@class="va_con _j_master_content"]').get()if j_content is None:yield from not_find_content(entity)else:entity["content"] = j_contententity["status"] = 1yield entity

破解cookie校验工具

import re
import execjs
import requests
import json
from requests.utils import add_dict_to_cookiejar
import hashlib
from mfwscrapy.myextend import proclass Decryptor():requests.packages.urllib3.disable_warnings()header = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36','Host' : 'www.mafengwo'}def __init__(self) -> None:passdef getPostDetail(self,url:str):try:session = requests.sessions.session() #使用session会一直携带上一次的cookiesproxies=pro.getProxy()response = session.get(url, headers=self.header,proxies=proxies, verify=False,timeout=10) #直接访问得到JS代码js_clearance = re.findall('cookie=(.*?);location', response.text)[0] #用正则表达式匹配出需要的部分result = execjs.eval(js_clearance).split(';')[0].split('=')[1] #反混淆、分割出cookie的部分add_dict_to_cookiejar(session.cookies, {'__jsl_clearance_s': result})  #将第一次访问的cookie添加进入session会话中response = session.get(url, headers=self.header,proxies=proxies,verify=False,timeout=10) #带上更新后的cookie进行第二次访问go = json.loads(re.findall(r'};go\((.*?)\)</script>', response.text)[0])for i in range(len(go['chars'])):for j in range(len(go['chars'])):values = go['bts'][0] + go['chars'][i] + go['chars'][j] + go['bts'][1]if go['ha'] == 'md5':ha = hashlib.md5(values.encode()).hexdigest()elif go['ha'] == 'sha1':ha = hashlib.sha1(values.encode()).hexdigest()elif go['ha'] == 'sha256':ha = hashlib.sha256(values.encode()).hexdigest()if ha == go['ct']:__jsl_clearance_s = valuesadd_dict_to_cookiejar(session.cookies, {'__jsl_clearance_s' :__jsl_clearance_s})response = session.get(url, headers=self.header,proxies=proxies, verify=False,timeout=10)except Exception as e:print(e)return Noneif response.status_code == 200:return response.textelse:return Nonedescrypor=Decryptor()

items

items 主要是作为数据库ORM,status用于存储状态

import scrapy#游记详情
class NotesDetail(scrapy.Item):_id=scrapy.Field()title = scrapy.Field()source = scrapy.Field()status = scrapy.Field()link = scrapy.Field()content = scrapy.Field()createTime=scrapy.Field()updateTime=scrapy.Field()#自由行详情
class ZYXDetail(scrapy.Item):_id=scrapy.Field()title = scrapy.Field()status = scrapy.Field()source = scrapy.Field()content = scrapy.Field()createTime=scrapy.Field()updateTime=scrapy.Field()#景点
class Scenic(scrapy.Item):_id=scrapy.Field()poi_id=scrapy.Field()link =scrapy.Field()title = scrapy.Field()status = scrapy.Field()createTime=scrapy.Field()updateTime=scrapy.Field()#目的地
class Mdd(scrapy.Item):_id=scrapy.Field()mddId=scrapy.Field()title = scrapy.Field()status = scrapy.Field()createTime=scrapy.Field()updateTime=scrapy.Field()

pipeline 持久化处理

# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: .htmlfrom datetime import datetime
from urllib.parse import quote_plus# useful for handling different item types with a single interfacefrom .items import NotesDetail, Mdd, Scenic, ZYXDetail
from .myextend import mongoclass SaveMongoPipeline:def __init__(self) -> None:passdef process_item(self, item, spider):if isinstance(item, Scenic):self.process_item_scenic(item,spider)elif isinstance(item, NotesDetail):self.process_item_note(item,spider)elif isinstance(item, Mdd):self.process_item_mdd(item,spider)elif isinstance(item, ZYXDetail):self.process_item_zyx(item,spider)return itemdef process_item_scenic(self, item, spider):item["_id"]=item["poi_id"]item['updateTime']=datetime.now()if item["status"] == 0: item['createTime']=datetime.now()mongo.scenic.update_one({'_id':item["_id"]},dict(item),upsert=True)else:mongo.scenic.update_one({'_id':item["_id"]},{'$set':dict(item)})return itemdef process_item_note(self, item, spider):item['updateTime']=datetime.now()if item["status"] == 0: item['createTime']=datetime.now()mongo.note.update_one({'_id':item["_id"]},dict(item),upsert=True)else:mongo.note.update_one({'_id':item["_id"]},{'$set':dict(item)})return itemdef process_item_mdd(self, item, spider):item['updateTime']=datetime.now()if item["status"] == 0: item['createTime']=datetime.now()mongo.mdd.update_one({'_id':item["_id"]},dict(item),upsert=True)else:mongo.mdd.update_one({'_id':item["_id"]},{'$set':dict(item)})return item def process_item_zyx(self, item, spider):item['updateTime']=datetime.now()if item["status"] == 0: item['createTime']=datetime.now()mongo.zyx.update_one({'_id':item["_id"]},dict(item),upsert=True)else:mongo.zyx.update_one({'_id':item["_id"]},{'$set':dict(item)})return item

代理&mongo数据源

class MongoConnect:def __init__(self) -> None:# Set your username, password, database, and cluster URLusername = ""password = ""database = ""port=27017host=""connection_url = f"mongodb://{username}:{password}@{host}:{port}/{database}"# Connect to MongoDBclient = MongoClient(connection_url)self.client= clientdb=client.get_database(database)self.db=db# Select the database to useself.note = db.get_collection("detail")self.scenic = db.get_collection("scenic")self.mdd = db.get_collection("mdd")self.zyx = db.get_collection("zyx")mongo=MongoConnect()class Proxy:def __init__(self):self.refreshProxy()# 获取代理时可能出现异常,异常以后进行重试@retry(stop_max_attempt_number=3, wait_fixed=1000)def refreshProxy(self):self._proxy_list = [] #根据实际情况获取您拥有的代理ip@propertydef proxy(self):return self._proxy_list@proxy.setterdef proxy(self, list):self._proxy_list = list# 随机获取代理ipdef getProxy(self):proxy = random.choice(self._proxy_list)username = ""password = ""return {"http":f"http://{username}:{password}@{proxy}/","https":f"http://{username}:{password}@{proxy}/"}pro = Proxy()

settings 配置


USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36"CONCURRENT_REQUESTS = 8# 注意这里一定要加delay,要不然很快就被封了
DOWNLOAD_DELAY = 0.2COOKIES_ENABLED = TrueDOWNLOADER_MIDDLEWARES = {"mfwscrapy.middlewares.MfwscrapyDownloaderMiddleware": 543,}EXTENSIONS = {'mfwscrapy.myextend.MyExtend': 300,
}ITEM_PIPELINES = {"mfwscrapy.pipelines.SaveMongoPipeline": 200}

下载中间件

class MfwscrapyDownloaderMiddleware:@classmethoddef from_crawler(cls, crawler):s = cls()crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)return s# 在请求之前配置代理信息def process_request(self, request: Request, spider):request.meta['proxy'] = pro.getProxy()["https"]def process_response(self, request: Request, response: Response, spider):return response# 在使用代理的时候,经常会出现网络异常的情况,重新处理一遍requestdef process_exception(self, request, exception, spider):print(exception)if isinstance(exception,TimeoutError):return requestelif isinstance(exception,TCPTimedOutError):return requestdef spider_opened(self, spider):spider.logger.info("Spider opened: %s" % spider.name)

如果出现 TCP connection timed out:110、Connection was refused by other side: 111: Connection refused 之类的异常,可以按照上面的process_exception 方法配置。

更多推荐

马蜂窝爬虫:基于Scrapy的高性能方案

本文发布于:2024-03-23 20:07:25,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1742283.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:马蜂窝   爬虫   高性能   方案   Scrapy

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!