# 主要目的
防止镜像站或在传输的途中有人 “加料”
# 检测脚本(DeepSeek 生成)
import docker | |
import requests | |
from tqdm import tqdm | |
# ============== 用户配置区域 ============== | |
PROXIES = { # 设置代理(若无代理则置空) | |
"http": "http://your-proxy:8080", | |
"https": "http://your-proxy:8080", | |
} | |
# ======================================== | |
def parse_repository(repo): | |
parts = repo.split('/') | |
if not parts: | |
return None, None, None | |
# 检查是否是 registry(包含。或:) | |
if '.' in parts[0] or ':' in parts[0]: | |
registry = parts[0] | |
remaining = parts[1:] | |
else: | |
registry = 'docker.io' | |
remaining = parts | |
# 处理剩余部分 | |
if len(remaining) == 0: | |
return registry, None, None | |
elif len(remaining) == 1: | |
namespace = 'library' | |
image = remaining[0] | |
else: | |
namespace = '/'.join(remaining[:-1]) | |
image = remaining[-1] | |
return registry, namespace, image | |
def get_auth_token(repository): | |
auth_url = f"https://auth.docker.io/token?service=registry.docker.io&scope=repository:{repository}:pull" | |
try: | |
response = requests.get(auth_url, proxies=PROXIES) | |
response.raise_for_status() | |
return response.json().get('token') | |
except requests.exceptions.RequestException as e: | |
print(f"获取认证token失败:{e}") | |
return None | |
def get_hub_digest(repository, tag): | |
token = get_auth_token(repository) | |
if not token: | |
return None | |
headers = { | |
'Authorization': f'Bearer {token}', | |
'Accept': 'application/vnd.docker.distribution.manifest.v2+json' | |
} | |
url = f"https://registry-1.docker.io/v2/{repository}/manifests/{tag}" | |
try: | |
response = requests.head(url, headers=headers, proxies=PROXIES) | |
response.raise_for_status() | |
return response.headers.get('Docker-Content-Digest') | |
except requests.exceptions.RequestException as e: | |
print(f"获取Docker Hub上的manifest失败:{e}") | |
return None | |
def main(): | |
client = docker.from_env() | |
images = client.images.list() | |
result_pass = [] | |
result_warn = [] | |
# 使用 tqdm 添加进度条 | |
with tqdm(total=len(images), desc="扫描镜像", unit="image") as pbar: | |
for image in images: | |
repoTags = image.attrs.get('RepoTags', []) | |
repoDigests = image.attrs.get('RepoDigests', []) | |
for repo_tag in repoTags: | |
# 更新进度条描述 | |
pbar.set_postfix_str(f"正在检查:{repo_tag[:30]}") | |
if '@' in repo_tag: | |
continue # 跳过含 digest 的 repo_tag | |
# 分割 repository 和 tag | |
repo_tag_parts = repo_tag.rsplit(':', 1) | |
if len(repo_tag_parts) == 2: | |
repository, tag = repo_tag_parts | |
else: | |
repository = repo_tag_parts[0] | |
tag = 'latest' | |
# 解析 repository | |
registry, namespace, image_name = parse_repository(repository) | |
if not registry or not namespace or not image_name: | |
continue | |
# 仅处理 docker.io/library 的官方镜像 | |
#if registry == 'docker.io' and namespace == 'library': | |
# 仅处理 docker.io 的官方镜像 | |
if registry == 'docker.io': | |
digest = None | |
# 在 repoDigests 中查找匹配的 digest | |
for d in repoDigests: | |
if '@' not in d: | |
continue | |
repo_part, digest_part = d.split('@', 1) | |
d_registry, d_namespace, d_image = parse_repository(repo_part) | |
if d_registry == registry and d_namespace == namespace and d_image == image_name: | |
digest = digest_part | |
break | |
if not digest: | |
print(f"镜像 {repo_tag} 没有找到对应的Repo Digest,跳过") | |
result_pass.append(repo_tag) | |
continue | |
# 构建 Docker Hub 的仓库名 | |
hub_repo = f"{namespace}/{image_name}" | |
# 获取 Docker Hub 的 digest | |
hub_digest = get_hub_digest(hub_repo, tag) | |
if not hub_digest: | |
print(f"无法获取 {hub_repo}:{tag} 的Docker Hub digest") | |
result_warn.append(f"无法获取 {hub_repo}:{tag} 的Docker Hub digest") | |
continue | |
# 比较 digest | |
if hub_digest == digest: | |
print(f"镜像 {repo_tag} 的digest匹配") | |
else: | |
print(f"镜像 {repo_tag} 的digest不匹配。本地: {digest}, 远程: {hub_digest}") | |
result_warn.append(f"镜像 {repo_tag} 的digest不匹配") | |
else: | |
print(f"镜像 {repo_tag} registry {registry},跳过") | |
result_pass.append(repo_tag) | |
# 更新进度条 | |
pbar.update(1) | |
pbar.refresh() | |
print('-' * 10) | |
print() | |
print() | |
print('跳过' + ', '.join(result_pass)) | |
print() | |
print('\n'.join(result_warn)) | |
if __name__ == "__main__": | |
main() |
# 安装依赖
pip install docker requests tqdm |
# 运行效果
执行脚本后,它将遍历本地所有 Docker 镜像,识别官方镜像,并对比本地与 Docker Hub 的 Digest。
此脚本能够有效地检查本地官方镜像的 Digest 是否与 Docker Hub 一致,帮助确认镜像的完整性和更新状态。
# 注意事项
-
认证限制:Docker Hub API 有请求频率限制,频繁请求可能导致临时封禁。
-
网络连接:确保运行环境可以访问 Docker Hub(可通过更改
PROXIES
来配置代理)。 -
官方镜像判断:脚本严格判断镜像属于
docker.io/library
仓库,其他仓库的镜像将被排除。 -
错误处理:脚本包含基础错误处理,但部分异常可能需要根据实际情况扩展。