
M3U8 Python Download Script

Features

  • Accepts an m3u8 URL (supports variant playlists) and automatically selects the highest bandwidth variant if available.
  • Downloads each segment using multithreading (default 16 threads, configurable).
  • Supports AES-128 decryption (if EXT-X-KEY METHOD=AES-128 is used; requires pycryptodome).
  • Merges segments: if ffmpeg is found on the PATH, the segments are remuxed into the requested output file (an MP4 in the CLI wrapper); otherwise the .ts segments are concatenated into a single .ts file.
  • Supports resume: existing segments are skipped automatically.

Python Code Overview

The script works in these steps:

  1. Fetch M3U8 playlist

    • Download the playlist text and, if it is a variant playlist, pick the highest-bandwidth variant.

  2. Parse segments & key

    • Extract all segment URLs.
    • Detect AES-128 encryption and parse the key URI and IV if provided.
  3. Download segments with threads

    • Each segment is downloaded concurrently.
    • AES-128 decryption is applied if needed.
    • Already existing files are skipped (supports resume).
  4. Merge segments

    • Prefer using ffmpeg for merging (handles TS -> MP4 seamlessly).
    • Fallback: binary concatenate .ts files if ffmpeg is not available.

Key Functions

Pick highest bandwidth variant

python
def pick_variant(m3u8_text, base_url):
    """
    Parse variant playlist (#EXT-X-STREAM-INF) and return URL of highest bandwidth variant.
    """

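As a rough illustration of the selection rule, the sketch below runs the same idea against a made-up master playlist. The playlist text, the example.com URLs and the function name `highest_bandwidth_variant` are assumptions for the demo, not part of the script.

```python
import re
from urllib.parse import urljoin

# Hypothetical master playlist, used only for illustration.
MASTER = """#EXTM3U
#EXT-X-STREAM-INF:BANDWIDTH=800000,RESOLUTION=640x360
low/index.m3u8
#EXT-X-STREAM-INF:AVERAGE-BANDWIDTH=4000000,BANDWIDTH=5000000,RESOLUTION=1920x1080
high/index.m3u8
#EXT-X-STREAM-INF:BANDWIDTH=2800000,RESOLUTION=1280x720
mid/index.m3u8
"""

def highest_bandwidth_variant(m3u8_text, base_url):
    """Return the absolute URL of the variant with the largest BANDWIDTH, or None."""
    lines = [ln.strip() for ln in m3u8_text.splitlines()]
    best = None  # (bandwidth, absolute_url)
    for i, line in enumerate(lines):
        if not line.startswith("#EXT-X-STREAM-INF"):
            continue
        # The lookbehind keeps AVERAGE-BANDWIDTH from being mistaken for BANDWIDTH.
        m = re.search(r"(?<![A-Z-])BANDWIDTH=(\d+)", line)
        bandwidth = int(m.group(1)) if m else 0
        # The variant URI is the next line that is neither blank nor a tag.
        uri = next((u for u in lines[i + 1:] if u and not u.startswith("#")), None)
        if uri and (best is None or bandwidth > best[0]):
            best = (bandwidth, urljoin(base_url, uri))
    return best[1] if best else None

variant_url = highest_bandwidth_variant(MASTER, "https://example.com/video/master.m3u8")
print(variant_url)  # https://example.com/video/high/index.m3u8
```

A media playlist (one with no #EXT-X-STREAM-INF tags) yields None, which is how the caller can tell that no variant selection is needed.
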
Parse segments and optional AES key

python
def parse_segments(m3u8_text, base_url):
    """
    Return (segments_list, key_info) where key_info = {method, uri, iv} if AES-128 exists.
    """

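To make the shape of key_info concrete, here is a minimal sketch that splits a single #EXT-X-KEY tag into its attributes. The tag contents and the helper name `parse_key_tag` are hypothetical, and the sketch leaves the URI relative rather than resolving it against a base URL.

```python
import re

def parse_key_tag(line):
    """Split an #EXT-X-KEY tag into method, uri and iv (bytes or None)."""
    method = re.search(r"METHOD=([^,]+)", line)
    uri = re.search(r'URI="([^"]+)"', line)
    iv = re.search(r"IV=0[xX]([0-9A-Fa-f]+)", line)
    return {
        "method": method.group(1) if method else None,
        "uri": uri.group(1) if uri else None,
        # zfill keeps the IV 16 bytes long even if leading zeros were dropped.
        "iv": bytes.fromhex(iv.group(1).zfill(32)) if iv else None,
    }

tag = '#EXT-X-KEY:METHOD=AES-128,URI="key.bin",IV=0x00000000000000000000000000000001'
info = parse_key_tag(tag)
print(info["method"], info["uri"], info["iv"].hex())
```

A tag such as #EXT-X-KEY:METHOD=NONE comes back with uri and iv set to None, which signals that the following segments are not encrypted.
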
Download a single segment with optional decryption

python
def download_segment(idx, url, dest_path, session, retries=3, key=None):
    """
    Download one segment; if AES-128 key is provided, decrypt the segment.
    """

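Two details of HLS AES-128 are easy to miss. When the EXT-X-KEY tag carries no IV, the HLS specification (RFC 8216) uses the segment's media sequence number as a 16-byte big-endian IV, and the encrypted data is PKCS7-padded. The sketch below shows both steps with the standard library only; the helper names `default_iv` and `strip_pkcs7` are hypothetical and the actual decryption call is left out.

```python
def default_iv(media_sequence_number):
    """IV used when EXT-X-KEY has no IV attribute: the sequence number, big-endian, 16 bytes."""
    return media_sequence_number.to_bytes(16, "big")

def strip_pkcs7(data, block_size=16):
    """Remove PKCS7 padding from already-decrypted bytes."""
    if not data or len(data) % block_size:
        raise ValueError("decrypted data is not a whole number of blocks")
    pad = data[-1]
    # Every padding byte must equal the padding length.
    if not 1 <= pad <= block_size or data[-pad:] != bytes([pad]) * pad:
        raise ValueError("invalid PKCS7 padding")
    return data[:-pad]

iv_for_segment_258 = default_iv(258)
plain = strip_pkcs7(b"A" * 12 + b"\x04" * 4)
print(iv_for_segment_258.hex(), plain)
```

With pycryptodome installed, the bytes returned by `AES.new(key, AES.MODE_CBC, iv=...).decrypt(...)` would be what gets passed to `strip_pkcs7`.
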
Merge segments using ffmpeg

python
def merge_with_ffmpeg(segment_files, output_file):
    """
    Use ffmpeg concat demuxer to merge TS segments into a single file.
    """

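The concat demuxer reads a small text file with one file directive per segment. As a sketch of what gets handed to ffmpeg, the helpers below build that list and the command line without running anything. The names `build_concat_list` and `build_ffmpeg_command` are assumptions; the quote escaping follows ffmpeg's quoting rules, where a single quote inside a quoted string is written as '\''.

```python
import os

def build_concat_list(segment_files):
    """Return the text of an ffmpeg concat-demuxer list file."""
    lines = []
    for seg in segment_files:
        path = os.path.abspath(seg)
        # Close the quote, emit an escaped quote, reopen the quote.
        escaped = path.replace("'", "'\\''")
        lines.append(f"file '{escaped}'")
    return "\n".join(lines) + "\n"

def build_ffmpeg_command(list_file, output_file):
    """Stream-copy (no re-encode) the listed segments into output_file."""
    return ["ffmpeg", "-hide_banner", "-loglevel", "error",
            "-f", "concat", "-safe", "0",
            "-i", list_file, "-c", "copy", output_file]

listing = build_concat_list(["000000.ts", "000001.ts"])
command = build_ffmpeg_command("ff_concat_list.txt", "Video-1.mp4")
print(listing)
print(" ".join(command))
```

The -safe 0 option is needed because the list uses absolute paths, and -c copy keeps the merge fast since the audio and video streams are not re-encoded.
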
Main download logic

python
def download_m3u8(url, out, threads=16, tmp=None, keep_ts=True, session=None):
    """
    - Select variant if present
    - Parse segments and key
    - Download segments in parallel
    - Merge segments with ffmpeg or fallback to binary concatenation
    """

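The parallel download and resume behavior can be tried without a network. In the sketch below, `fake_fetch` stands in for the HTTP request and `download_all` is a simplified, hypothetical version of the download loop; it is not the script's own function.

```python
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor, as_completed

def download_all(urls, out_dir, fetch, threads=4):
    """Fetch every URL into out_dir/NNNNNN.ts, skipping files that already exist.

    `fetch` is any callable url -> bytes. Returns (paths in playlist order,
    number of segments actually fetched on this run).
    """
    os.makedirs(out_dir, exist_ok=True)
    paths = [os.path.join(out_dir, f"{i:06d}.ts") for i in range(len(urls))]

    def worker(url, path):
        if os.path.exists(path):
            return 0  # resume: this segment was finished on an earlier run
        part = path + ".part"
        with open(part, "wb") as f:
            f.write(fetch(url))
        os.replace(part, path)  # rename only after the write completed
        return 1

    with ThreadPoolExecutor(max_workers=threads) as pool:
        futures = [pool.submit(worker, u, p) for u, p in zip(urls, paths)]
        fetched = sum(f.result() for f in as_completed(futures))
    return paths, fetched

def fake_fetch(url):
    """Stand-in for an HTTP GET: returns the URL itself as bytes."""
    return url.encode("utf-8")

urls = [f"https://example.com/seg{i}.ts" for i in range(5)]
with tempfile.TemporaryDirectory() as workdir:
    paths, first_run = download_all(urls, workdir, fake_fetch)
    _, second_run = download_all(urls, workdir, fake_fetch)

print(first_run, second_run)  # 5 0
```

Writing to a .part file and renaming it afterwards means an interrupted download never leaves a truncated segment that a later run would mistake for a finished one.
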
CLI Usage

python
def cil(url: str, output_base: str):
    """
    Wrapper for argparse CLI. Downloads a single video.
    """
  • Example usage:
python
videos = [
    {"url": "https://example.com/video1.m3u8", "name": "Video-1"},
    {"url": "https://example.com/video2.m3u8", "name": "Video-2"}
]

for video in videos:
    cil(url=video["url"], output_base=video["name"])
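
If the URL and output name should come from the command line instead of a Python list, a parser along the following lines could be used. This is only a sketch: apart from --keep-ts, the arguments shown here (url, -o/--output, --threads) are assumptions and are not defined by the script.

```python
import argparse

def build_parser():
    """Hypothetical command-line interface for a single download."""
    parser = argparse.ArgumentParser(
        description="Multithreaded M3U8 (HLS) Video Downloader & Merger")
    parser.add_argument("url", help="URL of the m3u8 playlist")
    parser.add_argument("-o", "--output", default="output.mp4",
                        help="Output file name")
    parser.add_argument("--threads", type=int, default=16,
                        help="Number of download threads")
    parser.add_argument("--keep-ts", action="store_true",
                        help="Keep temporary TS files")
    return parser

# Parse an explicit argument list so the example runs without a real command line.
args = build_parser().parse_args(
    ["https://example.com/video1.m3u8", "-o", "Video-1.mp4", "--keep-ts"])
print(args.url, args.output, args.threads, args.keep_ts)
```

The parsed values would then be passed on, for example as download_m3u8(args.url, args.output, threads=args.threads, keep_ts=args.keep_ts).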

Practical Usage

  • Modern streaming video often uses HLS (HTTP Live Streaming), which splits a video into small .ts segments listed in an m3u8 playlist.
  • This script downloads every segment and joins them into one local file: an MP4 when ffmpeg is available, otherwise a single concatenated .ts file.
  • AES-128 encrypted streams can be handled if the key URI is provided in the playlist.

Notes

  • Dependencies: requests, tqdm, pycryptodome (optional for AES).
  • Multithreading: controlled via the threads parameter of download_m3u8 (default 16; the CLI wrapper passes 32).
  • Temporary files: stored in a temp folder; can be kept with --keep-ts.
  • ffmpeg: recommended for merging to MP4, fallback is binary concatenation to .ts.
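
The fallback mentioned in the last note is plain byte concatenation, which usually works for MPEG-TS because the stream is made of small self-contained packets. A minimal sketch with dummy data, assuming hypothetical helper names `concat_ts` and `merge_strategy`:

```python
import os
import shutil
import tempfile

def concat_ts(segment_files, output_path):
    """Append each segment to output_path in order; return the final size in bytes."""
    with open(output_path, "wb") as out:
        for seg in segment_files:
            with open(seg, "rb") as f:
                shutil.copyfileobj(f, out)
    return os.path.getsize(output_path)

def merge_strategy():
    """Prefer ffmpeg when it is on the PATH, otherwise fall back to concatenation."""
    return "ffmpeg" if shutil.which("ffmpeg") else "concat"

# Dummy payloads stand in for real .ts segments.
with tempfile.TemporaryDirectory() as workdir:
    parts = []
    for i, payload in enumerate([b"AAAA", b"BBBB", b"CC"]):
        path = os.path.join(workdir, f"{i:06d}.ts")
        with open(path, "wb") as f:
            f.write(payload)
        parts.append(path)
    merged_path = os.path.join(workdir, "merged.ts")
    size = concat_ts(parts, merged_path)
    with open(merged_path, "rb") as f:
        merged = f.read()

print(merge_strategy(), size, merged)
```

The zero-padded file names matter here: the segments must be appended in playlist order, and fixed-width names keep that order unambiguous.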

This script is a robust solution for downloading and reconstructing HLS videos from m3u8 playlists, including variant streams and AES-128 encryption.

Full Code

python
import os
import re
import sys
import shutil
import argparse
import tempfile
import subprocess
from urllib.parse import urljoin, urlparse
from concurrent.futures import ThreadPoolExecutor, as_completed

try:
    import requests
except ImportError:
    print("Please install dependencies first: pip install requests tqdm")
    sys.exit(1)

try:
    from tqdm import tqdm
except Exception:
    tqdm = None

# optional crypto
try:
    from Crypto.Cipher import AES
    have_crypto = True
except Exception:
    have_crypto = False

# ----- helper funcs -----
HEADERS = {"User-Agent": "m3u8-downloader/1.0"}

def fetch_text(url, session=None, **kwargs):
    s = session or requests
    resp = s.get(url, headers=HEADERS, timeout=15, **kwargs)
    resp.raise_for_status()
    return resp.text

def fetch_bytes(url, session=None, **kwargs):
    s = session or requests
    resp = s.get(url, headers=HEADERS, timeout=20, stream=True, **kwargs)
    resp.raise_for_status()
    return resp.content

def pick_variant(m3u8_text, base_url):
    """
    If this is a variant playlist (#EXT-X-STREAM-INF), pick the variant with the highest bandwidth.
    Returns variant_url or None (if not a variant playlist).
    """
    lines = m3u8_text.strip().splitlines()
    variant_infos = []
    for i,line in enumerate(lines):
        if line.startswith("#EXT-X-STREAM-INF"):
            # parse bandwidth if present (the lookbehind skips AVERAGE-BANDWIDTH)
            m = re.search(r'(?<![A-Z-])BANDWIDTH=(\d+)', line)
            bw = int(m.group(1)) if m else 0
            # next non-empty non-comment line is URI
            for j in range(i+1, len(lines)):
                u = lines[j].strip()
                if u and not u.startswith("#"):
                    variant_infos.append((bw, urljoin(base_url, u)))
                    break
    if not variant_infos:
        return None
    # pick highest bandwidth
    variant_infos.sort(key=lambda x: x[0], reverse=True)
    return variant_infos[0][1]

def parse_segments(m3u8_text, base_url):
    """
    Parse segment URLs, return (segments_list, key_info)
    key_info = dict with {method, uri, iv_bytes or None} if AES-128 present, else None
    """
    lines = m3u8_text.strip().splitlines()
    segments = []
    key = None

    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        if line.startswith("#EXT-X-KEY"):
            m_method = re.search(r'METHOD=([^,]+)', line)
            m_uri = re.search(r'URI="([^"]+)"', line)
            m_iv = re.search(r'IV=([^,]+)', line)
            method = m_method.group(1) if m_method else None
            uri = urljoin(base_url, m_uri.group(1)) if m_uri else None
            iv = None
            if m_iv:
                ivtxt = m_iv.group(1)
                if ivtxt.startswith("0x") or ivtxt.startswith("0X"):
                    iv = bytes.fromhex(ivtxt[2:])
                else:
                    try:
                        iv = bytes.fromhex(ivtxt)
                    except Exception:
                        iv = None
            key = {"method": method, "uri": uri, "iv": iv}
        if line and not line.startswith("#"):
            seg_url = urljoin(base_url, line)
            segments.append(seg_url)
    return segments, key

def download_segment(idx, url, dest_path, session, retries=3, key=None):
    """
    Download a single segment, optionally decrypt with AES-128 if key is provided.
    key: dict with method, uri (used to fetch key bytes), iv (bytes) or None
    """
    tmp_path = dest_path + ".part"
    if os.path.exists(dest_path):
        return True, dest_path  # already exists

    for attempt in range(1, retries+1):
        try:
            r = session.get(url, headers=HEADERS, timeout=30, stream=True)
            r.raise_for_status()
            data = r.content
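            if key and (key.get("method") or "").upper() == "AES-128":
                if not have_crypto:
                    raise RuntimeError("AES decryption requires pycryptodome, which is not installed.")
                if not key.get("_key_bytes"):
                    kb = session.get(key["uri"], headers=HEADERS, timeout=15)
                    kb.raise_for_status()
                    key["_key_bytes"] = kb.content
                # Without an explicit IV, HLS uses the segment's media sequence number
                # as a 16-byte big-endian value (assumes EXT-X-MEDIA-SEQUENCE is 0).
                iv = key.get("iv") or idx.to_bytes(16, "big")
                cipher = AES.new(key["_key_bytes"], AES.MODE_CBC, iv=iv)
                data = cipher.decrypt(data)
                # HLS AES-128 segments are PKCS7-padded; drop the padding bytes.
                pad = data[-1] if data else 0
                if 1 <= pad <= 16:
                    data = data[:-pad]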
            with open(tmp_path, "wb") as f:
                f.write(data)
            os.replace(tmp_path, dest_path)
            return True, dest_path
        except Exception as e:
            if attempt == retries:
                return False, str(e)
    return False, "unknown error"

def merge_with_ffmpeg(segment_files, output_file):
    """
    Merge segments using ffmpeg concat demuxer.
    """
    tmpdir = os.path.dirname(output_file) or "."
    list_file = os.path.join(tmpdir, "ff_concat_list.txt")
    with open(list_file, "w", encoding="utf-8") as f:
        for seg in segment_files:
            f.write(f"file '{os.path.abspath(seg)}'\n")
    cmd = ["ffmpeg", "-hide_banner", "-loglevel", "error", "-f", "concat", "-safe", "0",
           "-i", list_file, "-c", "copy", output_file]
    try:
        subprocess.check_call(cmd)
        return True, None
    except subprocess.CalledProcessError as e:
        return False, str(e)
    finally:
        # Remove the concat list whether or not ffmpeg succeeded
        if os.path.exists(list_file):
            os.remove(list_file)

# ----- main download logic -----

def download_m3u8(url, out, threads=16, tmp=None, keep_ts=True, session=None):
    session = session or requests.Session()
    base_url = url
    text = fetch_text(url, session=session)
    variant = pick_variant(text, base_url)
    if variant:
        print(f"Variant playlist detected, using variant: {variant}")
        base_url = variant
        text = fetch_text(variant, session=session)
    base_for_join = base_url.rsplit("/", 1)[0] + "/"

    segments, key = parse_segments(text, base_for_join)
    if not segments:
        raise RuntimeError("No segments found — possibly not a valid m3u8.")

    print(f"Found {len(segments)} segments, threads={threads}")

    tmpdir = tmp or tempfile.mkdtemp(prefix="m3u8_dl_")
    os.makedirs(tmpdir, exist_ok=True)
    seg_files = [os.path.join(tmpdir, f"{i:06d}.ts") for i in range(len(segments))]

    if key and (key.get("method") or "").upper() == "AES-128":
        if not have_crypto:
            print("Warning: AES-128 encryption detected, but pycryptodome not installed.")
        else:
            print(f"AES-128 encryption detected, key uri: {key.get('uri')}")

    failures = []
    pbar = tqdm(total=len(segments), desc="Downloading", unit="seg") if tqdm else None

    with ThreadPoolExecutor(max_workers=threads) as exe:
        future_to_idx = {}
        for idx, seg_url in enumerate(segments):
            dest = seg_files[idx]
            future = exe.submit(download_segment, idx, seg_url, dest, session, 5, key)
            future_to_idx[future] = (idx, seg_url, dest)
        for fut in as_completed(future_to_idx):
            idx, seg_url, dest = future_to_idx[fut]
            ok, info = fut.result()
            if not ok:
                failures.append((idx, seg_url, info))
            if pbar:
                pbar.update(1)
    if pbar:
        pbar.close()

    if failures:
        print(f"{len(failures)} segments failed (index, url, error):")
        for f in failures[:10]:
            print(f)
        raise RuntimeError("Download failed, stopping merge.")

    ffmpeg_path = shutil.which("ffmpeg")
    if ffmpeg_path:
        print("ffmpeg detected, merging into final file...")
        success, err = merge_with_ffmpeg(seg_files, out)
        if not success:
            print(f"ffmpeg merge failed: {err}\nFalling back to TS concatenation: {out}.ts")
        else:
            print(f"Merged -> {out}")
            if not keep_ts:
                try:
                    shutil.rmtree(tmpdir)
                except Exception:
                    pass
            return out

    print("No ffmpeg or merge failed, concatenating .ts segments manually.")
    out_ts = out if out.lower().endswith(".ts") else out + ".ts"
    with open(out_ts, "wb") as outf:
        for seg in seg_files:
            with open(seg, "rb") as f:
                shutil.copyfileobj(f, outf)
    print(f"Concatenated -> {out_ts}")
    if not keep_ts:
        try:
            shutil.rmtree(tmpdir)
        except Exception:
            pass
    return out_ts

# ----- CLI -----
def cil(url: str, output_base: str):
    parser = argparse.ArgumentParser(description="Multithreaded M3U8 (HLS) Video Downloader & Merger")
    parser.add_argument("--keep-ts", action="store_true", help="Keep temporary TS files")
    args = parser.parse_args()

    output = output_base + ".mp4"
    temp = output_base + "_temp"

    print(f"Starting download for output_base: {output_base}")
    try:
        # Honor the --keep-ts flag instead of always keeping the segments
        out = download_m3u8(url, output, threads=32, tmp=temp, keep_ts=args.keep_ts)
        print("Completed:", out)
    except Exception as e:
        print("Error:", e)
        sys.exit(1)

if __name__ == "__main__":
    videos = [
        # {
        #     "url": "video1.m3u8",
        #     "name": "Video-1",
        # },
        # {
        #     "url": "video2.m3u8",
        #     "name": "Video-2"
        # },
        # {
        #     "url": "video3.m3u8",
        #     "name": "Video-3"
        # }
    ]

    if len(videos) == 0:
        print("No video download resources provided")
        sys.exit(0)

    for video in videos:
        cil(url=video["url"], output_base=video["name"])
