python监听gogs分支变化来触发jenkins job
import requests
import time
import logging
# 用于检测 Gogs 仓库的分支变化,触发
# Gogs API 的基本 URL
base_url = "http://192.168.0.142:3000/api/v1"
# 项目的所有者和名称
owner = "root"
repo = "testaaa"
# 分支名称
branch = "master" # 或者你想要检测的其他分支
# 你的访问令牌
access_token = "ac50ae88508a85188f22045a1af0883810bc0fd9"
# Jenkins配置
url = "http://192.168.0.236:8080/job/tertet/buildWithParameters?token=ATsVL9vzsQ34sL4yJlgt7qLotHwKsE"
# 请求体参数
payload = 'STRTEST=999'
# 头部
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'Authorization': 'Basic YWRtaW46MTE3OTVkMDllYTcwNzBhMDk1Y2IwYmVkNzllYzZiZjg4OQ=='
}
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
version_hash = None
while True:
# 获取当前分支的最新提交信息
def get_latest_commit():
url = f"{base_url}/repos/{owner}/{repo}/commits/{branch}"
headers = {"Authorization": f"token {access_token}"}
try:
response = requests.get(url, headers=headers)
res_code = response.status_code
except requests.exceptions.RequestException as e:
logging.error("请求失败:{}".format(e))
res_code = 500
if res_code == 200:
return response.json()
# 比较最新提交信息
def check_for_changes():
latest_commit = get_latest_commit()
global version_hash
if latest_commit:
if version_hash is None:
logging.info("分支第一次检测")
version_hash = latest_commit['sha']
else:
if latest_commit['sha'] != version_hash:
logging.info("分支发生了变化!")
version_hash = latest_commit['sha']
response = requests.request("POST", url, headers=headers, data=payload)
logging.info("Trigger jenkins job response code: {}".format(response.status_code))
else:
logging.info("分支没有变化。")
else:
logging.info("获取最新提交信息失败。")
time.sleep(30)
check_for_changes()
time.sleep(5)