# 主要目的
防止镜像站或在传输的途中有人 “加料”
# 检测脚本(DeepSeek 生成)
import docker | |
import requests | |
from tqdm import tqdm  | |
# ============== 用户配置区域 ============== | |
PROXIES = { # 设置代理(若无代理则置空)  | |
"http": "http://your-proxy:8080",  | |
"https": "http://your-proxy:8080",  | |
} | |
# ======================================== | |
def parse_repository(repo):  | |
parts = repo.split('/')  | |
if not parts:  | |
return None, None, None  | |
    # 检查是否是 registry(包含。或:) | |
if '.' in parts[0] or ':' in parts[0]:  | |
registry = parts[0]  | |
remaining = parts[1:]  | |
else:  | |
registry = 'docker.io'  | |
        remaining = parts | |
    # 处理剩余部分 | |
if len(remaining) == 0:  | |
return registry, None, None  | |
elif len(remaining) == 1:  | |
namespace = 'library'  | |
image = remaining[0]  | |
else:  | |
namespace = '/'.join(remaining[:-1])  | |
image = remaining[-1]  | |
return registry, namespace, image  | |
def get_auth_token(repository):  | |
auth_url = f"https://auth.docker.io/token?service=registry.docker.io&scope=repository:{repository}:pull"  | |
try:  | |
response = requests.get(auth_url, proxies=PROXIES)  | |
response.raise_for_status()  | |
return response.json().get('token')  | |
except requests.exceptions.RequestException as e:  | |
print(f"获取认证token失败:{e}")  | |
return None  | |
def get_hub_digest(repository, tag):  | |
token = get_auth_token(repository)  | |
if not token:  | |
return None  | |
headers = {  | |
'Authorization': f'Bearer {token}',  | |
'Accept': 'application/vnd.docker.distribution.manifest.v2+json'  | |
    } | |
url = f"https://registry-1.docker.io/v2/{repository}/manifests/{tag}"  | |
try:  | |
response = requests.head(url, headers=headers, proxies=PROXIES)  | |
response.raise_for_status()  | |
return response.headers.get('Docker-Content-Digest')  | |
except requests.exceptions.RequestException as e:  | |
print(f"获取Docker Hub上的manifest失败:{e}")  | |
return None  | |
def main():  | |
client = docker.from_env()  | |
images = client.images.list()  | |
result_pass = []  | |
result_warn = []  | |
    # 使用 tqdm 添加进度条 | |
with tqdm(total=len(images), desc="扫描镜像", unit="image") as pbar:  | |
for image in images:  | |
repoTags = image.attrs.get('RepoTags', [])  | |
repoDigests = image.attrs.get('RepoDigests', [])  | |
for repo_tag in repoTags:  | |
                # 更新进度条描述 | |
pbar.set_postfix_str(f"正在检查:{repo_tag[:30]}")  | |
if '@' in repo_tag:  | |
continue # 跳过含 digest 的 repo_tag  | |
                # 分割 repository 和 tag | |
repo_tag_parts = repo_tag.rsplit(':', 1)  | |
if len(repo_tag_parts) == 2:  | |
repository, tag = repo_tag_parts  | |
else:  | |
repository = repo_tag_parts[0]  | |
tag = 'latest'  | |
                # 解析 repository | |
registry, namespace, image_name = parse_repository(repository)  | |
if not registry or not namespace or not image_name:  | |
                    continue | |
                # 仅处理 docker.io/library 的官方镜像 | |
                #if registry == 'docker.io' and namespace == 'library': | |
                # 仅处理 docker.io 的官方镜像 | |
if registry == 'docker.io':  | |
digest = None  | |
                    # 在 repoDigests 中查找匹配的 digest | |
for d in repoDigests:  | |
if '@' not in d:  | |
                            continue | |
repo_part, digest_part = d.split('@', 1)  | |
d_registry, d_namespace, d_image = parse_repository(repo_part)  | |
if d_registry == registry and d_namespace == namespace and d_image == image_name:  | |
                            digest = digest_part | |
                            break | |
if not digest:  | |
print(f"镜像 {repo_tag} 没有找到对应的Repo Digest,跳过")  | |
result_pass.append(repo_tag)  | |
                        continue | |
                    # 构建 Docker Hub 的仓库名 | |
hub_repo = f"{namespace}/{image_name}"  | |
                    # 获取 Docker Hub 的 digest | |
hub_digest = get_hub_digest(hub_repo, tag)  | |
if not hub_digest:  | |
print(f"无法获取 {hub_repo}:{tag} 的Docker Hub digest")  | |
result_warn.append(f"无法获取 {hub_repo}:{tag} 的Docker Hub digest")  | |
                        continue | |
                    # 比较 digest | |
if hub_digest == digest:  | |
print(f"镜像 {repo_tag} 的digest匹配")  | |
else:  | |
print(f"镜像 {repo_tag} 的digest不匹配。本地: {digest}, 远程: {hub_digest}")  | |
result_warn.append(f"镜像 {repo_tag} 的digest不匹配")  | |
else:  | |
print(f"镜像 {repo_tag} registry {registry},跳过")  | |
result_pass.append(repo_tag)  | |
            # 更新进度条 | |
pbar.update(1)  | |
pbar.refresh()  | |
print('-' * 10)  | |
print()  | |
print()  | |
print('跳过' + ', '.join(result_pass))  | |
print()  | |
print('\n'.join(result_warn))  | |
if __name__ == "__main__":  | |
main()  | 
# 安装依赖
pip install docker requests tqdm  | 
# 运行效果
执行脚本后,它将遍历本地所有 Docker 镜像,识别官方镜像,并对比本地与 Docker Hub 的 Digest。
此脚本能够有效地检查本地官方镜像的 Digest 是否与 Docker Hub 一致,帮助确认镜像的完整性和更新状态。
# 注意事项
- 
认证限制:Docker Hub API 有请求频率限制,频繁请求可能导致临时封禁。
 - 
网络连接:确保运行环境可以访问 Docker Hub(可通过更改
PROXIES来配置代理)。 - 
官方镜像判断:脚本严格判断镜像属于
docker.io/library仓库,其他仓库的镜像将被排除。 - 
错误处理:脚本包含基础错误处理,但部分异常可能需要根据实际情况扩展。