简介

用于搬迁文档到Halo 博客系统(通过文章导入工具批量上传md文档)

这是一个用于批量下载 Markdown 文件中图片外链的工具脚本。它可以自动扫描指定目录下的所有 Markdown 文件,提取其中的图片链接(http/https),下载到本地,并自动将 Markdown 文件中的图片链接替换为本地路径。

主要功能

  • ✅ 递归扫描指定目录下的所有 .md 文件
  • ✅ 提取 Markdown 中的图片外链(支持 ![alt](url) 语法)
  • ✅ 下载图片到本地(支持重试机制,最多3次)
  • ✅ 支持两种图片存储模式:
    • 相对路径模式:每个 Markdown 文件对应一个 .assets 目录,使用相对路径引用
    • 绝对路径模式:所有图片统一存放在指定目录,保持原目录结构,使用绝对路径引用
  • ✅ 自动替换 Markdown 文件中的图片链接
  • ✅ 自动清理下载过程中产生的空目录
  • ✅ 生成详细的处理日志(控制台 + 文件)

结果展示

image-20260515142001158

相对路径模式

md文件目录/
├── 文档1.md
├── 文档1.assets/
│   ├── image_20250106143022_a1b2c3.png
│   └── image_20250106143025_d4e5f6.jpg
├── 子目录/
│   ├── 文档2.md
│   └── 文档2.assets/
│       └── image_20250106143030_g7h8i9.png

Markdown 中的链接会变成:

![图片](./文档1.assets/image_20250106143022_a1b2c3.png)

绝对路径模式

image-20260515141758987

md文件目录/
├── 文档1.md
├── 子目录/
│   ├── 文档2.md
指定的图片目录/
├── image_20250106143022_a1b2c3.png
├── image_20250106143025_d4e5f6.jpg
├── image_20250106143030_g7h8i9.png

Markdown 中的链接会变成:

![图片](指定的图片目录/image_20250106143022_a1b2c3.png)

注意事项

  1. 备份重要文件:脚本会直接修改原始 Markdown 文件,建议运行前备份
  2. 网络要求:需要能够访问外链图片的服务器
  3. 文件名唯一性:下载的图片会使用 image_时间戳_UUID 格式命名,避免重名冲突
  4. 权限问题:确保对目标目录有读写权限

常见问题

Q: 某些图片下载失败怎么办?

A: 脚本会保留原始链接,失败的文件会在汇总日志中列出。可以:

  • 检查网络连接
  • 确认图片 URL 是否有效
  • 手动下载失败的图片

Q: 如何只处理特定文件?

A: 可以将需要处理的 Markdown 文件单独放在一个目录中,然后扫描该目录。


源码

PixPin_2026-05-15_17-57-34

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Time   : 2025/01/06 14:52 周一
Author : Leo
Version: v1.0
Desc   : Markdown外链图片下载器
         功能:
         1. 扫描指定目录下的所有 Markdown 文件
         2. 提取文件中的所有图片外链(http/https)
         3. 下载图片到本地(支持两种引用模式)
            - 相对路径模式:下载到每个文件对应的 .assets 目录,使用相对路径引用
            - 绝对路径模式:统一下载到指定目录,使用绝对路径引用
         4. 自动替换 Markdown 中的图片链接为本地路径
         5. 支持下载重试机制,自动清理空目录
"""
import datetime
import os
import re
import time
import uuid
import requests
from urllib.parse import urlparse

# 全局配置:图片存储模式
# mode = "relative"  # 相对路径模式:下载到每个文件对应的 .assets 目录下,使用相对路径
# mode = "absolute"  # 绝对路径模式:统一下载到指定目录下,使用绝对路径
IMAGE_STORE_MODE = "relative"  # 默认使用相对路径模式
UNIFIED_IMAGE_DIR = ""  # 统一图片目录,当 IMAGE_STORE_MODE = "absolute" 时必须设置


def get_logger(name=None, log_dir="logs"):
    """
    创建一个日志记录器对象,并设置日志输出到控制台和文件。

    :param name: 可选,日志记录器的名称。如果为 None,则自动获取调用脚本的名称。
    :param log_dir: 可选,日志记录器的保存目录。默认为logs
    :return: 配置好的日志记录器对象。
    """
    import logging
    import inspect
    import os
    # 获取调用脚本的文件名(不含扩展名)
    if name is None:
        frame = inspect.stack()[1]
        module = inspect.getmodule(frame[0])
        script_name = os.path.splitext(os.path.basename(module.__file__))[0] if module else "unknown_script"
    else:
        script_name = name

    # 创建一个日志记录器对象
    logger = logging.getLogger(name)

    # 定义日志格式,包含时间戳、日志级别、文件名、函数名、行号和日志消息
    formatter = logging.Formatter(
        # '[%(asctime)s-%(levelname)s-%(filename)s][%(funcName)s-%(lineno)d]--%(message)s'
        '[%(asctime)s][%(funcName)s-%(lineno)d][%(levelname)s]--%(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )

    # 设置日志记录器的日志级别为 INFO
    # logger.setLevel(logging.DEBUG)
    logger.setLevel(logging.INFO)

    # 检查日志记录器是否已经有处理程序,防止重复添加
    if not logger.handlers:
        # 控制台日志(输出到屏幕)
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)

        # 创建日志目录
        os.makedirs(log_dir, exist_ok=True)

        # 创建文件处理程序,用于将日志写入指定文件
        file_handler = logging.FileHandler(
            filename=os.path.join(log_dir, f"{script_name}.txt"),  # 设置日志文件路径
            mode='a',  # 文件模式,'a' 表示追加模式,不会覆盖已有内容
            encoding='utf-8'  # 日志文件的编码格式为 UTF-8
        )
        file_handler.setFormatter(formatter)  # 设置日志处理程序的格式化器为上面定义的 formatter
        logger.addHandler(file_handler)  # 将该文件处理程序添加到日志记录器中处理日志

    return logger


def get_file_link(file_path, link_array=None):
    """
    获取文件内的所有图片链接
    :param file_path: 文件路径
    :param link_array: 存储文件内链接的数组,可传可不传
    :return: 包含图片链接的列表
    """
    if link_array is None:
        link_array = []

    # 读取文件内容
    with open(file_path, "r", encoding="utf-8") as file:
        file_data = file.read()

    # 正则表达式匹配 Markdown 图片语法:![alt](url),并确保 url 以 http 或 https 开头
    file_link_reg = re.compile(r"!\[.*?\]\((https?://.*?)\)", re.IGNORECASE)
    temp_result = file_link_reg.findall(file_data)

    # 将匹配到的链接添加到列表中
    link_array.extend(temp_result)
    return link_array


def read_all_file_list(directory, files_list=None):
    """
    获取指定目录下的所有 Markdown 文件列表
    :param directory: 目录路径
    :param files_list: 暂存读取的文件列表
    :return: 包含所有 Markdown 文件路径的列表
    """
    if files_list is None:
        files_list = []

    # 遍历目录
    for item in os.listdir(directory):
        full_path = os.path.join(directory, item)
        if os.path.isdir(full_path):
            # 如果是目录,递归读取
            read_all_file_list(full_path, files_list)
        else:
            # 如果是 Markdown 文件,添加到列表中
            if full_path.endswith(".md"):
                files_list.append(full_path)
    return files_list


def download_image(url, save_path, retries=3):
    """
    下载图片并保存到指定路径,支持重试机制
    """
    for attempt in range(retries):
        try:
            # 设置请求头,模拟浏览器请求
            headers = {
                "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36 Edg/131.0.0.0",
                "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
                "accept-encoding": "gzip, deflate, br, zstd",
                "accept-language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
                "cache-control": "max-age=0",
                "priority": "u=0, i",
                "sec-ch-ua": '"Microsoft Edge";v="131", "Chromium";v="131", "Not_A Brand";v="24"',
                "sec-ch-ua-mobile": "?0",
                "sec-ch-ua-platform": '"Windows"',
                "sec-fetch-dest": "document",
                "sec-fetch-mode": "navigate",
                "sec-fetch-site": "none",
                "sec-fetch-user": "?1",
                "upgrade-insecure-requests": "1",
            }
            response = requests.get(url, headers=headers, stream=True, timeout=30)
            if response.status_code == 200:
                # 确保目录存在
                os.makedirs(os.path.dirname(save_path), exist_ok=True)
                with open(save_path, "wb") as file:
                    for chunk in response.iter_content(1024):
                        file.write(chunk)
                print(f"下载成功: {url} -> {save_path}")
                return True
            else:
                logging.warning(f"下载失败: {url} (状态码: {response.status_code})")
        except Exception as e:
            logging.error(f"下载失败: {url} (错误: {str(e)})")
        if attempt < retries - 1:
            logging.info(f"重试中... ({attempt + 1}/{retries})")

        # 添加 1 秒延时
        time.sleep(1)

    return False


def get_image_name(file_name, has_extension, keep_original_name=False):
    """
    从图片链接中提取文件名,并使用时间戳和 UUID 确保唯一性

    :param keep_original_name: 是否保留原文件名(True=保留,False=重置)
    """
    timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
    unique_suffix = uuid.uuid4().hex[:12]

    # 如果不保留原文件名
    if not keep_original_name:
        # 如果有扩展名,使用原扩展名;否则使用 .png
        if has_extension and file_name:
            name, ext = os.path.splitext(file_name)
            return f"image_{timestamp}_{unique_suffix}{ext}"
        else:
            return f"image_{timestamp}_{unique_suffix}.png"

    # 保留原文件名
    if not has_extension:
        file_name += ".png"

    # 分离文件名和扩展名
    base_name, ext = os.path.splitext(file_name)
    # 组合:原名_时间戳_UUID.扩展名
    return f"{base_name}_{timestamp}_{unique_suffix}{ext}"


def replace_image_links(markdown_content, success_urls, failed_urls, markdown_file_path):
    """
    将 Markdown 内容中的图片外链替换为相对路径或绝对路径
    """
    for url, local_path in success_urls.items():
        if IMAGE_STORE_MODE == "relative":
            # 相对路径模式:获取相对于 Markdown 文件的相对路径
            relative_path = os.path.relpath(local_path, start=os.path.dirname(markdown_file_path)).replace(os.sep, "/")
            # 确保路径包含 .assets 目录
            if not relative_path.startswith(f"{os.path.basename(os.path.dirname(local_path))}/"):
                relative_path = f"{os.path.basename(os.path.dirname(local_path))}/{relative_path}"
            markdown_content = markdown_content.replace(url, relative_path)
            print(f"替换成功: {url} -> {relative_path}")
        else:  # absolute 模式
            # 绝对路径模式:使用绝对路径(将反斜杠转换为正斜杠,保持跨平台兼容性)
            absolute_path = os.path.abspath(local_path).replace(os.sep, "/")
            markdown_content = markdown_content.replace(url, absolute_path)
            print(f"替换成功: {url} -> {absolute_path}")

    # 对于下载失败的 URL,保持原链接
    for url in failed_urls:
        logging.warning(f"保持原链接: {url}")

    return markdown_content


def process_markdown_file(file_path):
    """
    处理单个 Markdown 文件:提取图片链接、下载图片并替换链接
    """
    global UNIFIED_IMAGE_DIR

    # 获取文件所在目录
    markdown_dir = os.path.dirname(file_path)

    # 根据模式确定图片保存目录
    if IMAGE_STORE_MODE == "relative":
        # 相对路径模式:图片保存在 .assets 目录下
        image_dir = os.path.join(markdown_dir, f"{os.path.splitext(os.path.basename(file_path))[0]}.assets")
        logging.info(f"图片保存目录 (相对路径模式): {image_dir}")
    else:  # absolute 模式
        # 绝对路径模式:图片统一保存在指定目录下
        if not UNIFIED_IMAGE_DIR:
            # 如果没有设置统一目录,则使用默认目录
            UNIFIED_IMAGE_DIR = os.path.join(markdown_dir, "images")
            logging.warning(f"未设置统一图片目录,使用默认目录: {UNIFIED_IMAGE_DIR}")

        # 在统一目录下,按原 Markdown 文件的相对路径创建子目录结构
        # 获取相对于扫描根目录的相对路径
        rel_path = os.path.relpath(markdown_dir, start=SCAN_ROOT_DIR) if 'SCAN_ROOT_DIR' in globals() else ""
        if rel_path == ".":
            rel_path = ""

        # 构建完整的图片保存路径
        image_dir = os.path.join(UNIFIED_IMAGE_DIR, rel_path)
        logging.info(f"图片保存目录 (绝对路径模式): {image_dir}")

    # 创建图片保存目录(如果不存在)
    os.makedirs(image_dir, exist_ok=True)

    # 读取 Markdown 文件内容
    with open(file_path, "r", encoding="utf-8") as file:
        markdown_content = file.read()

    # 提取图片外链
    image_urls = get_file_link(file_path)
    if not image_urls:
        logging.warning(f"未找到图片外链: {file_path}")
        return True, set()  # 返回默认值,表示文件处理成功且没有失败的 URL

    # 记录下载成功的 URL 和对应的本地路径
    success_urls = {}
    # 记录下载失败的 URL
    failed_urls = set()

    # 下载图片并保存到本地
    for url in image_urls:
        # 提取文件名
        file_name = os.path.basename(urlparse(url).path)
        # 检查文件名是否包含常见的图片扩展名
        has_extension = bool(re.search(r"\.(jpg|bmp|gif|ico|pcx|jpeg|tif|png|raw|tga)$", file_name, re.IGNORECASE))

        # 生成图片文件名
        image_name = get_image_name(file_name, has_extension)

        # 生成保存路径
        save_path = os.path.join(image_dir, image_name).replace(os.sep, "/")

        # 下载图片
        if download_image(url, save_path):
            success_urls[url] = save_path
        else:
            failed_urls.add(url)

    # 替换 Markdown 文件中的图片外链
    updated_content = replace_image_links(markdown_content, success_urls, failed_urls, file_path)

    # 将更新后的内容写回 Markdown 文件 (覆盖原文件)
    with open(file_path, "w", encoding="utf-8") as file:
        file.write(updated_content)

    # 输出执行情况
    logging.info(f"图片下载成功数: {len(success_urls)}")
    logging.info(f"图片下载失败数: {len(failed_urls)}")
    if failed_urls:
        logging.warning("失败URL:")
        for url in failed_urls:
            logging.warning(f"  - {url}")

    return len(failed_urls) == 0, failed_urls, len(success_urls), len(failed_urls)


def remove_empty_dirs(directory):
    """
    递归删除指定目录下的所有空目录

    Args:
        directory (str): 要清理的目录路径
    """
    if not os.path.isdir(directory):
        logging.error(f"错误:{directory} 不是一个有效的目录")
        return

    deleted_count = 0

    # 遍历目录
    for root, dirs, files in os.walk(directory, topdown=False):
        # topdown=False 表示从最深层目录开始遍历

        for dir_name in dirs:
            dir_path = os.path.join(root, dir_name)

            try:
                # 检查目录是否为空
                if not os.listdir(dir_path):
                    os.rmdir(dir_path)
                    logging.info(f"已删除空目录: {dir_path}")
                    deleted_count += 1
            except OSError as e:
                logging.error(f"无法删除目录 {dir_path}: {e}")
            except Exception as e:
                logging.error(f"检查目录 {dir_path} 时出错: {e}")

    logging.info(f"清理完成,共删除 {deleted_count} 个空目录")


def select_mode():
    """
    交互式选择图片存储模式
    """
    global IMAGE_STORE_MODE, UNIFIED_IMAGE_DIR

    print("\n请选择图片存储模式:")
    print("1. 相对路径模式(默认)")
    print("   - 图片下载到每个 Markdown 文件对应的 .assets 目录下")
    print("   - Markdown 中使用相对路径引用图片")
    print("2. 绝对路径模式")
    print("   - 图片统一下载到指定目录下")
    print("   - Markdown 中使用绝对路径引用图片")

    while True:
        choice = input("\n请输入选择 (1 或 2,默认为 1): ").strip()
        if choice == "" or choice == "1":
            IMAGE_STORE_MODE = "relative"
            print("已选择:相对路径模式")
            break
        elif choice == "2":
            IMAGE_STORE_MODE = "absolute"
            print("已选择:绝对路径模式")

            # 询问统一图片目录
            default_dir = os.path.join(os.getcwd(), "imgs")
            unified_dir = input(f"请输入统一图片存放目录(默认为 {default_dir}): ").strip()
            if not unified_dir:
                UNIFIED_IMAGE_DIR = default_dir
            else:
                UNIFIED_IMAGE_DIR = unified_dir

            print(f"图片将统一保存到: {UNIFIED_IMAGE_DIR}")
            break
        else:
            print("输入无效,请输入 1 或 2")


def main():
    global SCAN_ROOT_DIR

    # 交互式选择模式
    select_mode()

    # 用户输入指定目录
    directory = input("\n请输入要扫描的目录路径: ").strip()
    if not os.path.isdir(directory):
        print("输入的路径不是有效的目录")
        return

    # 保存扫描根目录,用于绝对路径模式下保持目录结构
    SCAN_ROOT_DIR = os.path.abspath(directory)

    logging.info(f'============================= 日志分割线 =============================')
    logging.info(f"图片存储模式: {IMAGE_STORE_MODE}")
    if IMAGE_STORE_MODE == "absolute":
        logging.info(f"统一图片目录: {UNIFIED_IMAGE_DIR}")

    # 处理每个 Markdown 文件
    md_files = read_all_file_list(directory)

    success_files = 0
    failed_files = []
    failed_files_details = {}
    file_stats = []  # 存储每个文件的统计信息

    logging.info(f"Markdown 文件数量: {len(md_files)}")
    for i, file_path in enumerate(md_files, 1):
        logging.info(f"正在处理第{i}个文件: {file_path}")
        is_success, failed_urls, success_count, failed_count = process_markdown_file(file_path)

        # 记录文件统计信息
        file_stats.append({
            'file_path': file_path,
            'success_count': success_count,
            'failed_count': failed_count,
            'is_success': is_success
        })

        if is_success:
            success_files += 1
        else:
            failed_files.append(file_path)
            failed_files_details[file_path] = failed_urls

    # 新增:文件明细板块
    logging.info(f'============================= 文件明细 =============================')
    # 统计总体图片下载情况
    total_success = sum(stat['success_count'] for stat in file_stats)
    total_failed = sum(stat['failed_count'] for stat in file_stats)
    logging.info(
        f"总体统计: 总计处理文件数: {len(file_stats)}, 总成功图片数: {total_success}, 总失败图片数: {total_failed}")
    logging.info(f"{'序号':<4} {'成功数':<6} {'失败数':<6} {'状态':<8} 文件路径")
    logging.info(f"{'-' * 80}")

    for idx, stat in enumerate(file_stats, 1):
        status = "成功" if stat['is_success'] else "部分失败"
        # 限制文件路径长度,避免日志过长(可选)
        file_path_display = stat['file_path']
        if len(file_path_display) > 60:
            file_path_display = "..." + file_path_display[-57:]

        logging.info(f"{idx:<4} {stat['success_count']:<6} {stat['failed_count']:<6} {status:<8} {file_path_display}")

    logging.info(f"{'-' * 80}")

    # 汇总日志
    logging.info(f'============================= 汇总日志 =============================')

    logging.info(f"全部成功文件数: {success_files}")
    logging.info(f"有失败记录文件数: {len(failed_files)}")
    if failed_files:
        logging.warning("失败文件及其失败的 URL:")
        for file_path in failed_files:
            logging.warning(f"  - 失败文件: {file_path}")
            for url in failed_files_details[file_path]:
                logging.warning(f"    - 失败 URL: {url}")

    # 删除无效目录(空目录)- 仅在相对路径模式下执行
    if IMAGE_STORE_MODE == "relative":
        remove_empty_dirs(directory)
    else:
        logging.info("绝对路径模式下跳过空目录清理")


if __name__ == "__main__":
    # 初始化日志记录器
    logging = get_logger()
    start = time.time()
    main()
    end = time.time()
    logging.info(f'共耗时: {round(end - start, 2)}秒')