import re
import requests
from lxml import etree
import logging
import time
import tqdm
from playwright.sync_api import sync_playwright
from playwright.async_api import async_playwright
import asyncio
import aiohttp
import os
import subprocess
from Crypto.Cipher import AES
import keyboard
from tenacity import retry, stop_after_attempt
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36 Edg/121.0.0.0'
}
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
m3u8s = []
keys = []
limit = 5
semaphore = asyncio.Semaphore(limit)
@retry(stop=stop_after_attempt(5))
def scrape_page(url):
    """Fetch a page and return its text, or None on failure."""
    if url is not None:
        try:
            response = requests.get(url, headers=headers)
            if response.status_code == 200:
                response.encoding = 'utf-8'
                return response.text
            else:
                logging.error('%s', response.status_code)
        except requests.RequestException:
            logging.error('Request failed!', exc_info=True)
    else:
        logging.error('url is empty!')
    return None
def on_response(response):
    # Playwright response hook: collect m3u8 playlist URLs and key URLs from the page's traffic.
    if '.m3u8' in response.url and 'mp4' not in response.url and response.status == 200:
        m3u8s.append(response.url)
    if '.key' in response.url and response.status == 200:
        keys.append(response.url)
def read_ts(m3u8_text):
    """Parse the segment URLs out of the m3u8 text into full_ts_urls and report
    whether they are absolute ('http'), hls-relative ('hls'), or neither (None)."""
    mid_ts_urls = re.findall(r'#EXTINF:.*?,\n(.*?)\n', m3u8_text, re.S)
    for ts_url in mid_ts_urls:
        full_ts_urls.append(ts_url.replace('\n', ''))
    if not full_ts_urls:
        return None
    if 'http' in full_ts_urls[0]:
        return 'http'
    if 'hls' in full_ts_urls[0]:
        return 'hls'
    return None
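# Illustrative sketch (hypothetical playlist lines) of the two shapes read_ts
# distinguishes: absolute segment URLs vs. 'hls/'-relative ones that main()
# later rebuilds against the playlist's base URL.
#
#   #EXTINF:4.0,
#   https://cdn.example.com/stream/0000.ts    -> read_ts returns 'http'
#   #EXTINF:4.0,
#   /20240101/abc/hls/0000.ts                 -> read_ts returns 'hls'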
def makefile(name, base_url):
    """Create (if needed) and return the download directory for this title."""
    try:
        name = name.replace('/', '')
        path = f'{base_url}\\{name}'
        if not os.path.exists(path):
            os.makedirs(path)
        return path
    except PermissionError:
        logging.error('PermissionError', exc_info=True)
    except OSError:
        logging.error('OSError', exc_info=True)
def make_txt(name, path):
    """Create an empty <name>.txt concat list inside path and return its path."""
    name = name.replace('/', '')
    if os.path.exists(path):
        txt_path = f'{path}\\{name}.txt'
        if os.path.exists(txt_path):
            logging.info(f'{name}.txt already exists')
            return txt_path
        with open(txt_path, 'w') as f:
            f.write('')
        if os.path.exists(txt_path):
            return txt_path
        logging.error('Failed to create the txt file')
    else:
        return False
def write_data(num, content, path):
    with open(f'{path}\\{num}.mp4', 'wb') as file:
        file.write(content)
@retry(stop=stop_after_attempt(5))
async def translate_bytes(session, num) -> list:
    """Download segment `num` and return [bytes, num]."""
    async with semaphore:
        async with session.get(full_ts_urls[num], headers=headers) as res:
            if res.status == 200:
                content = await res.read()
                return [content, num]
            else:
                # Fallback: fetch the same segment with a blocking request.
                content = requests.get(full_ts_urls[num], headers=headers).content
                return [content, num]
def translate_key(key_url):
    res = requests.get(key_url)
    if res.status_code == 200:
        return res.content
    else:
        return None
@retry(stop=stop_after_attempt(5))
def aes_decode(encrypted_data, key, m3u8):
    """Check whether the playlist declares AES-128 (KEY:METHOD=AES-128).
    If it does, decrypt the segment; otherwise report the encryption method.
    Exceptions are caught and printed to the console."""
    try:
        match = re.search('KEY:.*?=(.*?),', m3u8, re.S)
        encryption = match.group(1).strip() if match else None
        if encryption is not None:
            if encryption == 'AES-128':
                # AES-128 decryption; no IV is given, so the key is reused as the IV.
                cipher = AES.new(key, AES.MODE_CBC, key)
                data = cipher.decrypt(encrypted_data)
                return data
            else:
                logging.info('Not AES-128 encrypted; check the encryption method')
        else:
            return False
    except ValueError:
        logging.error('ValueError', exc_info=True)
        print(encrypted_data)
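# Minimal usage sketch (hypothetical values): translate_key fetches the 16-byte
# key the playlist points at, and aes_decode reuses that key as the IV because
# these playlists declare METHOD=AES-128 without an explicit IV.
#
#   key = translate_key('https://cdn.example.com/stream/key.key')  # hypothetical URL
#   plain = aes_decode(encrypted_segment, key, m3u8_text)
#   if plain:
#       write_data(0, plain, file_path)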
def write_txt(txt_path, path, num):
    if os.path.exists(txt_path):
        with open(txt_path, 'a+', encoding='utf-8') as f:
            f.write(f"file '{path}\\{num}.mp4'" + '\n')
def ffmpeg_combine(video_path, txt_path, name, num):
    """Check that txt_path exists, merge the segments with ffmpeg's concat demuxer,
    then verify that the merged file was actually created."""
    if os.path.exists(txt_path):
        name = name.replace('/', '')
        output = f'{video_path}\\{name} Episode {num}.mp4'
        order = ['ffmpeg', '-f', 'concat', '-safe', '0', '-i', txt_path,
                 '-c', 'copy', output, '-loglevel', 'quiet']
        subprocess.run(order)
        if os.path.exists(output):
            logging.info('Download finished successfully!')
        else:
            print('Download failed: something went wrong while merging')
    else:
        logging.error(f'{txt_path} does not exist; check that it was created')
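# For reference, the concat list that write_txt builds and ffmpeg_combine consumes
# looks like this (paths are illustrative):
#
#   file 'D:\downloads\SomeShow\0.mp4'
#   file 'D:\downloads\SomeShow\1.mp4'
#
# Equivalent manual command (sketch):
#   ffmpeg -f concat -safe 0 -i list.txt -c copy output.mp4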
def clear(file_path, txt_path):
    # Remove the per-segment .mp4 files and the concat list once merging is done.
    for i in range(len(full_ts_urls)):
        try:
            os.remove(f'{file_path}\\{i}.mp4')
        except FileNotFoundError:
            logging.error('FileNotFoundError!')
    try:
        os.remove(txt_path)
    except FileNotFoundError:
        logging.error('FileNotFoundError!')
async def cancel_requests(route, request):
    await route.abort()
@retry(stop=stop_after_attempt(5))
async def listen(url):
    async with async_playwright() as p:
        browser = await p.firefox.launch(
            headless=True,
            args=['--User-Agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36 Edg/121.0.0.0'],
            executable_path=r'D:\爬虫脚本\樱花动漫下载器v2.0\firefox\firefox.exe'
        )
        page = await browser.new_page()
        # Block images and .ts segments so only the playlist/key requests go through.
        await page.route(re.compile(r'(\.png)|(\.jpg)|(\.ts)'), cancel_requests)
        page.on('response', on_response)
        await page.goto(url)
        await browser.close()
    if m3u8s:
        print(m3u8s[-1])
async def main():
    repair = []
    repair_num = []
    global full_ts_urls
    full_ts_urls = []

    to_verify = input('Should the program listen for the m3u8 video request on the page automatically? (y or n): ')
    name = input('Enter a name: ')
    episode = input('Enter the episode number you are downloading: ')
    base_path = input('Enter the save path: ')
    if to_verify == 'y':
        input_url = input('Enter the page URL: ')
        logging.info('Listening for network requests... (this can be slow, please wait; if it fails, check your network and restart)')
        await listen(input_url)
    else:
        input_url = input('Enter the m3u8 URL: ')
        m3u8s.append(input_url)

    if m3u8s:
        m3u8 = m3u8s[-1]
        m3u8_text = scrape_page(m3u8)
        file_path = makefile(name, base_path)
        txt_path = make_txt(name, file_path)
        match = re.search(r'(.*/)([^/]+)\.m3u8$', m3u8)
        front_url = match.group(1).strip() if match else None
        true_false = read_ts(m3u8_text)
        if true_false == 'hls':
            # Rebuild 'hls/'-relative segment paths against the playlist's base URL.
            for i in range(len(full_ts_urls)):
                full_ts_urls[i] = re.sub('.*?/hls/', '', full_ts_urls[i])
                full_ts_urls[i] = front_url + full_ts_urls[i]
        elif true_false == 'http':
            pass
        else:
            full_ts_urls = [f'{front_url}{url}' for url in full_ts_urls]

        if keys:
            key_url = keys[-1]
            key = translate_key(key_url)
            key_start = time.time()
            async with aiohttp.ClientSession(headers=headers) as session:
                tasks = [asyncio.ensure_future(translate_bytes(session, num)) for num in range(len(full_ts_urls))]
                pbar = tqdm.tqdm(total=len(full_ts_urls))
                for coroutine in asyncio.as_completed(tasks):
                    content = await coroutine
                    try:
                        data = aes_decode(content[0], key, m3u8_text)
                        write_data(content[1], data, file_path)
                    except TypeError:
                        # Decryption failed; keep the segment for a second attempt below.
                        repair.append(content[0])
                        repair_num.append(content[1])
                    pbar.update(1)
                pbar.close()
            tried = []
            if repair_num:
                for i in range(len(repair_num)):
                    try:
                        data = aes_decode(repair[i], key, m3u8_text)
                        write_data(repair_num[i], data, file_path)
                    except Exception:
                        tried.append(repair_num[i])
            # Only segments that were written successfully go into the concat list.
            is_tried = False
            for i in range(len(full_ts_urls)):
                if tried:
                    for num in tried:
                        if i == num:
                            is_tried = True
                if not is_tried:
                    write_txt(txt_path, file_path, i)
                else:
                    is_tried = False
            print('Cleaning up temporary files...')
            ffmpeg_combine(file_path, txt_path, name, episode)
            clear(file_path, txt_path)
            key_end = time.time()
            total_time = (key_end - key_start) / 60
            print(f'Elapsed time: {total_time:.2f} min')
        else:
            no_key_start = time.time()
            async with aiohttp.ClientSession(headers=headers) as session:
                tasks = [asyncio.ensure_future(translate_bytes(session, num)) for num in range(len(full_ts_urls))]
                pbar = tqdm.tqdm(total=len(full_ts_urls))
                for coroutine in asyncio.as_completed(tasks):
                    try:
                        content = await coroutine
                        write_data(content[1], content[0], file_path)
                    except TypeError:
                        repair.append(content[0])
                        repair_num.append(content[1])
                    pbar.update(1)
                pbar.close()
            tried = []
            if repair_num:
                for i in range(len(repair_num)):
                    try:
                        write_data(repair_num[i], repair[i], file_path)
                    except Exception:
                        tried.append(repair_num[i])
            is_tried = False
            for i in range(len(full_ts_urls)):
                if tried:
                    for num in tried:
                        if i == num:
                            is_tried = True
                if not is_tried:
                    write_txt(txt_path, file_path, i)
                else:
                    is_tried = False
            print('Cleaning up temporary files...')
            ffmpeg_combine(file_path, txt_path, name, episode)
            clear(file_path, txt_path)
            no_key_end = time.time()
            total_time = (no_key_end - no_key_start) / 60
            print(f'Elapsed time: {total_time:.2f} min')
    else:
        print('This anime source is no longer available')
if __name__ == '__main__':
    asyncio.run(main())
    print('Press ESC to exit ~o( =∩ω∩= )m')
    keyboard.wait('Esc')