参考
gitlab api文档:https://docs.gitlab.com/ee/api/groups.html
github:https://gist.github.com/linjunjj/440ea8b9b687b57f6c28217d58535e79
简书:https://www.jianshu.com/p/67d827fbb4e8
#!/usr/bin/python3
from urllib.request import urlopen
import json
import subprocess, shlex
import time
import os
# Uses HTTP for pulling by default, not SSH.
gitlabToken = 'kwxmuskShkUs-xxxxxxx' # personal access token (note: only visible once, when first created; this is not the feed token)
gitlabAddr = 'xxxxxx.com' # GitLab host address
targets =['xxx','xxx'] # groups to clone (note: when empty, every group is cloned -- use with care)
withShared = 'false' # whether to include shared projects, default false (note: true also pulls projects owned by other groups)
# -------------------------
counter = 0 # number of git clone/pull commands launched so far
procs = [] # Popen handles of the launched git subprocesses
#download
def get_next(group_id):
    """Clone or update every project that belongs directly to one GitLab group.

    The project list is read page by page from the URL built by
    ``gen_next_url``. For each project, ``git pull`` is run when the
    project's ``path_with_namespace`` directory already exists locally,
    otherwise ``git clone`` is run from its ``http_url_to_repo``. Git runs
    as background subprocesses; all of them are waited for before returning.

    Args:
        group_id: GitLab group id whose projects should be fetched.

    Side effects:
        Increments the module-level ``counter`` once per launched git
        command and appends each ``Popen`` handle to the module-level
        ``procs`` list.
    """
    global counter
    global procs
    print('get_next group_id:', group_id)
    base_url = gen_next_url(group_id)

    # GitLab list endpoints are paginated (20 items per page by default),
    # so a single request silently drops projects of larger groups.
    per_page = 100
    projects = []
    page = 1
    while True:
        with urlopen('%s&per_page=%d&page=%d' % (base_url, per_page, page)) as response:
            batch = json.loads(response.read().decode())
        projects.extend(batch)
        if len(batch) < per_page:
            break
        page += 1
    if not projects:
        return

    launched = []
    for project in projects:
        # Bound before the try block so the error message can always be built.
        project_url = '<unknown project>'
        try:
            project_url = project['http_url_to_repo']
            project_path = project['path_with_namespace']
            if os.path.exists(project_path):
                command = ['git', '-C', project_path, 'pull']
            else:
                print("=========== 开始克隆 %s %s ===========" % (group_id, project['name']))
                print('执行:git clone %s %s' % (project_url, project_path))
                command = ['git', 'clone', project_url, project_path]
            proc = subprocess.Popen(command)
            procs.append(proc)
            launched.append(proc)
            # Stagger process start-up so the server is not hit all at once.
            time.sleep(1)
            counter += 1
        except Exception as e:
            # Only OSError carries ``strerror``; formatting the exception
            # itself works for every exception type.
            print("Error on %s: %s" % (project_url, e))

    print("=========== 等待子线程执行结束 ===========")
    # Processes from earlier calls were already waited for, so only the ones
    # started here need waiting; kill() after wait() would be a no-op.
    for proc in launched:
        proc.wait()
    print("=========== 子线程执行结束 ===========")
    return
def have_next_projects(group_id):
    """Report whether a GitLab group directly contains any projects.

    Args:
        group_id: GitLab group id to query.

    Returns:
        A ``(has_projects, projects)`` tuple. ``has_projects`` is ``True``
        when the response from the ``gen_next_url`` endpoint is non-empty;
        ``projects`` is the decoded JSON payload (empty when the group has
        no projects).
    """
    url = gen_next_url(group_id)
    with urlopen(url) as response:
        projects = json.loads(response.read().decode())
    # Always return a pair: the caller unpacks two values, and a bare
    # ``False`` here made that unpacking raise TypeError for empty groups.
    return len(projects) != 0, projects
def get_sub_groups(parent_id):
    """Return the ids of the direct subgroups of a GitLab group.

    Args:
        parent_id: id of the group whose subgroups are listed.

    Returns:
        A list with the ``id`` field of every entry returned by the
        ``gen_subgroups_url`` endpoint; empty when there are no subgroups.
        Entries without a usable ``id`` are reported and skipped.
    """
    base_url = gen_subgroups_url(parent_id)
    # GitLab list endpoints are paginated (20 items per page by default),
    # so walk the pages until a short one signals the end.
    per_page = 100
    sub_ids = []
    page = 1
    while True:
        with urlopen('%s&per_page=%d&page=%d' % (base_url, per_page, page)) as response:
            groups = json.loads(response.read().decode())
        for group in groups:
            try:
                sub_ids.append(group['id'])
            except (KeyError, TypeError) as e:
                # Report the offending entry itself: no id is available here,
                # and ``strerror`` does not exist on these exception types.
                print("Error on %s: %s" % (group, e))
        if len(groups) < per_page:
            break
        page += 1
    return sub_ids
def cal_next_sub_groupids(parent_id):
    """Collect the ids of all groups under ``parent_id`` that contain projects.

    Walks the subgroup tree depth-first via ``get_sub_groups`` and checks
    each group with ``have_next_projects``.

    Args:
        parent_id: id of the group at the root of the walk.

    Returns:
        A list of group ids: the qualifying descendants of every subgroup
        first, followed by ``parent_id`` itself when it has projects of its
        own. Empty when neither the group nor any descendant has projects.
    """
    sub_ids = get_sub_groups(parent_id)
    print('cal_next_sub_groupids sub_ids and parent_id:', sub_ids, parent_id)

    # have_next_projects can hand back either a (flag, projects) tuple or a
    # bare bool; unpacking directly would raise TypeError on the bare bool.
    result = have_next_projects(parent_id)
    ok = result[0] if isinstance(result, tuple) else bool(result)
    print('have_next_projects result:', ok)

    parent_list = []
    # Visit every subgroup regardless of whether this level has projects;
    # returning after a single subgroup would drop its siblings.
    for sub_id in sub_ids:
        print('cal_next_sub_groupids sub_ids[i]:', sub_id)
        parent_list.extend(cal_next_sub_groupids(sub_id))
    if ok:
        # Add this level's own id exactly once.
        parent_list.append(parent_id)
    return parent_list
def download_code(parent_id):
    """Fetch the projects of a group and of every group found beneath it.

    Args:
        parent_id: id of the top-level group to process.
    """
    group_ids = cal_next_sub_groupids(parent_id)
    print('download_code result: ', group_ids)
    # Process the groups one at a time, in the order they were collected.
    for current_id in group_ids:
        get_next(current_id)
def gen_next_url(target_id):
    """Build the API URL that lists the projects of one group.

    Args:
        target_id: GitLab group id placed into the URL path.

    Returns:
        The URL string, carrying the configured token, the ``with_shared``
        setting and ordering by ``updated_at`` as query parameters.
    """
    template = "https://%s/api/v4/groups/%s/projects?private_token=%s&with_shared=%s&order_by=updated_at"
    fields = (gitlabAddr, target_id, gitlabToken, withShared)
    return template % fields
def gen_subgroups_url(target_id):
    """Build the API URL that lists the direct subgroups of one group.

    Args:
        target_id: GitLab group id placed into the URL path.

    Returns:
        The URL string, carrying the configured token as a query parameter.
    """
    template = "https://%s/api/v4/groups/%s/subgroups?private_token=%s"
    fields = (gitlabAddr, target_id, gitlabToken)
    return template % fields
def gen_global_url():
    """Build the API URL that lists all projects visible to the token.

    Returns:
        The URL string, carrying the configured token as a query parameter.
    """
    # Use https like the other endpoint builders in this file; plain http
    # would send the private token in clear text.
    return "https://%s/api/v4/projects?private_token=%s" % (gitlabAddr, gitlabToken)
def download_global_code():
    """Clone or update every project returned by the global project listing.

    The project list is read page by page from the URL built by
    ``gen_global_url``. For each project, ``git pull`` is run when the
    project's ``path_with_namespace`` directory already exists locally,
    otherwise ``git clone`` is run from its ``http_url_to_repo``. Git runs
    as background subprocesses; all of them are waited for before returning.

    Side effects:
        Increments the module-level ``counter`` once per launched git
        command and appends each ``Popen`` handle to the module-level
        ``procs`` list.
    """
    global counter
    global procs
    base_url = gen_global_url()

    # GitLab list endpoints are paginated (20 items per page by default),
    # so a single request would only ever see the first few projects.
    per_page = 100
    projects = []
    page = 1
    while True:
        with urlopen('%s&per_page=%d&page=%d' % (base_url, per_page, page)) as response:
            batch = json.loads(response.read().decode())
        projects.extend(batch)
        if len(batch) < per_page:
            break
        page += 1
    if not projects:
        return

    launched = []
    for project in projects:
        # Bound before the try block so the error message can always be built.
        project_url = '<unknown project>'
        try:
            project_url = project['http_url_to_repo']
            project_path = project['path_with_namespace']
            print(project_url + ' ' + project_path)
            if os.path.exists(project_path):
                command = ['git', '-C', project_path, 'pull']
            else:
                # There is no group id in this code path; the namespace path
                # identifies the project instead.
                print("=========== 开始克隆 %s ===========" % (project_path))
                print('执行:git clone %s %s' % (project_url, project_path))
                command = ['git', 'clone', project_url, project_path]
            proc = subprocess.Popen(command)
            procs.append(proc)
            launched.append(proc)
            # Stagger process start-up so the server is not hit all at once.
            time.sleep(1)
            counter += 1
        except Exception as e:
            # Only OSError carries ``strerror``; formatting the exception
            # itself works for every exception type.
            print("Error on %s: %s" % (project_url, e))

    print("=========== 等待子线程执行结束 ===========")
    # kill() after wait() would be a no-op, so waiting alone is enough.
    for proc in launched:
        proc.wait()
    print("=========== 子线程执行结束 ===========")
    return
def download_targets_code():
    """Download the projects of every group named in ``targets``.

    For each configured name the GitLab group search endpoint is queried,
    the result whose ``name`` equals the target exactly is selected, and
    ``download_code`` is called with that group's id. Targets that yield no
    exact match are reported and skipped; the remaining targets are still
    processed.
    """
    # Local import: only this function needs URL quoting.
    from urllib.parse import quote

    for target in targets:
        # Percent-encode the search term so names containing spaces or
        # non-ASCII characters still form a valid URL.
        url = "https://%s/api/v4/groups?private_token=%s&search=%s" % (
            gitlabAddr, gitlabToken, quote(str(target), safe=''))
        with urlopen(url) as response:
            groups = json.loads(response.read().decode())

        target_id = None
        for group in groups:
            try:
                if group['name'] == target:
                    target_id = group['id']
                    break
            except (KeyError, TypeError) as e:
                # ``strerror`` does not exist on these exception types.
                print("Error on %s: %s" % (group, e))

        if target_id is None:
            # Skip just this target; calling download_code without an id
            # would request a malformed URL.
            print("No group named %s found, skipping" % (target,))
            continue
        download_code(target_id)
    return
def main():
    """Run the download for the configured targets and print a summary.

    When ``targets`` is empty the global project listing is used; otherwise
    only the listed groups are processed. The final line reports the value
    of the module-level ``counter``.
    """
    if targets:
        download_targets_code()
    else:
        download_global_code()
    summary = "=========== 执行结束,克隆项目数: %s ===========" % (counter)
    print(summary)
# Script entry point: the download starts as soon as this line is executed.
main()