M3U8 Python Download Script
Features
- Accepts an m3u8 URL (supports variant playlists) and automatically selects the highest bandwidth variant if available.
- Downloads each segment using multithreading (default 16 threads, configurable).
- Supports AES-128 decryption (if `EXT-X-KEY METHOD=AES-128` is used; requires `pycryptodome`).
- Merges segments: if ffmpeg is detected, it merges into an MP4; otherwise, it concatenates `.ts` segments into `output.ts`.
- Supports resume: existing segments are skipped automatically.
Python Code Overview
The script works in these steps:
Fetch M3U8 playlist: download the playlist text and, if it is a variant playlist, pick the highest-bandwidth variant.
Parse segments & key
- Extract all segment URLs.
- Detect AES-128 encryption and parse the key URI and IV if provided.
Download segments with threads
- Each segment is downloaded concurrently.
- AES-128 decryption is applied if needed.
- Already existing files are skipped (supports resume).
Merge segments
- Prefer using ffmpeg for merging (handles TS -> MP4 seamlessly).
- Fallback: binary-concatenate the `.ts` files if ffmpeg is not available.
Key Functions
Pick highest bandwidth variant
python
def pick_variant(m3u8_text, base_url):
"""
Parse variant playlist (#EXT-X-STREAM-INF) and return URL of highest bandwidth variant.
"""
Parse segments and optional AES key
python
def parse_segments(m3u8_text, base_url):
"""
Return (segments_list, key_info) where key_info = {method, uri, iv} if AES-128 exists.
"""
Download a single segment with optional decryption
python
def download_segment(idx, url, dest_path, session, retries=3, key=None):
"""
Download one segment; if AES-128 key is provided, decrypt the segment.
"""
Merge segments using ffmpeg
python
def merge_with_ffmpeg(segment_files, output_file):
"""
Use ffmpeg concat demuxer to merge TS segments into a single file.
"""
Main download logic
python
def download_m3u8(url, out, threads=16, tmp=None, keep_ts=True, session=None):
"""
- Select variant if present
- Parse segments and key
- Download segments in parallel
- Merge segments with ffmpeg or fallback to binary concatenation
"""
CLI Usage
python
def cil(url: str, output_base: str):
"""
Wrapper for argparse CLI. Downloads a single video.
"""
- Example usage:
python
videos = [
{"url": "https://example.com/video1.m3u8", "name": "Video-1"},
{"url": "https://example.com/video2.m3u8", "name": "Video-2"}
]
for video in videos:
cil(url=video["url"], output_base=video["name"])
Practical Usage
- Modern streaming video often uses HLS, splitting videos into small `.ts` segments for faster playback.
- This script downloads all `.ts` segments and reconstructs the original MP4 file locally.
- AES-128 encrypted streams can be handled if the key URI is provided in the playlist.
Notes
- Dependencies: `requests`, `tqdm`, `pycryptodome` (optional, for AES).
- Multithreading: controlled via the `threads` parameter (default 16).
- Temporary files: stored in a temp folder; can be kept with `--keep-ts`.
- ffmpeg: recommended for merging to MP4; the fallback is binary concatenation to `.ts`.
This script is a robust solution for downloading and reconstructing HLS videos from m3u8 playlists, including variant streams and AES-128 encryption.
Full Code
python
import os
import re
import sys
import shutil
import argparse
import tempfile
import subprocess
from urllib.parse import urljoin, urlparse
from concurrent.futures import ThreadPoolExecutor, as_completed
try:
import requests
except ImportError:
print("Please install dependencies first: pip install requests tqdm")
sys.exit(1)
try:
from tqdm import tqdm
except Exception:
tqdm = None
# optional crypto
try:
from Crypto.Cipher import AES
have_crypto = True
except Exception:
have_crypto = False
# ----- helper funcs -----
# Request headers passed as ``headers=`` on every HTTP GET in this script.
HEADERS = {
    "User-Agent": "m3u8-downloader/1.0",
}
def fetch_text(url, session=None, **kwargs):
    """GET *url* and return the response's ``text``.

    Args:
        url: Address to request.
        session: Object exposing ``get``; when falsy, the ``requests``
            module itself is used.
        **kwargs: Forwarded unchanged to ``get``.

    Returns:
        The ``text`` attribute of the response.

    ``raise_for_status()`` is called on the response before returning, so
    whatever it raises propagates to the caller. The request uses the
    module-level ``HEADERS`` and a 15-second timeout.
    """
    if session:
        client = session
    else:
        client = requests
    response = client.get(url, headers=HEADERS, timeout=15, **kwargs)
    response.raise_for_status()
    return response.text
def fetch_bytes(url, session=None, **kwargs):
    """GET *url* and return the response's ``content``.

    Args:
        url: Address to request.
        session: Object exposing ``get``; when falsy, the ``requests``
            module itself is used.
        **kwargs: Forwarded unchanged to ``get``.

    Returns:
        The ``content`` attribute of the response.

    ``raise_for_status()`` is called on the response before returning, so
    whatever it raises propagates to the caller. The request uses the
    module-level ``HEADERS``, a 20-second timeout and ``stream=True``; the
    whole body is still read here through ``content``.
    """
    if session:
        client = session
    else:
        client = requests
    response = client.get(
        url,
        headers=HEADERS,
        timeout=20,
        stream=True,
        **kwargs,
    )
    response.raise_for_status()
    return response.content
def pick_variant(m3u8_text, base_url):
    """Return the URL of the highest-bandwidth variant of a master playlist.

    Every ``#EXT-X-STREAM-INF`` tag is paired with the next non-empty,
    non-comment line, which is taken as that variant's URI.

    Args:
        m3u8_text: Playlist text.
        base_url: URL that relative variant URIs are resolved against
            (via ``urljoin``).

    Returns:
        The absolute URL of the variant with the largest ``BANDWIDTH``
        value (the first one listed wins a tie; a missing ``BANDWIDTH``
        counts as 0), or ``None`` when the text contains no variant
        entries.
    """
    lines = [raw.strip() for raw in m3u8_text.splitlines()]
    best_bw = None
    best_url = None
    for i, line in enumerate(lines):
        if not line.startswith("#EXT-X-STREAM-INF"):
            continue
        # The look-behind keeps the pattern from matching the tail of
        # AVERAGE-BANDWIDTH=..., which may legally precede BANDWIDTH=...
        match = re.search(r"(?<![A-Za-z-])BANDWIDTH=(\d+)", line)
        bandwidth = int(match.group(1)) if match else 0
        # The variant URI is the next line that is neither blank nor a tag.
        uri = None
        for candidate in lines[i + 1:]:
            if candidate and not candidate.startswith("#"):
                uri = candidate
                break
        if uri is None:
            continue
        # Strict ">" keeps the earliest entry among equal bandwidths.
        if best_bw is None or bandwidth > best_bw:
            best_bw = bandwidth
            best_url = urljoin(base_url, uri)
    return best_url
def _parse_key_tag(line, base_url):
    """Turn one ``#EXT-X-KEY`` line into a ``{method, uri, iv}`` dict."""
    m_method = re.search(r'METHOD=([^,]+)', line)
    m_uri = re.search(r'URI="([^"]+)"', line)
    m_iv = re.search(r'IV=([^,]+)', line)
    iv = None
    if m_iv:
        ivtxt = m_iv.group(1)
        if ivtxt.startswith(("0x", "0X")):
            # Malformed hex after the 0x prefix raises ValueError on purpose:
            # a wrong IV would only produce silently corrupted output.
            iv = bytes.fromhex(ivtxt[2:])
        else:
            try:
                iv = bytes.fromhex(ivtxt)
            except ValueError:
                iv = None
    return {
        "method": m_method.group(1) if m_method else None,
        "uri": urljoin(base_url, m_uri.group(1)) if m_uri else None,
        "iv": iv,
    }


def parse_segments(m3u8_text, base_url):
    """Extract segment URLs and encryption-key info from a media playlist.

    Args:
        m3u8_text: Playlist text.
        base_url: URL that relative segment and key URIs are resolved
            against (via ``urljoin``).

    Returns:
        ``(segments, key)``. ``segments`` lists, in playlist order, the
        absolute URL of every non-empty line that does not start with
        ``#``. ``key`` is ``None`` when the playlist has no ``#EXT-X-KEY``
        tag; otherwise it is a dict built from the LAST such tag:

        * ``method``: the ``METHOD`` attribute, or ``None`` if absent
        * ``uri``: the absolute key URL, or ``None`` if absent
        * ``iv``: the ``IV`` attribute as bytes, or ``None`` if absent or
          not valid hex
        * ``media_sequence``: the ``#EXT-X-MEDIA-SEQUENCE`` value
          (0 if the tag is absent)

    Raises:
        ValueError: If an ``IV`` attribute starts with ``0x`` but the rest
            is not valid hex.
    """
    segments = []
    key = None
    media_sequence = 0
    for raw in m3u8_text.strip().splitlines():
        line = raw.strip()
        if not line:
            continue
        if line.startswith("#EXT-X-MEDIA-SEQUENCE"):
            m_seq = re.search(r":\s*(\d+)", line)
            if m_seq:
                media_sequence = int(m_seq.group(1))
        elif line.startswith("#EXT-X-KEY"):
            key = _parse_key_tag(line, base_url)
        elif not line.startswith("#"):
            segments.append(urljoin(base_url, line))
    if key is not None:
        # Needed by consumers to derive the default IV when the tag has none.
        key["media_sequence"] = media_sequence
    return segments, key
def _decrypt_aes128(data, key, idx, session):
    """Decrypt one AES-128-CBC segment and strip its PKCS7 padding."""
    if not key.get("_key_bytes"):
        # Fetch the key once and cache it on the shared dict for later segments.
        key_resp = session.get(key["uri"], headers=HEADERS, timeout=15)
        key_resp.raise_for_status()
        key["_key_bytes"] = key_resp.content
    iv = key.get("iv")
    if not iv:
        # Without an explicit IV, HLS uses the segment's media sequence number
        # as a 16-byte big-endian integer. "media_sequence" is optional in the
        # key dict and is treated as 0 when absent.
        sequence_number = int(key.get("media_sequence") or 0) + idx
        iv = sequence_number.to_bytes(16, "big")
    cipher = AES.new(key["_key_bytes"], AES.MODE_CBC, iv=iv)
    plain = cipher.decrypt(data)
    # Remove PKCS7 padding only when the tail is a well-formed padding run.
    pad = plain[-1] if plain else 0
    if 1 <= pad <= 16 and plain.endswith(bytes([pad]) * pad):
        plain = plain[:-pad]
    return plain


def download_segment(idx, url, dest_path, session, retries=3, key=None):
    """Download one segment to *dest_path*, decrypting it when required.

    Args:
        idx: Zero-based position of the segment in the playlist; used to
            derive the IV when the key carries none.
        url: Segment URL.
        dest_path: Final file path. If it already exists the download is
            skipped (resume support).
        session: Object exposing ``get`` (e.g. a ``requests.Session``).
        retries: Maximum number of download attempts.
        key: ``None`` or a dict with ``method``, ``uri``, ``iv`` and
            optionally ``media_sequence``. Decryption happens only when
            ``method`` is ``AES-128`` (case-insensitive). The fetched key
            bytes are cached in the dict under ``_key_bytes``.

    Returns:
        ``(True, dest_path)`` on success or when the file already exists;
        ``(False, error_message)`` otherwise. Exceptions raised while
        downloading are not propagated.
    """
    if os.path.exists(dest_path):
        return True, dest_path  # already exists
    tmp_path = dest_path + ".part"
    # "method" may be present with a None value, so avoid calling .upper() on it.
    needs_aes = bool(key) and (key.get("method") or "").upper() == "AES-128"
    if needs_aes and not have_crypto:
        # Retrying cannot fix a missing dependency; report it without downloading.
        return False, "AES decryption requires pycryptodome, which is not installed."

    last_error = "unknown error"
    for _attempt in range(retries):
        try:
            resp = session.get(url, headers=HEADERS, timeout=30, stream=True)
            resp.raise_for_status()
            data = resp.content
            if needs_aes:
                data = _decrypt_aes128(data, key, idx, session)
            # Write to a side file first so dest_path only ever holds a
            # complete segment.
            with open(tmp_path, "wb") as f:
                f.write(data)
            os.replace(tmp_path, dest_path)
            return True, dest_path
        except Exception as exc:  # worker boundary: failures are reported, not raised
            last_error = str(exc)
    # Do not leave a partial file behind after the final failed attempt.
    try:
        os.remove(tmp_path)
    except OSError:
        pass
    return False, last_error
def merge_with_ffmpeg(segment_files, output_file):
    """Merge segments into *output_file* with ffmpeg's concat demuxer.

    A list file named ``ff_concat_list.txt`` is written next to
    *output_file*, passed to ``ffmpeg -f concat -safe 0 ... -c copy``, and
    removed again whether or not ffmpeg succeeds.

    Args:
        segment_files: Segment paths in playback order.
        output_file: Path of the merged file to create.

    Returns:
        ``(True, None)`` on success, or ``(False, error_message)`` when
        ffmpeg exits with a non-zero status or cannot be started.
    """
    out_dir = os.path.dirname(output_file) or "."
    list_file = os.path.join(out_dir, "ff_concat_list.txt")
    cmd = ["ffmpeg", "-hide_banner", "-loglevel", "error", "-f", "concat", "-safe", "0",
           "-i", list_file, "-c", "copy", output_file]
    try:
        with open(list_file, "w", encoding="utf-8") as f:
            for seg in segment_files:
                # Inside a single-quoted concat entry a literal quote must be
                # written as '\'' (close quote, escaped quote, reopen quote).
                quoted = os.path.abspath(seg).replace("'", "'\\''")
                f.write(f"file '{quoted}'\n")
        try:
            subprocess.check_call(cmd)
        except (subprocess.CalledProcessError, OSError) as exc:
            # OSError covers a missing or non-executable ffmpeg binary.
            return False, str(exc)
        return True, None
    finally:
        # Best-effort cleanup: the list file is only a scratch artifact.
        try:
            os.remove(list_file)
        except OSError:
            pass
# ----- main download logic -----
def _download_all_segments(segments, seg_files, session, threads, key):
    """Download all segments concurrently; return sorted (index, url, error) failures."""
    failures = []
    pbar = tqdm(total=len(segments), desc="Downloading", unit="seg") if tqdm else None
    try:
        with ThreadPoolExecutor(max_workers=threads) as exe:
            future_to_job = {
                exe.submit(download_segment, idx, seg_url, dest, session, 5, key): (idx, seg_url)
                for idx, (seg_url, dest) in enumerate(zip(segments, seg_files))
            }
            for fut in as_completed(future_to_job):
                idx, seg_url = future_to_job[fut]
                ok, info = fut.result()
                if not ok:
                    failures.append((idx, seg_url, info))
                if pbar is not None:
                    pbar.update(1)
    finally:
        if pbar is not None:
            pbar.close()
    # Completion order is arbitrary; sort so the report is stable.
    failures.sort(key=lambda item: item[0])
    return failures


def download_m3u8(url, out, threads=16, tmp=None, keep_ts=True, session=None):
    """Download an HLS stream and merge its segments into one file.

    Steps: fetch the playlist, switch to the highest-bandwidth variant if
    it is a variant playlist, download all segments in parallel into a
    working directory, then merge them with ffmpeg when it is on PATH and
    otherwise (or if ffmpeg fails) concatenate them byte-for-byte.

    Args:
        url: URL of the m3u8 playlist.
        out: Path of the merged output file.
        threads: Number of download worker threads.
        tmp: Working directory for segment files; a fresh temporary
            directory is created when falsy. Existing segment files in it
            are reused.
        keep_ts: When false, the working directory is removed after a
            successful merge.
        session: Session object used for all requests; when falsy a
            ``requests.Session`` is created and closed by this function.

    Returns:
        The path actually written: *out* after a successful ffmpeg merge,
        otherwise the concatenated file (*out* itself if it ends in
        ``.ts``, else *out* + ``.ts``).

    Raises:
        RuntimeError: If the playlist contains no segments or any segment
            failed to download.
    """
    owns_session = not session
    session = session or requests.Session()
    try:
        playlist_url = url
        text = fetch_text(url, session=session)
        variant = pick_variant(text, playlist_url)
        if variant:
            print(f"Variant playlist detected, using variant: {variant}")
            playlist_url = variant
            text = fetch_text(variant, session=session)
        # urljoin resolves relative URIs against the playlist URL itself, so no
        # manual trimming is needed (splitting on "/" mishandles query strings
        # that contain "/" and URLs without a path).
        segments, key = parse_segments(text, playlist_url)
        if not segments:
            raise RuntimeError("No segments found — possibly not a valid m3u8.")
        print(f"Found {len(segments)} segments, threads={threads}")

        tmpdir = tmp or tempfile.mkdtemp(prefix="m3u8_dl_")
        os.makedirs(tmpdir, exist_ok=True)
        seg_files = [os.path.join(tmpdir, f"{i:06d}.ts") for i in range(len(segments))]

        # "method" can be present with a None value, so guard before .upper().
        if key and (key.get("method") or "").upper() == "AES-128":
            if not have_crypto:
                print("Warning: AES-128 encryption detected, but pycryptodome not installed.")
            else:
                print(f"AES-128 encryption detected, key uri: {key.get('uri')}")

        failures = _download_all_segments(segments, seg_files, session, threads, key)
        if failures:
            print(f"{len(failures)} segments failed (index, url, error):")
            for failure in failures[:10]:
                print(failure)
            raise RuntimeError("Download failed, stopping merge.")

        out_ts = out if out.lower().endswith(".ts") else out + ".ts"
        result = None
        if shutil.which("ffmpeg"):
            print("ffmpeg detected, merging into final file...")
            success, err = merge_with_ffmpeg(seg_files, out)
            if success:
                print(f"Merged -> {out}")
                result = out
            else:
                print(f"ffmpeg merge failed: {err}\nFalling back to TS concatenation: {out_ts}")
        if result is None:
            print("No ffmpeg or merge failed, concatenating .ts segments manually.")
            with open(out_ts, "wb") as outf:
                for seg in seg_files:
                    with open(seg, "rb") as f:
                        shutil.copyfileobj(f, outf)
            print(f"Concatenated -> {out_ts}")
            result = out_ts

        if not keep_ts:
            # Best-effort cleanup; a leftover directory must not fail the download.
            shutil.rmtree(tmpdir, ignore_errors=True)
        return result
    finally:
        if owns_session:
            session.close()
# ----- CLI -----
def cil(url: str, output_base: str) -> None:
    """Download one video, taking tuning options from the command line.

    The output file is ``<output_base>.mp4`` and segments are staged in
    ``<output_base>_temp``. Command-line options (read from ``sys.argv``):

    * ``--keep-ts``: keep the staged segment files after merging
    * ``--threads N``: number of download threads (default 32)

    Args:
        url: URL of the m3u8 playlist.
        output_base: Output path without extension.

    Exits the process with status 1 if the download raises.
    """
    parser = argparse.ArgumentParser(description="Multithreaded M3U8 (HLS) Video Downloader & Merger")
    parser.add_argument("--keep-ts", action="store_true", help="Keep temporary TS files")
    parser.add_argument("--threads", type=int, default=32,
                        help="Number of download threads (default: 32)")
    args = parser.parse_args()
    output = output_base + ".mp4"
    temp = output_base + "_temp"
    print(f"Starting download for output_base: {output_base}")
    try:
        # Honor --keep-ts instead of always keeping the staged segments.
        out = download_m3u8(url, output, threads=args.threads, tmp=temp, keep_ts=args.keep_ts)
        print("Completed:", out)
    except Exception as e:  # top-level CLI boundary: report and exit non-zero
        print("Error:", e)
        sys.exit(1)
if __name__ == "__main__":
    # Download queue. Each entry is a dict with the playlist "url" and the
    # output base "name". Example entries (uncomment and edit):
    #     {"url": "video1.m3u8", "name": "Video-1"},
    #     {"url": "video2.m3u8", "name": "Video-2"},
    #     {"url": "video3.m3u8", "name": "Video-3"},
    videos = []

    if not videos:
        print("No video download resources provided")
        sys.exit(0)

    for entry in videos:
        cil(url=entry["url"], output_base=entry["name"])