M3U Python下载 脚本
功能
- 输入 m3u8 链接(支持变体 playlist),选择最高带宽的 variant(若有)。
- 多线程下载每个 segment(默认 16 个线程,可调)。
- 支持 AES-128 (EXT-X-KEY METHOD=AES-128) 的简单解密(需 pycryptodome)。
- 合并分片:若检测到 ffmpeg, 则用 ffmpeg 合并成 mp4; 否则把 .ts 片段按序二进制拼接为 output.ts。
- 自动做断点(已存在的分片会跳过)。
Python Code
python
import os
import re
import sys
import shutil
import argparse
import tempfile
import subprocess
from urllib.parse import urljoin, urlparse
from concurrent.futures import ThreadPoolExecutor, as_completed
try:
import requests
except ImportError:
print("请先安装依赖: pip install requests tqdm")
sys.exit(1)
try:
from tqdm import tqdm
except Exception:
tqdm = None
# optional crypto
try:
from Crypto.Cipher import AES
have_crypto = True
except Exception:
have_crypto = False
# ----- helper funcs -----
HEADERS = {"User-Agent": "m3u8-downloader/1.0"}
def fetch_text(url, session=None, **kwargs):
s = session or requests
resp = s.get(url, headers=HEADERS, timeout=15, **kwargs)
resp.raise_for_status()
return resp.text
def fetch_bytes(url, session=None, **kwargs):
s = session or requests
resp = s.get(url, headers=HEADERS, timeout=20, stream=True, **kwargs)
resp.raise_for_status()
return resp.content
def pick_variant(m3u8_text, base_url):
"""
如果是变体 playlist(包含 #EXT-X-STREAM-INF),选择 bandwidth 最大的 variant。
返回 variant_url 或 None(如果不是变体)。
"""
lines = m3u8_text.strip().splitlines()
variant_infos = []
for i,line in enumerate(lines):
if line.startswith("#EXT-X-STREAM-INF"):
# parse bandwidth if present
m = re.search(r'BANDWIDTH=(\d+)', line)
bw = int(m.group(1)) if m else 0
# next non-empty non-comment line is URI
for j in range(i+1, len(lines)):
u = lines[j].strip()
if u and not u.startswith("#"):
variant_infos.append((bw, urljoin(base_url, u)))
break
if not variant_infos:
return None
# pick highest bandwidth
variant_infos.sort(key=lambda x: x[0], reverse=True)
return variant_infos[0][1]
def parse_segments(m3u8_text, base_url):
"""
解析片段 URL 列表,返回 (segments_list, key_info)
key_info = dict with {method, uri, iv_bytes or None} if AES-128 present, else None
"""
lines = m3u8_text.strip().splitlines()
segments = []
key = None
last_iv = None
for i,line in enumerate(lines):
line = line.strip()
if not line or line.startswith("#") is False and not line.startswith("#EXT"):
# plain URI line could be here but better check below
pass
if line.startswith("#EXT-X-KEY"):
# parse method and uri and iv
m_method = re.search(r'METHOD=([^,]+)', line)
m_uri = re.search(r'URI="([^"]+)"', line)
m_iv = re.search(r'IV=([^,]+)', line)
method = m_method.group(1) if m_method else None
uri = urljoin(base_url, m_uri.group(1)) if m_uri else None
iv = None
if m_iv:
ivtxt = m_iv.group(1)
# IV can be 0x... hex
if ivtxt.startswith("0x") or ivtxt.startswith("0X"):
iv = bytes.fromhex(ivtxt[2:])
else:
try:
iv = bytes.fromhex(ivtxt)
except Exception:
iv = None
key = {"method": method, "uri": uri, "iv": iv}
# segment lines (non-comment, non-empty)
if line and not line.startswith("#"):
seg_url = urljoin(base_url, line)
segments.append(seg_url)
return segments, key
def download_segment(idx, url, dest_path, session, retries=3, key=None):
"""
下载单个分片,支持 AES-128 解密(如果 key 提供并 method == AES-128)。
key: dict with method, uri (used to fetch the key bytes), iv (bytes) or None
"""
tmp_path = dest_path + ".part"
if os.path.exists(dest_path):
return True, dest_path # already exists
for attempt in range(1, retries+1):
try:
r = session.get(url, headers=HEADERS, timeout=30, stream=True)
r.raise_for_status()
data = r.content
# decrypt if needed
if key and key.get("method", "").upper() == "AES-128":
if not have_crypto:
raise RuntimeError("需要 pycryptodome 支持 AES 解密,但未安装。")
# get key bytes (fetch once)
if not key.get("_key_bytes"):
kb = session.get(key["uri"], headers=HEADERS, timeout=15)
kb.raise_for_status()
key["_key_bytes"] = kb.content
iv = key.get("iv")
if iv is None:
# if IV not provided, use sequence number as big-endian 16 bytes? HLS spec: IV can be segment sequence number; but complex.
# We'll fallback to zero IV (not ideal). User should provide IV in playlist.
iv = b'\x00' * 16
cipher = AES.new(key["_key_bytes"], AES.MODE_CBC, iv=iv)
data = cipher.decrypt(data)
# Note: for AES-CBC we don't strip padding automatically because some streams expect exact TS frames.
# write atomically
with open(tmp_path, "wb") as f:
f.write(data)
os.replace(tmp_path, dest_path)
return True, dest_path
except Exception as e:
if attempt == retries:
return False, str(e)
# else retry
return False, "unknown error"
def merge_with_ffmpeg(segment_files, output_file):
"""
用 ffmpeg 合并 - 最稳妥的方法是把文件列表写入一个 filelist.txt 然后 ffmpeg -f concat -safe 0 -i filelist -c copy out.ts/mp4
如果输出名以 .mp4,ffmpeg 可能需要转封装(-c copy 也可能失败),但我们先尝试 -c copy。
"""
tmpdir = os.path.dirname(output_file) or "."
list_file = os.path.join(tmpdir, "ff_concat_list.txt")
with open(list_file, "w", encoding="utf-8") as f:
for seg in segment_files:
# ffmpeg concat requires paths like: file 'path'
f.write(f"file '{os.path.abspath(seg)}'\n")
cmd = ["ffmpeg", "-hide_banner", "-loglevel", "error", "-f", "concat", "-safe", "0",
"-i", list_file, "-c", "copy", output_file]
try:
subprocess.check_call(cmd)
os.remove(list_file)
return True, None
except subprocess.CalledProcessError as e:
return False, str(e)
# ----- main download logic -----
def download_m3u8(url, out, threads=16, tmp=None, keep_ts=True, session=None):
session = session or requests.Session()
base_url = url
text = fetch_text(url, session=session)
# if variant playlist, pick best
variant = pick_variant(text, base_url)
if variant:
print(f"检测到变体 playlist,选择 variant: {variant}")
base_url = variant
text = fetch_text(variant, session=session)
# determine base for relative paths
parsed = urlparse(base_url)
base_for_join = base_url.rsplit("/", 1)[0] + "/"
segments, key = parse_segments(text, base_for_join)
if not segments:
raise RuntimeError("未找到任何分片 (segments) —— 可能不是正确的 m3u8。")
print(f"共发现 {len(segments)} 个分片,线程数={threads}")
# prepare tmp dir
tmpdir = tmp or tempfile.mkdtemp(prefix="m3u8_dl_")
os.makedirs(tmpdir, exist_ok=True)
seg_files = [os.path.join(tmpdir, f"{i:06d}.ts") for i in range(len(segments))]
# Pre-fetch key if present
if key and key.get("method", "").upper() == "AES-128":
if not have_crypto:
print("提醒:playlist 使用 AES-128 加密,但未安装 pycryptodome,无法解密。")
else:
print(f"检测到 AES-128 加密,key uri: {key.get('uri')}")
# download with threads
failures = []
if tqdm:
pbar = tqdm(total=len(segments), desc="下载", unit="seg")
else:
pbar = None
with ThreadPoolExecutor(max_workers=threads) as exe:
future_to_idx = {}
for idx, seg_url in enumerate(segments):
dest = seg_files[idx]
future = exe.submit(download_segment, idx, seg_url, dest, session, 5, key)
future_to_idx[future] = (idx, seg_url, dest)
for fut in as_completed(future_to_idx):
idx, seg_url, dest = future_to_idx[fut]
ok, info = fut.result()
if not ok:
failures.append((idx, seg_url, info))
if pbar:
pbar.update(1)
if pbar:
pbar.close()
if failures:
print(f"以下 {len(failures)} 个分片下载失败(索引, url, 错误):")
for f in failures[:10]:
print(f)
raise RuntimeError("存在下载失败,停止合并。")
# 合并
# 首先检查 ffmpeg 是否存在
ffmpeg_path = shutil.which("ffmpeg")
if ffmpeg_path:
print("检测到 ffmpeg,使用 ffmpeg 合并为最终文件(快速且兼容性好)...")
success, err = merge_with_ffmpeg(seg_files, out)
if not success:
print(f"ffmpeg 合并失败: {err}\n尝试直接拼接 .ts 文件为 {out}.ts")
# fallthrough to concat
else:
print(f"合并完成 -> {out}")
if not keep_ts:
try:
shutil.rmtree(tmpdir)
except Exception:
pass
return out
# 如果没有 ffmpeg 或 ffmpeg 合并失败,则二进制拼接
print("使用二进制拼接所有 .ts 片段 (输出为 .ts 容器)。")
out_ts = out if out.lower().endswith(".ts") else out + ".ts"
with open(out_ts, "wb") as outf:
for seg in seg_files:
with open(seg, "rb") as f:
shutil.copyfileobj(f, outf)
print(f"拼接完成 -> {out_ts}")
if not keep_ts:
try:
shutil.rmtree(tmpdir)
except Exception:
pass
return out_ts
# ----- CLI -----
def cil(url: str, output_base: str):
parser = argparse.ArgumentParser(description="多线程下载 M3U8 (HLS) 视频并合并")
parser.add_argument("--keep-ts", action="store_true", help="保留临时 ts 文件")
args = parser.parse_args()
output = output_base + ".mp4"
temp = output_base + "_temp"
print("Generate the link of output_base is: {}".format(output_base))
try:
out = download_m3u8(url, output, threads=32, tmp=temp, keep_ts=True)
print("完成:", out)
except Exception as e:
print("出错:", e)
sys.exit(1)
if __name__ == "__main__":
videos = [
# {
# "url": "video1.m3u8",
# "name": "Video-1",
# },
# {
# "url": "video2.m3u8",
# "name": "Video-2"
# },
# {
# "url": "video3.m3u8",
# "name": "Video-3"
# }
]
if len(videos) == 0:
print("还未输入video下载资源")
else:
for video in videos:
cil(url=video["url"], output_base=video["name"])
实际用途
- 目前的在线视频,基本经过一些片段化的方式(例如使用HLS),将视频能够更快点播。
- 这个脚本的目的,就是将输入的m3u8视频,下载出来所有的ts片段,然后再拼装回去实际的mp4格式视频文件