# 主要目的

防止镜像站或在传输的途中有人 “加料”

# 检测脚本(DeepSeek 生成)

import docker
import requests
from tqdm import tqdm
# ============== 用户配置区域 ==============
PROXIES = {  # 设置代理(若无代理则置空)
    "http": "http://your-proxy:8080",
    "https": "http://your-proxy:8080",
}
# ========================================
def parse_repository(repo):
    parts = repo.split('/')
    if not parts:
        return None, None, None
    # 检查是否是 registry(包含。或:)
    if '.' in parts[0] or ':' in parts[0]:
        registry = parts[0]
        remaining = parts[1:]
    else:
        registry = 'docker.io'
        remaining = parts
    # 处理剩余部分
    if len(remaining) == 0:
        return registry, None, None
    elif len(remaining) == 1:
        namespace = 'library'
        image = remaining[0]
    else:
        namespace = '/'.join(remaining[:-1])
        image = remaining[-1]
    return registry, namespace, image
def get_auth_token(repository):
    auth_url = f"https://auth.docker.io/token?service=registry.docker.io&scope=repository:{repository}:pull"
    try:
        response = requests.get(auth_url, proxies=PROXIES)
        response.raise_for_status()
        return response.json().get('token')
    except requests.exceptions.RequestException as e:
        print(f"获取认证token失败:{e}")
        return None
def get_hub_digest(repository, tag):
    token = get_auth_token(repository)
    if not token:
        return None
    headers = {
        'Authorization': f'Bearer {token}',
        'Accept': 'application/vnd.docker.distribution.manifest.v2+json'
    }
    url = f"https://registry-1.docker.io/v2/{repository}/manifests/{tag}"
    try:
        response = requests.head(url, headers=headers, proxies=PROXIES)
        response.raise_for_status()
        return response.headers.get('Docker-Content-Digest')
    except requests.exceptions.RequestException as e:
        print(f"获取Docker Hub上的manifest失败:{e}")
        return None
def main():
    client = docker.from_env()
    images = client.images.list()
    result_pass = []
    result_warn = []
    
    # 使用 tqdm 添加进度条
    with tqdm(total=len(images), desc="扫描镜像", unit="image") as pbar:
        for image in images:
            repoTags = image.attrs.get('RepoTags', [])
            repoDigests = image.attrs.get('RepoDigests', [])
            
            for repo_tag in repoTags:
                # 更新进度条描述
                pbar.set_postfix_str(f"正在检查:{repo_tag[:30]}")
                
                if '@' in repo_tag:
                    continue  # 跳过含 digest 的 repo_tag
                # 分割 repository 和 tag
                repo_tag_parts = repo_tag.rsplit(':', 1)
                if len(repo_tag_parts) == 2:
                    repository, tag = repo_tag_parts
                else:
                    repository = repo_tag_parts[0]
                    tag = 'latest'
                # 解析 repository
                registry, namespace, image_name = parse_repository(repository)
                if not registry or not namespace or not image_name:
                    continue
                
                # 仅处理 docker.io/library 的官方镜像
                #if registry == 'docker.io' and namespace == 'library':
                # 仅处理 docker.io 的官方镜像
                if registry == 'docker.io':
                    digest = None
                    # 在 repoDigests 中查找匹配的 digest
                    for d in repoDigests:
                        if '@' not in d:
                            continue
                        repo_part, digest_part = d.split('@', 1)
                        d_registry, d_namespace, d_image = parse_repository(repo_part)
                        if d_registry == registry and d_namespace == namespace and d_image == image_name:
                            digest = digest_part
                            break
                    if not digest:
                        print(f"镜像 {repo_tag} 没有找到对应的Repo Digest,跳过")
                        result_pass.append(repo_tag)
                        continue
                    # 构建 Docker Hub 的仓库名
                    hub_repo = f"{namespace}/{image_name}"
                    # 获取 Docker Hub 的 digest
                    hub_digest = get_hub_digest(hub_repo, tag)
                    if not hub_digest:
                        print(f"无法获取 {hub_repo}:{tag} 的Docker Hub digest")
                        result_warn.append(f"无法获取 {hub_repo}:{tag} 的Docker Hub digest")
                        continue
                    # 比较 digest
                    if hub_digest == digest:
                        print(f"镜像 {repo_tag} 的digest匹配")
                    else:
                        print(f"镜像 {repo_tag} 的digest不匹配。本地: {digest}, 远程: {hub_digest}")
                        result_warn.append(f"镜像 {repo_tag} 的digest不匹配")
                else:
                    print(f"镜像 {repo_tag} registry {registry},跳过")
                    result_pass.append(repo_tag)
            # 更新进度条
            pbar.update(1)
            pbar.refresh()
    print('-' * 10)
    print()
    print()
    print('跳过' + ', '.join(result_pass))
    print()
    print('\n'.join(result_warn))
if __name__ == "__main__":
    main()

# 安装依赖

pip install docker requests tqdm

# 运行效果

执行脚本后,它将遍历本地所有 Docker 镜像,识别官方镜像,并对比本地与 Docker Hub 的 Digest。
此脚本能够有效地检查本地官方镜像的 Digest 是否与 Docker Hub 一致,帮助确认镜像的完整性和更新状态。

# 注意事项

  • 认证限制:Docker Hub API 有请求频率限制,频繁请求可能导致临时封禁。

  • 网络连接:确保运行环境可以访问 Docker Hub(可通过更改 PROXIES 来配置代理)。

  • 官方镜像判断:脚本严格判断镜像属于 docker.io/library 仓库,其他仓库的镜像将被排除。

  • 错误处理:脚本包含基础错误处理,但部分异常可能需要根据实际情况扩展。