-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathllm_service.py
More file actions
152 lines (128 loc) · 4.93 KB
/
llm_service.py
File metadata and controls
152 lines (128 loc) · 4.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
"""
LLM服务封装模块
支持ModelScope API调用和流式响应处理
"""
import json
import time
from typing import List, Dict, Optional, Iterator
from openai import OpenAI
import config
class LLMService:
    """Wrapper around the ModelScope chat-completion API (OpenAI-compatible).

    Provides plain-text chat, JSON-parsed chat, and a repeated-call helper,
    with simple linear-backoff retry and optional streaming.
    """

    def __init__(self):
        """Initialize the API client; fail fast if the key placeholder was not replaced.

        Raises:
            ValueError: If the API key in config.py is still the placeholder.
        """
        if config.MODELSCOPE_API_KEY == 'YOUR_API_KEY_HERE':
            raise ValueError("请在config.py中配置您的ModelScope API密钥")
        self.client = OpenAI(
            base_url=config.MODELSCOPE_BASE_URL,
            api_key=config.MODELSCOPE_API_KEY,
        )
        self.model = config.MODELSCOPE_MODEL
        self.max_retries = 3
        self.retry_delay = 1  # seconds; base delay, grows linearly per attempt

    def _call_api(self, messages: List[Dict], stream: bool = True, **kwargs) -> Iterator[str]:
        """Call the ModelScope API and yield answer-content fragments.

        Args:
            messages: Chat messages in OpenAI format.
            stream: Whether to request a streaming response.
            **kwargs: Extra arguments forwarded to ``chat.completions.create``.

        Yields:
            Fragments of the answer content. Reasoning ("thinking") content
            is discarded.

        Raises:
            Exception: If all ``max_retries`` attempts fail.
        """
        extra_body = {
            "enable_thinking": True
        }
        for attempt in range(self.max_retries):
            try:
                response = self.client.chat.completions.create(
                    model=self.model,
                    messages=messages,
                    stream=stream,
                    extra_body=extra_body,
                    **kwargs
                )
                if stream:
                    # NOTE(review): if the stream breaks mid-way, the retry
                    # below restarts from scratch and re-yields earlier
                    # chunks to the consumer — confirm this is acceptable.
                    for chunk in response:
                        if not chunk.choices:
                            continue
                        delta = chunk.choices[0].delta
                        # reasoning_content is ModelScope-specific; use getattr
                        # so a standard OpenAI delta without the field does not
                        # raise AttributeError (which would be silently retried).
                        thinking_chunk = getattr(delta, 'reasoning_content', None)
                        answer_chunk = delta.content
                        if thinking_chunk:
                            # Thinking trace could be logged; ignored for now.
                            pass
                        elif answer_chunk:
                            yield answer_chunk
                else:
                    # Non-streaming: yield the whole answer at once.
                    # content may be None (e.g. tool-call responses); normalize
                    # to '' so downstream ''.join never sees None.
                    content = response.choices[0].message.content
                    yield content or ''
                return  # success — stop retrying
            except Exception as e:
                if attempt < self.max_retries - 1:
                    # Linear backoff before the next attempt.
                    time.sleep(self.retry_delay * (attempt + 1))
                    continue
                # Chain the original cause for debuggability.
                raise Exception(f"API调用失败,已重试{self.max_retries}次: {str(e)}") from e

    def chat(self, messages: List[Dict], stream: bool = True) -> str:
        """Send chat messages and return the complete response text.

        Args:
            messages: Chat messages in OpenAI format.
            stream: Whether to use a streaming response.

        Returns:
            The full response text, with all fragments joined.
        """
        return ''.join(self._call_api(messages, stream=stream))

    def chat_json(self, messages: List[Dict], stream: bool = True) -> Dict:
        """Send chat messages and parse the response as JSON.

        Strips a surrounding markdown code fence (``` ... ```) if present
        before parsing.

        Args:
            messages: Chat messages in OpenAI format.
            stream: Whether to use a streaming response.

        Returns:
            The parsed JSON object.

        Raises:
            ValueError: If the response cannot be parsed as JSON.
        """
        response_text = self.chat(messages, stream=stream)
        response_text = response_text.strip()
        if response_text.startswith('```'):
            # Extract the content between the opening and closing fences.
            lines = response_text.split('\n')
            json_start = False
            json_lines = []
            for line in lines:
                if line.strip().startswith('```'):
                    if json_start:
                        break  # closing fence — stop collecting
                    json_start = True  # opening fence (may carry "json" tag)
                    continue
                if json_start:
                    json_lines.append(line)
            response_text = '\n'.join(json_lines)
        try:
            return json.loads(response_text)
        except json.JSONDecodeError as e:
            raise ValueError(f"无法解析JSON响应: {str(e)}\n响应内容: {response_text}")

    def chat_multiple_times(self, messages: List[Dict], times: int = 3) -> List[str]:
        """Call the API several times and return every result.

        Useful for checking ranking/ordering stability across samples.

        Args:
            messages: Chat messages in OpenAI format.
            times: Number of calls to make.

        Returns:
            One response string per call, in call order.
        """
        results = []
        for i in range(times):
            results.append(self.chat(messages, stream=True))
            # Throttle between calls to avoid hammering the API.
            if i < times - 1:
                time.sleep(0.5)
        return results