视频下载流程分析、下载、合成"/>
视频下载流程分析、下载、合成
目录
一、难点分析处理
1.下载地址分析
2.视频合并工具
3.视频对应关系处理
4.视频合并
二、代码编写
1.普通版本
2.线程池版本
3.协程版本
一、难点分析处理
1.下载地址分析
在Chrome抓包,查看请求,发现第一个index.m3u8请求返回的内容中包含了第二个m3u8请求的url地址。
也就是说通过第一个index.m3u8中的url请求返回包含第二个index.m3u8文件地址,通过拼接请求第二个index.m3u8后 返回了包含当前所有ts文件的地址内容。
所以对二个index.m3u8中的url拼接后去访问就可以得到所有的ts文件。
2.视频合并工具
对于下载好的视频如何合并这里推荐使用ffmpeg。官网下载地址:.html#build-windows
①下载步骤:打开官网根据自己的电脑选择对应系统进行点击。我选定是windows,选择下面出现的“Windows builds from gyan.dev”。
② 根据需求下载需要的版本
③将下载好文件放在固定位置,将bin目录加入Path环境变量。例如:D:\ffmpeg-5.0.1-essentials_build\ffmpeg-5.0.1-essentials_build\bin
3.视频对应关系处理
通过程序处理将左边的原文件处理为右边的目标文件。
处理代码如下(示例):
def parallelism_m3u8():"""将第二次请求到的index2.m3u8(所有视频片段下载地址的文件),对应下载地址改为已经下载的文件名,其他保留:return:"""with open('index2.m3u8', 'r', encoding='UTF-8') as f:lines = f.readlines()path = 'ts'# 判断 当前存储ts的文件目录是否存在 不存在则创建if not os.path.exists(path):os.mkdir(path)# 拼接路径file_path = os.path.join(path, 'index.m3u8')with open(file_path, 'w', encoding='UTF-8') as f:i = 0for line in lines:# 获取所有要下载的ts的url地址 不以#作为开头if line.startswith('#'):f.write(line)else:f.write(str(i) + '.ts\n')i += 1
4.视频合并
视频合并代码如下(示例):
def merge():"""合并视频:return:"""path = 'ts'os.chdir(path)cmd = f'ffmpeg -i index.m3u8 -c copy 567.mp4'os.system(cmd)# 完成提示print('over!')
注意:如果Python中无法运行cmd,可在ts文件夹下打开终端运行下面的命令:ffmpeg -i index.m3u8 -c copy 567.mp4
二、代码编写
1.普通版本
代码如下(示例):
import requests
from lxml import etree
import osheaders = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36'
}def get_m3u8_url():"""获取第一次请求视频数据的网址,由第一次获取视频数据的网址获取第二次请求视频数据的网址(全部视频片段的下载地址):return:第二次请求视频的网址"""# 视频播放页网址:designbytovaurl = ''# 获取第一次请求视频数据的网址req = requests.get(url=url, headers=headers)req.encoding = req.apparent_encodingtree = etree.HTML(req.text)m3u8_one_url = tree.xpath('//span[@class="ff-player"]/@data-src')[0].strip()# 获取第二次请求视频数据的网址(全部视频片段的下载地址)req = requests.get(url=m3u8_one_url, headers=headers)req.encoding = req.apparent_encoding# 保存第二次请求视频数据的网址(全部视频片段的下载地址)的文件with open('index1.m3u8', mode='w', encoding='UTF-8') as f:f.write(req.content.decode())# 读取第二次请求视频数据的网址(全部视频片段的下载地址)的文件with open('index1.m3u8', mode='r', encoding='UTF-8') as f:lines = f.readlines()# 生成第二次请求视频数据的网址for line in lines:if not line.startswith(r'#') or not line:m3u8_two_url = '/' + linereturn m3u8_two_urldef get_m3u8_two(m3u8_two_url):"""得到全部视频数据的下载地址:param m3u8_two_url: 第二次请求视频数据的网址:return: 全部视频数据的下载地址的列表"""# 全部视频数据的下载地址的列表downlode_m3u8_url_list = []# 获取第二次请求视频数据的文件req = requests.get(url=m3u8_two_url, headers=headers)req.encoding = req.apparent_encoding# 保存第二次请求视频数据的文件with open('index2.m3u8', mode='w', encoding='UTF-8') as f:f.write(req.content.decode())# 读取第二次请求视频数据的文件with open('index2.m3u8', mode='r', encoding='UTF-8') as f:lines = f.readlines()# 生成全部视频数据的下载地址的列表for line in lines:if line.startswith(r'#'):continuedownlode_m3u8_url_list.append(line.strip())return downlode_m3u8_url_listdef downlode_m3u8(i, downlode_m3u8_url):"""下载所有视频片段:param i: 下载的个数,一个给下载的视频片段重命名:param downlode_m3u8_url::return:"""# 获取视频片段数据req = requests.get(url=downlode_m3u8_url, headers=headers)req.encoding = req.apparent_encoding# 判断路径是否存在path = 'ts'if not os.path.exists(path):os.mkdir(path)# 下载视频片段with open(f'{path}/{i}.ts', mode='wb') as f:f.write(req.content)# 下载视频提示print(f'第{i}个视频下载完成!')def parallelism_m3u8():"""将第二次请求到的index2.m3u8(所有视频片段下载地址的文件),对应下载地址改为已经下载的文件名,其他保留:return:"""with open('index2.m3u8', 'r', encoding='UTF-8') as f:lines = f.readlines()path = 'ts'# 判断 当前存储ts的文件目录是否存在 不存在则创建if not os.path.exists(path):os.mkdir(path)# 拼接路径file_path = os.path.join(path, 'index.m3u8')with open(file_path, 'w', encoding='UTF-8') as f:i = 0for line in lines:# 获取所有要下载的ts的url地址 不以#作为开头if line.startswith('#'):f.write(line)else:f.write(str(i) + '.ts\n')i += 1def merge():"""合并视频,Python中无法运行可在ts文件夹下打开终端运行下面的命令:ffmpeg -i index.m3u8 -c copy 567.mp4:return:"""path = 'ts'os.chdir(path)cmd = f'ffmpeg -i index.m3u8 -c copy 567.mp4'os.system(cmd)# 完成提示print('over!')if __name__ == '__main__':# # 获取第二个m3u8请求网址m3u8_two_url = get_m3u8_url()# print(m3u8_two_url)# # 获取全部m3u8下载地址downlode_m3u8_url_list = get_m3u8_two(m3u8_two_url)# print(downlode_m3u8_url_list)# # 下载m3u8视频[downlode_m3u8(i, downlode_m3u8_url) for i, downlode_m3u8_url in enumerate(downlode_m3u8_url_list)]# 处理视频对应关系parallelism_m3u8()# 合并视频merge()
2.线程池版本
代码如下(示例):
import os
from concurrent.futures import ThreadPoolExecutor, waitimport requests
from lxml import etreeheaders = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36'
}def get_m3u8_url():"""获取第一次请求视频数据的网址,由第一次获取视频数据的网址获取第二次请求视频数据的网址(全部视频片段的下载地址):return:第二次请求视频的网址"""# 视频播放页网址:designbytovaurl = ''# 获取第一次请求视频数据的网址req = requests.get(url=url, headers=headers)req.encoding = req.apparent_encodingtree = etree.HTML(req.text)m3u8_one_url = tree.xpath('//span[@class="ff-player"]/@data-src')[0].strip()# 获取第二次请求视频数据的网址(全部视频片段的下载地址)req = requests.get(url=m3u8_one_url, headers=headers)req.encoding = req.apparent_encoding# 保存第二次请求视频数据的网址(全部视频片段的下载地址)的文件with open('index1.m3u8', mode='w', encoding='UTF-8') as f:f.write(req.content.decode())# 读取第二次请求视频数据的网址(全部视频片段的下载地址)的文件with open('index1.m3u8', mode='r', encoding='UTF-8') as f:lines = f.readlines()# 生成第二次请求视频数据的网址for line in lines:if not line.startswith(r'#') or not line:m3u8_two_url = '/' + linereturn m3u8_two_urldef get_m3u8_two(m3u8_two_url):"""得到全部视频数据的下载地址:param m3u8_two_url: 第二次请求视频数据的网址:return: 全部视频数据的下载地址的列表"""# 全部视频数据的下载地址的列表downlode_m3u8_url_list = []# 获取第二次请求视频数据的文件req = requests.get(url=m3u8_two_url, headers=headers)req.encoding = req.apparent_encoding# 保存第二次请求视频数据的文件with open('index2.m3u8', mode='w', encoding='UTF-8') as f:f.write(req.content.decode())# 读取第二次请求视频数据的文件with open('index2.m3u8', mode='r', encoding='UTF-8') as f:lines = f.readlines()# 生成全部视频数据的下载地址的列表for line in lines:if line.startswith(r'#'):continuedownlode_m3u8_url_list.append(line.strip())return downlode_m3u8_url_listdef downlode_m3u8(i, downlode_m3u8_url):"""下载所有视频片段:param i: 下载的个数,一个给下载的视频片段重命名:param downlode_m3u8_url::return:"""# 获取视频片段数据req = requests.get(url=downlode_m3u8_url, headers=headers)req.encoding = req.apparent_encoding# 判断路径是否存在path = 'ts'if not os.path.exists(path):os.mkdir(path)# 下载视频片段with open(f'{path}/{i}.ts', mode='wb') as f:f.write(req.content)# 下载视频提示print(f'第{i}个视频下载完成!')def parallelism_m3u8():"""将第二次请求到的index2.m3u8(所有视频片段下载地址的文件),对应下载地址改为已经下载的文件名,其他保留:return:"""with open('index2.m3u8', 'r', encoding='UTF-8') as f:lines = f.readlines()path = 'ts'# 判断 当前存储ts的文件目录是否存在 不存在则创建if not os.path.exists(path):os.mkdir(path)# 拼接路径file_path = os.path.join(path, 'index.m3u8')with open(file_path, 'w', encoding='UTF-8') as f:i = 0for line in lines:# 获取所有要下载的ts的url地址 不以#作为开头if line.startswith('#'):f.write(line)else:f.write(str(i) + '.ts\n')i += 1def merge():"""合并视频:return:"""path = 'ts'os.chdir(path)cmd = f'ffmpeg -i index.m3u8 -c copy 567.mp4'os.system(cmd)# 完成提示print('over!')if __name__ == '__main__':# 获取第二个m3u8请求网址m3u8_two_url = get_m3u8_url()# print(m3u8_two_url)# 获取全部m3u8下载地址downlode_m3u8_url_list = get_m3u8_two(m3u8_two_url)# print(downlode_m3u8_url_list)# 线程池下载m3u8视频pool = ThreadPoolExecutor()tasks = []for i, downlode_m3u8_url in enumerate(downlode_m3u8_url_list):tasks.append(pool.submit(downlode_m3u8, i, downlode_m3u8_url))wait(tasks)# 处理视频对应关系parallelism_m3u8()# 合并视频merge()
该处使用的url网络请求的数据。
3.协程版本
代码如下(示例):
import os
import asyncio
import aiofiles
import aiohttp
from aiohttp import TCPConnectorimport requests
from lxml import etreeheaders = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36'
}def get_m3u8_url():"""获取第一次请求视频数据的网址,由第一次获取视频数据的网址获取第二次请求视频数据的网址(全部视频片段的下载地址):return:第二次请求视频的网址"""# 视频播放页网址:designbytovaurl = ''# 获取第一次请求视频数据的网址req = requests.get(url=url, headers=headers)req.encoding = req.apparent_encodingtree = etree.HTML(req.text)m3u8_one_url = tree.xpath('//span[@class="ff-player"]/@data-src')[0].strip()# 获取第二次请求视频数据的网址(全部视频片段的下载地址)req = requests.get(url=m3u8_one_url, headers=headers)req.encoding = req.apparent_encoding# 保存第二次请求视频数据的网址(全部视频片段的下载地址)的文件with open('index1.m3u8', mode='w', encoding='UTF-8') as f:f.write(req.content.decode())# 读取第二次请求视频数据的网址(全部视频片段的下载地址)的文件with open('index1.m3u8', mode='r', encoding='UTF-8') as f:lines = f.readlines()# 生成第二次请求视频数据的网址for line in lines:if not line.startswith(r'#') or not line:m3u8_two_url = '/' + linereturn m3u8_two_urldef get_m3u8_two(m3u8_two_url):"""得到全部视频数据的下载地址:param m3u8_two_url: 第二次请求视频数据的网址:return: 全部视频数据的下载地址的列表"""# 全部视频数据的下载地址的列表downlode_m3u8_url_list = []# 获取第二次请求视频数据的文件req = requests.get(url=m3u8_two_url, headers=headers)req.encoding = req.apparent_encoding# 保存第二次请求视频数据的文件with open('index2.m3u8', mode='w', encoding='UTF-8') as f:f.write(req.content.decode())# 读取第二次请求视频数据的文件with open('index2.m3u8', mode='r', encoding='UTF-8') as f:lines = f.readlines()# 生成全部视频数据的下载地址的列表for line in lines:if line.startswith(r'#'):continuedownlode_m3u8_url_list.append(line.strip())return downlode_m3u8_url_listasync def downlode_m3u8(i, downlode_m3u8_url, semaphore):"""下载所有视频片段:param i: 下载的个数,一个给下载的视频片段重命名:param downlode_m3u8_url: 视频片段下载地址:param semaphore: 并发量:return:"""# 判断路径是否存在path = 'ts'if not os.path.exists(path):os.mkdir(path)# 获取视频片段数据并下载async with semaphore:async with aiohttp.ClientSession(headers=headers) as session:async with session.get(url=downlode_m3u8_url, timeout=30) as resp:data = await resp.read()# 下载视频片段async with aiofiles.open(f'{path}/{i}.ts', mode='wb') as f:await f.write(data)# 下载视频提示print(f'第{i}个视频下载完成!')def parallelism_m3u8():"""将第二次请求到的index2.m3u8(所有视频片段下载地址的文件),对应下载地址改为已经下载的文件名,其他保留:return:"""with open('index2.m3u8', 'r', encoding='UTF-8') as f:lines = f.readlines()path = 'ts'# 判断 当前存储ts的文件目录是否存在 不存在则创建if not os.path.exists(path):os.mkdir(path)# 拼接路径file_path = os.path.join(path, 'index.m3u8')with open(file_path, 'w', encoding='UTF-8') as f:i = 0for line in lines:# 获取所有要下载的ts的url地址 不以#作为开头if line.startswith('#'):f.write(line)else:f.write(str(i) + '.ts\n')i += 1def merge():"""合并视频,Python中无法运行可在ts文件夹下打开终端运行下面的命令:ffmpeg -i index.m3u8 -c copy 567.mp4:return:"""path = 'ts'os.chdir(path)cmd = f'ffmpeg -i index.m3u8 -c copy 567.mp4'os.system(cmd)# 完成提示print('over!')async def aio_download(downlode_m3u8_url_list):"""封装:param downlode_m3u8_url_list: 视频片段下载地址列表:return:"""# 总任务tasks = []# 创建信号量,并发下载semaphore = asyncio.Semaphore(100)for i, downlode_m3u8_url in enumerate(downlode_m3u8_url_list):tasks.append(asyncio.create_task(downlode_m3u8(i, downlode_m3u8_url, semaphore)))await asyncio.wait(tasks)if __name__ == '__main__':# # 获取第二个m3u8请求网址m3u8_two_url = get_m3u8_url()# print(m3u8_two_url)# # 获取全部m3u8下载地址downlode_m3u8_url_list = get_m3u8_two(m3u8_two_url)# print(downlode_m3u8_url_list)# # 下载m3u8视频loop = asyncio.get_event_loop()loop.run_until_complete(aio_download(downlode_m3u8_url_list))loop.close()# 处理视频对应关系parallelism_m3u8()# 合并视频merge()
更多推荐
视频下载流程分析、下载、合成
发布评论