admin管理员组文章数量:1644929
尝试在我的博客中添上程序流程图,如果画的有误或有修改意见请各位大佬提出,我会加以改进的
本程序的流程
准备工作
- python安装完成
- pycharm安装完成
- lxml、asyncio、aiohttp、aiofiles第三方库安装完成,如果你卡在了这一步,我会写一篇关于python安装第三方库报错的博客
程序各个模块
返回页面源代码部分
def get_page_code(url):
with requests.get(url) as resp:
text =resp.text #获得页面的源代码
print("已经获取到源代码") # 你不要这个也行,但是我看着没有任何提示语句的程序内心很慌张
return text
获取第一层m3u8地址
找m3u8文件,直接在页面源代码中查找m3u8就行,查找快捷键:Ctrl+F
你就看到了这一行代码
让我们观察一下,m3u8地址在ifram标签中的src属性里面,我们要确认一下这个页面是否只有一个iframe标签,如果是直接全页面搜索iframe即可,经过查找发现,该页面只有一个iframe标签,那就好办了,这里你可以用xpath或者BeautifulSoup都可以,如果要用BeautifulSoup的话,需要在程序开头加上一句 from bs4 import BeautifulSoup即可,如果报错,评论区中告诉我,我尝试解决
但是这个m3u8文件的地址需要进行处理
上xpath(xpath不会的话我后期可能会写一篇博客)
def get_first_m3u8_url(code):
tree = etree.HTML(code)#创建etree对象,由于这里是HTML所以就选HTML就行
src = tree.xpath('//iframe/@src')[0]#//表示满页面的找ifame标签,@src表示获取iframe标签的src属性值,由于xpath返回的是一个列表,我们只要第一个,所以就是0
# 到了这一步我们拿到了第一层m3u8文件的地址,但需要提取
src= src.split("=")[1].strip('&id')# 真正的m3u8文件的地址在第二个元素中
print("已经获取到了第一层m3u8的地址")
return src
下载m3u8文件
在第二个函数中我们已经获得到了第一层m3u8文件的地址,但是真正的m3u8文件的地址实在第二层m3u8文件中
所以还要再处理一次
def download_m3u8_file(first_m3u8_file):
print("正在下载第二层m3u8文件")
second= get_page_code(first_m3u8_file)
root = first_m3u8_file.rsplit('/',3)[0]
second = second.split()[-1]
second = root+second#拼接第二层地址
second_file = get_page_code(second)
with open("m3u8.txt",mode="w",encoding='utf-8') as f:
f.write(second_file)
print("第二层m3u8文件下载完成")
下载文件
这是用协程来实现的两个函数,应该能看得懂
async def download_one(url, sem):
async with sem: # 这玩意叫信号量。 可以控制并发量, 目前看 运行稳定。 应该没啥问题
for i in range(100):
try:
print(url, "开始工作")
filename = url.split('/')[-1] # 刚刚这里有问题
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
content = await resp.content.read()
async with aiofiles.open(f"./待拼接的/{filename}",mode='wb') as f:
await f.write(content)
print(f"{filename}下载成功")
break
except Exception as e:
print(f"网址为{url}出错了,重新尝试", e)
print(f"等待{(i + 2) * 5}秒")
await asyncio.sleep((i+2)*5)
async def download_all():
sem = asyncio.Semaphore(10) # 10 表示最大并发量是10 也就是有10个任务可以被挂起
tasks=[]
with open("m3u8.txt",mode="r",encoding='utf-8') as f:
lines=f.readlines()
for line in lines:
line=line.strip()
if "#" in line:
continue
else:
task = asyncio.create_task(download_one(line, sem))
tasks.append(task)
await asyncio.wait(tasks)
合并文件
Windows上是这么做的
在cmd中输入copy /b 第一个文件(含扩展名) +第二个文件(含扩展名) 最终文件(含扩展名)
Mac上是这么干的
cat 第一个文件(含扩展名) +第二个文件(含扩展名) 最终文件(含扩展名) > 最终文件(含扩展名)
def merget():
namelist=[]
with open("m3u8.txt",encoding="utf-8") as f:
lines = f.readlines()
for line in lines:
if "#" in line:
continue
else:
line=line.strip()
line=line.split('/')[-1]
namelist.append(line)
print("已经将所有的ts文件名加入到了列表中,准备合并文件")
os.chdir('./待拼接的')
temp = [] #存放ts文件名后转成字符串的列表
n=1
for i in range(len(namelist)):
temp.append(namelist[i])
if i!=0 and i%40==0:
command="+".join(temp)
os.system(f"copy /b {command} {n}.ts")
temp=[]
n+=1
command="+".join(temp)#假如有48个文件,这一步处理剩下来的文件
os.system(f"copy /b {command} {n}.ts")
print("倒数第二步")
n+=1
temp=[]
for i in range(1,n):
temp.append(f"{i}.ts")
command = "+".join(temp)
os.system(f"copy /b {command} hope.mp4")
print("完成!!!我希望看到这一步")
最终代码
import os
import requests
from lxml import etree
import asyncio
import aiohttp
import aiofiles
def get_page_code(url):
with requests.get(url) as resp:
text =resp.text
print("已经获取到源代码")
return text
def get_first_m3u8_url(code):
tree = etree.HTML(code)
src = tree.xpath('//iframe/@src')[0]
src= src.split("=")[1].strip('&id')
print("已经获取到了第一层m3u8的地址")
return src
def download_m3u8_file(first_m3u8_file):
print("正在下载第二层m3u8文件")
second= get_page_code(first_m3u8_file)
root = first_m3u8_file.rsplit('/',3)[0]
second = second.split()[-1]
second = root+second#拼接第二层地址
second_file = get_page_code(second)
with open("m3u8.txt",mode="w",encoding='utf-8') as f:
f.write(second_file)
print("第二层m3u8文件下载完成")
async def download_one(url, sem):
async with sem: # 这玩意叫信号量。 可以控制并发量
for i in range(100):
try:
print(url, "开始工作")
filename = url.split('/')[-1] # 刚刚这里有问题
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
content = await resp.content.read()
async with aiofiles.open(f"./待拼接的/{filename}",mode='wb') as f:
await f.write(content)
print(f"{filename}下载成功")
break
except Exception as e: # 这样写就能知道错哪儿了
print(f"网址为{url}出错了,重新尝试", e)
print(f"等待{(i + 2) * 5}秒")
await asyncio.sleep((i+2)*5)
async def download_all():
sem = asyncio.Semaphore(10) # 10 表示最大并发量是10 也就是有10个任务可以被挂起
tasks=[]
with open("m3u8.txt",mode="r",encoding='utf-8') as f:
lines=f.readlines()
for line in lines:
line=line.strip()
if "#" in line:
continue
else:
task = asyncio.create_task(download_one(line, sem))
tasks.append(task)
await asyncio.wait(tasks)
def merget():
namelist=[]
with open("m3u8.txt",encoding="utf-8") as f:
lines = f.readlines()
for line in lines:
if "#" in line:
continue
else:
line=line.strip()
line=line.split('/')[-1]
namelist.append(line)
print("已经将所有的ts文件名加入到了列表中,准备合并文件")
os.chdir('./待拼接的')
temp = [] #存放ts文件名后转成字符串的列表
n=1
for i in range(len(namelist)):
temp.append(namelist[i])
if i!=0 and i%40==0:
command="+".join(temp)
os.system(f"copy /b {command} {n}.ts")
temp=[]
n+=1
command="+".join(temp)#假如有48个文件,这一步处理剩下来的文件
os.system(f"copy /b {command} {n}.ts")
print("倒数第二步")
n+=1
temp=[]
for i in range(1,n):
temp.append(f"{i}.ts")
command = "+".join(temp)
os.system(f"copy /b {command} hope.mp4")
print("完成!!!我希望看到这一步")
if __name__ == '__main__':
url = "http://www.wbdy.tv/play/67656_1_1.html"
main_code=get_page_code(url)
first_m3u8 = get_first_m3u8_url(main_code)
print(first_m3u8)
download_m3u8_file(first_m3u8)
loop = asyncio.get_event_loop()
loop.run_until_complete(download_all())
merget()
版权声明:本文标题:python爬虫之下载电影(不是爱奇艺腾讯视频等平台哈) 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:https://www.elefans.com/xitong/1726539683a1074597.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论