Skip to content

Commit f36e9e5

Browse files
authored
Merge pull request #63 from sunmh207/mashb1t-feature/queue-driver-rq
Mashb1t feature/queue driver rq
2 parents 7c5dddb + 97a7d32 commit f36e9e5

13 files changed

+369
-193
lines changed

.github/workflows/build_image.yml renamed to .github/workflows/build_images.yml

+23
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,17 @@ jobs:
4545
type=semver,pattern={{major}}
4646
type=edge,branch=main
4747

48+
- name: Extract metadata (tags, labels) for Docker (worker)
49+
id: meta_worker
50+
uses: docker/metadata-action@v5
51+
with:
52+
images: ghcr.io/${{ github.repository_owner }}/${{ github.event.repository.name }}
53+
tags: |
54+
type=semver,pattern={{version}},suffix=-worker
55+
type=semver,pattern={{major}}.{{minor}},suffix=-worker
56+
type=semver,pattern={{major}},suffix=-worker
57+
type=edge,branch=main,suffix=-worker
58+
4859
# Build and push multi-arch prod image
4960
- name: Build and push prod Docker image (Multi-Arch)
5061
uses: docker/build-push-action@v6
@@ -56,3 +67,15 @@ jobs:
5667
tags: ${{ steps.meta_prod.outputs.tags }}
5768
labels: ${{ steps.meta_prod.outputs.labels }}
5869
target: prod
70+
71+
# Build and push multi-arch worker image
72+
- name: Build and push worker Docker image (Multi-Arch)
73+
uses: docker/build-push-action@v6
74+
with:
75+
context: .
76+
file: ./Dockerfile
77+
push: true
78+
platforms: linux/amd64,linux/arm64/v8
79+
tags: ${{ steps.meta_worker.outputs.tags }}
80+
labels: ${{ steps.meta_worker.outputs.labels }}
81+
target: worker

Dockerfile

+8-5
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,18 @@ COPY api.py ./api.py
1919
COPY ui.py ./ui.py
2020
COPY conf/prompt_templates.yml ./conf/prompt_templates.yml
2121

22-
23-
# 暴露 Flask 和 Streamlit 的端口
24-
EXPOSE 5001 5002
25-
2622
# 使用 supervisord 作为启动命令
2723
CMD ["/usr/bin/supervisord", "-c", "/etc/supervisor/conf.d/supervisord.conf"]
2824

2925
FROM base AS dev
3026
COPY ./conf/supervisord.dev.conf /etc/supervisor/conf.d/supervisord.conf
27+
# 暴露 Flask 和 Streamlit 的端口
28+
EXPOSE 5001 5002
3129

3230
FROM base AS prod
33-
COPY ./conf/supervisord.prod.conf /etc/supervisor/conf.d/supervisord.conf
31+
COPY ./conf/supervisord.prod.conf /etc/supervisor/conf.d/supervisord.conf
32+
# 暴露 Flask 和 Streamlit 的端口
33+
EXPOSE 5001 5002
34+
35+
FROM base AS worker
36+
COPY ./conf/supervisord.worker.conf /etc/supervisor/conf.d/supervisord.conf

api.py

+8-176
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,21 @@
11
import atexit
22
import json
33
import os
4-
import re
54
import traceback
65
from datetime import datetime
7-
from multiprocessing import Process
86
from urllib.parse import urlparse
97

10-
from biz.utils.token_util import count_tokens, truncate_text_by_tokens
118
from apscheduler.schedulers.background import BackgroundScheduler
129
from apscheduler.triggers.cron import CronTrigger
1310
from dotenv import load_dotenv
1411
from flask import Flask, request, jsonify
1512

16-
from biz.entity.review_entity import MergeRequestReviewEntity, PushReviewEntity
17-
from biz.event.event_manager import event_manager
18-
from biz.gitlab.webhook_handler import MergeRequestHandler, PushHandler
13+
from biz.gitlab.webhook_handler import slugify_url
14+
from biz.queue.worker import handle_merge_request_event, handle_push_event
1915
from biz.service.review_service import ReviewService
20-
from biz.utils.code_reviewer import CodeReviewer
2116
from biz.utils.im import notifier
2217
from biz.utils.log import logger
18+
from biz.utils.queue import handle_queue
2319
from biz.utils.reporter import Reporter
2420

2521
load_dotenv("conf/.env")
@@ -135,22 +131,23 @@ def handle_webhook():
135131
if not gitlab_token:
136132
return jsonify({'message': 'Missing GitLab access token'}), 400
137133

134+
gitlab_url_slug = slugify_url(gitlab_url)
135+
138136
# 打印整个payload数据,或根据需求进行处理
139137
logger.info(f'Received event: {object_kind}')
140138
logger.info(f'Payload: {json.dumps(data)}')
141139

142140
# 处理Merge Request Hook
143141
if object_kind == "merge_request":
144142
# 创建一个新进程进行异步处理
145-
process = Process(target=__handle_merge_request_event, args=(data, gitlab_token, gitlab_url))
146-
process.start()
143+
handle_queue(handle_merge_request_event, data, gitlab_token, gitlab_url, gitlab_url_slug)
147144
# 立马返回响应
148145
return jsonify(
149146
{'message': f'Request received(object_kind={object_kind}), will process asynchronously.'}), 200
150147
elif object_kind == "push":
151148
# 创建一个新进程进行异步处理
152-
process = Process(target=__handle_push_event, args=(data, gitlab_token, gitlab_url))
153-
process.start()
149+
# TODO check if PUSH_REVIEW_ENABLED is needed here
150+
handle_queue(handle_push_event, data, gitlab_token, gitlab_url, gitlab_url_slug)
154151
# 立马返回响应
155152
return jsonify(
156153
{'message': f'Request received(object_kind={object_kind}), will process asynchronously.'}), 200
@@ -162,171 +159,6 @@ def handle_webhook():
162159
return jsonify({'message': 'Invalid data format'}), 400
163160

164161

165-
def slugify_url(original_url: str) -> str:
166-
"""
167-
将原始URL转换为适合作为文件名的字符串,其中非字母或数字的字符会被替换为下划线,举例:
168-
slugify_url("http://example.com/path/to/repo/") => example_com_path_to_repo
169-
slugify_url("https://gitlab.com/user/repo.git") => gitlab_com_user_repo_git
170-
"""
171-
# Remove URL scheme (http, https, etc.) if present
172-
original_url = re.sub(r'^https?://', '', original_url)
173-
174-
# Replace non-alphanumeric characters (except underscore) with underscores
175-
target = re.sub(r'[^a-zA-Z0-9]', '_', original_url)
176-
177-
# Remove trailing underscore if present
178-
target = target.rstrip('_')
179-
180-
return target
181-
182-
183-
def __handle_push_event(webhook_data: dict, gitlab_token: str, gitlab_url: str):
184-
try:
185-
handler = PushHandler(webhook_data, gitlab_token, gitlab_url)
186-
logger.info('Push Hook event received')
187-
commits = handler.get_push_commits()
188-
if not commits:
189-
logger.error('Failed to get commits')
190-
return
191-
192-
review_result = None
193-
score = 0
194-
if PUSH_REVIEW_ENABLED:
195-
# 获取PUSH的changes
196-
changes = handler.get_push_changes()
197-
logger.info('changes: %s', changes)
198-
changes = filter_changes(changes)
199-
if not changes:
200-
logger.info('未检测到PUSH代码的修改,修改文件可能不满足SUPPORTED_EXTENSIONS。')
201-
return
202-
review_result = "关注的文件没有修改"
203-
204-
if len(changes) > 0:
205-
commits_text = ';'.join(commit.get('message', '').strip() for commit in commits)
206-
review_result = review_code(str(changes), commits_text)
207-
score = CodeReviewer.parse_review_score(review_text=review_result)
208-
# 将review结果提交到Gitlab的 notes
209-
handler.add_push_notes(f'Auto Review Result: \n{review_result}')
210-
211-
event_manager['push_reviewed'].send(PushReviewEntity(
212-
project_name=webhook_data['project']['name'],
213-
author=webhook_data['user_username'],
214-
branch=webhook_data['project']['default_branch'],
215-
updated_at=int(datetime.now().timestamp()), # 当前时间
216-
commits=commits,
217-
score=score,
218-
review_result=review_result,
219-
gitlab_url_slug=slugify_url(gitlab_url),
220-
))
221-
222-
except Exception as e:
223-
error_message = f'服务出现未知错误: {str(e)}\n{traceback.format_exc()}'
224-
notifier.send_notification(content=error_message)
225-
logger.error('出现未知错误: %s', error_message)
226-
227-
228-
def __handle_merge_request_event(webhook_data: dict, gitlab_token: str, gitlab_url: str):
229-
'''
230-
处理Merge Request Hook事件
231-
:param webhook_data:
232-
:param gitlab_token:
233-
:param gitlab_url:
234-
:return:
235-
'''
236-
try:
237-
# 解析Webhook数据
238-
handler = MergeRequestHandler(webhook_data, gitlab_token, gitlab_url)
239-
logger.info('Merge Request Hook event received')
240-
241-
if (handler.action in ['open', 'update']): # 仅仅在MR创建或更新时进行Code Review
242-
# 获取Merge Request的changes
243-
changes = handler.get_merge_request_changes()
244-
logger.info('changes: %s', changes)
245-
changes = filter_changes(changes)
246-
if not changes:
247-
logger.info('未检测到有关代码的修改,修改文件可能不满足SUPPORTED_EXTENSIONS。')
248-
return
249-
250-
# 获取Merge Request的commits
251-
commits = handler.get_merge_request_commits()
252-
if not commits:
253-
logger.error('Failed to get commits')
254-
return
255-
256-
# review 代码
257-
commits_text = ';'.join(commit['title'] for commit in commits)
258-
review_result = review_code(str(changes), commits_text)
259-
260-
if "COT ABORT!" in review_result:
261-
logger.error('COT ABORT!')
262-
return
263-
264-
# 将review结果提交到Gitlab的 notes
265-
handler.add_merge_request_notes(f'Auto Review Result: \n{review_result}')
266-
267-
# dispatch merge_request_reviewed event
268-
event_manager['merge_request_reviewed'].send(
269-
MergeRequestReviewEntity(
270-
project_name=webhook_data['project']['name'],
271-
author=webhook_data['user']['username'],
272-
source_branch=webhook_data['object_attributes']['source_branch'],
273-
target_branch=webhook_data['object_attributes']['target_branch'],
274-
updated_at=int(datetime.now().timestamp()),
275-
commits=commits,
276-
score=CodeReviewer.parse_review_score(review_text=review_result),
277-
url=webhook_data['object_attributes']['url'],
278-
review_result=review_result,
279-
gitlab_url_slug=slugify_url(gitlab_url),
280-
)
281-
)
282-
283-
else:
284-
logger.info(f"Merge Request Hook event, action={handler.action}, ignored.")
285-
286-
except Exception as e:
287-
error_message = f'AI Code Review 服务出现未知错误: {str(e)}\n{traceback.format_exc()}'
288-
notifier.send_notification(content=error_message)
289-
logger.error('出现未知错误: %s', error_message)
290-
291-
292-
def filter_changes(changes: list):
293-
'''
294-
过滤数据,只保留支持的文件类型以及必要的字段信息
295-
'''
296-
filter_deleted_files_changes = [change for change in changes if change.get("deleted_file") == False]
297-
# 从环境变量中获取支持的文件扩展名
298-
SUPPORTED_EXTENSIONS = os.getenv('SUPPORTED_EXTENSIONS', '.java,.py,.php').split(',')
299-
# 过滤 `new_path` 以支持的扩展名结尾的元素, 仅保留diff和new_path字段
300-
filtered_changes = [
301-
{
302-
'diff': item.get('diff', ''),
303-
'new_path': item['new_path']
304-
}
305-
for item in filter_deleted_files_changes
306-
if any(item.get('new_path', '').endswith(ext) for ext in SUPPORTED_EXTENSIONS)
307-
]
308-
return filtered_changes
309-
310-
311-
def review_code(changes_text: str, commits_text: str = '') -> str:
312-
# 如果超长,取前REVIEW_MAX_TOKENS个token
313-
review_max_tokens = int(os.getenv('REVIEW_MAX_TOKENS', 10000))
314-
# 如果changes为空,打印日志
315-
if not changes_text:
316-
logger.info('代码为空, diffs_text = %', str(changes_text))
317-
return '代码为空'
318-
319-
# 计算tokens数量,如果超过REVIEW_MAX_TOKENS,截断changes_text
320-
tokens_count = count_tokens(changes_text)
321-
if tokens_count > review_max_tokens:
322-
changes_text = truncate_text_by_tokens(changes_text, review_max_tokens)
323-
324-
review_result = CodeReviewer().review_code(changes_text, commits_text).strip()
325-
if review_result.startswith("```markdown") and review_result.endswith("```"):
326-
return review_result[11:-3].strip()
327-
return review_result
328-
329-
330162
if __name__ == '__main__':
331163
# 启动定时任务调度器
332164
setup_scheduler()

biz/gitlab/test_webhook_handler.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
# -*- coding: utf-8 -*-
33
# @Time : 2025/3/18 17:58
44
# @Author : Arrow
5-
import os
65
from unittest import TestCase, main
76

87
from biz.gitlab.webhook_handler import PushHandler
@@ -34,4 +33,4 @@ def test_get_parent_commit_id(self):
3433

3534

3635
if __name__ == '__main__':
37-
main()
36+
main()

biz/gitlab/webhook_handler.py

+46-2
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,54 @@
1-
import json
1+
import os
2+
import re
23
import time
34
from urllib.parse import urljoin
45

56
import requests
7+
from dotenv import load_dotenv
68

79
from biz.utils.log import logger
810

11+
load_dotenv()
12+
# 从环境变量中获取支持的文件扩展名
13+
SUPPORTED_EXTENSIONS = os.getenv('SUPPORTED_EXTENSIONS', '.java,.py,.php').split(',')
14+
15+
16+
def filter_changes(changes: list):
17+
'''
18+
过滤数据,只保留支持的文件类型以及必要的字段信息
19+
'''
20+
filter_deleted_files_changes = [change for change in changes if change.get("deleted_file") == False]
21+
22+
# 过滤 `new_path` 以支持的扩展名结尾的元素, 仅保留diff和new_path字段
23+
filtered_changes = [
24+
{
25+
'diff': item.get('diff', ''),
26+
'new_path': item['new_path']
27+
}
28+
for item in filter_deleted_files_changes
29+
if any(item.get('new_path', '').endswith(ext) for ext in SUPPORTED_EXTENSIONS)
30+
]
31+
return filtered_changes
32+
33+
34+
def slugify_url(original_url: str) -> str:
35+
"""
36+
将原始URL转换为适合作为文件名的字符串,其中非字母或数字的字符会被替换为下划线,举例:
37+
slugify_url("http://example.com/path/to/repo/") => example_com_path_to_repo
38+
slugify_url("https://gitlab.com/user/repo.git") => gitlab_com_user_repo_git
39+
"""
40+
# Remove URL scheme (http, https, etc.) if present
41+
original_url = re.sub(r'^https?://', '', original_url)
42+
43+
# Replace non-alphanumeric characters (except underscore) with underscores
44+
target = re.sub(r'[^a-zA-Z0-9]', '_', original_url)
45+
46+
# Remove trailing underscore if present
47+
target = target.rstrip('_')
48+
49+
return target
50+
51+
952

1053
class MergeRequestHandler:
1154
def __init__(self, webhook_data: dict, gitlab_token: str, gitlab_url: str):
@@ -179,7 +222,8 @@ def add_push_notes(self, message: str):
179222
logger.error(f"Failed to add comment: {response.status_code}")
180223
logger.error(response.text)
181224

182-
def __repository_commits(self, ref_name: str = "", since: str = "", until: str = "", pre_page: int = 100, page: int = 1):
225+
def __repository_commits(self, ref_name: str = "", since: str = "", until: str = "", pre_page: int = 100,
226+
page: int = 1):
183227
# 获取仓库提交信息
184228
url = f"{urljoin(f'{self.gitlab_url}/', f'api/v4/projects/{self.project_id}/repository/commits')}?ref_name={ref_name}&since={since}&until={until}&per_page={pre_page}&page={page}"
185229
headers = {

0 commit comments

Comments
 (0)