Commit c2fcc072 authored by 何家明's avatar 何家明

搭建mcp服务

parents
Pipeline #1213 canceled with stages
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info
# Virtual environments
.venv
log
from pydantic import BaseModel
class QueryParam(BaseModel):
"""查询参数实体"""
message: str
"""用户输入的消息内容"""
customer_token: str
"""客户token"""
import uvicorn
from fastapi import FastAPI
from QueryParam import QueryParam
from mcp_client import user_query
app = FastAPI(name=["BME MCP服务"])
@app.post(path="/mcp/query", description="调用mcp工具查询")
async def query(query_param: QueryParam):
message = await user_query(query_param)
return {
"message": message
}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
active: deepseek_v3_bme
model:
# deepseek-v3官方模型
deepseek_v3_official:
api_key: sk-1fc82f2f3b424668bddc84c89b7131d8
base_url: https://api.deepseek.com
model_name: deepseek-chat
# deepseek-r1官方模型
deepseek_r1_official:
api_key: sk-1fc82f2f3b424668bddc84c89b7131d8
base_url: https://api.deepseek.com
model_name: deepseek-reasoner
# deepseek-r1公司模型
deepseek_r1_bme:
api_key: Tc7sY47hiU5d1LNGbJjGBfqfY13IE3khIc0uBvpJ11U
base_url: http://10.10.10.12:30070/v1
model_name: deepseek-r1
# deepseek-v3公司模型
deepseek_v3_bme:
api_key: Tc7sY47hiU5d1LNGbJjGBfqfY13IE3khIc0uBvpJ11U
base_url: http://10.10.10.12:30100/v1
model_name: deepseek-v3
server:
# 运行命令
command: python
args:
- mcp_server.py
tool_calls_deep: 20 # tool_call调用深度
log:
base_path: log
remote:
base_url:
bme-screen-service: https://vis.bmetech.com/vis
customer:
01ce2837d453c02f9b0e1828d0134e8e: bme # 超级管理员,可以查看所有客户资源
ef616aad53d3eddfb53ca71980421440: 59 # 连云港华乐合金集团有限公司
\ No newline at end of file
from contextlib import AsyncExitStack
from datetime import datetime
from typing import Optional
from loguru import logger
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from openai import OpenAI
from openai.types.chat import ChatCompletionToolParam, ChatCompletionAssistantMessageParam, \
ChatCompletionMessageToolCallParam, ChatCompletionToolMessageParam
from openai.types.chat.chat_completion_message_tool_call_param import Function
from openai.types.shared_params import FunctionDefinition
import json
import yaml
from pydantic import AnyUrl
from QueryParam import QueryParam
with open("config.yaml", "r", encoding="utf-8") as yml_file:
config = yaml.safe_load(yml_file)
if config["log"]["base_path"]:
logger.add(config["log"]["base_path"] + "/mcp_client/log_{time:%Y-%m-%d}.log", rotation="1 day", encoding="utf-8",
level="INFO")
class McpClient:
def __init__(self):
self.customer_resource: [] = None # 客户资源
self.default_system_prompt = None # mcp_server提供的默认提示词
self.available_tools: [] = None # mcp_server提供的tool
self.session: Optional[ClientSession] = None
self.exit_stack = AsyncExitStack()
active_model = config["model"][config["active"]]
self.client = OpenAI(
api_key=active_model["api_key"],
base_url=active_model["base_url"],
)
self.model_name = active_model["model_name"]
self.connected = False
async def connect_to_server(self):
"""
连接mcp_server服务
"""
server_params = StdioServerParameters(
command=config["server"]["command"],
args=config["server"]["args"],
)
stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
stdio, write = stdio_transport
self.session = await self.exit_stack.enter_async_context(ClientSession(stdio, write))
await self.session.initialize()
self.connected = True
async def read_mcp(self):
"""
读取mcp服务提供的数据
"""
if self.available_tools is None:
await self.get_server_tools()
if self.default_system_prompt is None:
await self.get_server_prompts()
if self.customer_resource is None:
await self.get_server_resources()
async def get_server_tools(self):
"""
获取server提供的tool
"""
mcp_server_tools = await self.session.list_tools()
self.available_tools = [ChatCompletionToolParam(
type="function",
function=FunctionDefinition(
name=tool.name,
description=tool.description,
parameters=tool.inputSchema
),
) for tool in mcp_server_tools.tools]
logger.info(f"--> available_tools: {self.available_tools}")
async def get_server_prompts(self):
"""
获取server提供的prompt
"""
mcp_server_default_system_prompt = await self.session.get_prompt(name="default_system_prompt")
self.default_system_prompt = mcp_server_default_system_prompt.messages[0].content.text
logger.info(f"--> default_system_prompt: {self.default_system_prompt}")
async def get_server_resources(self):
"""
获取server提供的resource
"""
# 这里查出所有客户信息,使用时直接取即可,不要通过id去获取客户资源,这样每次都要重新查询
mcp_server_customer_resource = await self.session.read_resource(AnyUrl("api://customers"))
self.customer_resource = json.loads(mcp_server_customer_resource.contents[0].text)
logger.info(f"--> customer_resource: {self.customer_resource}")
def deal_customer_permission(self, customer_token: str):
customer_id = config["customer"].get(customer_token, None)
if not customer_id:
logger.info(f"Access restricted, customer token[{customer_token}] is not configured!")
return "访问受限,客户信息未配置!"
if not self.customer_resource:
logger.info("No customer resources found!")
return "访问受限,客户信息未配置!"
if customer_id == "bme":
customer_resource = "客户资源如下(如果用户查询没有指定客户,请提示并要求用户传入客户信息):\n"
customer_resource += "\n".join(
f"客户id:{data.get('customerId')},客户名:{data.get('customerName')},客户全称:{data.get('customerFullname')}"
for data in self.customer_resource)
return customer_resource
else:
customer = list(filter(lambda c: c["customerId"] == customer_id, self.customer_resource))
if not customer:
return "访问受限,客户信息未配置!"
return f"""请使用下面的客户信息:
客户id:{customer[0]['customerId']}
客户名:{customer[0]['customerName']}
客户全称:{customer[0]['customerFullname']}"""
async def process_query(self, param: QueryParam):
"""
处理查询逻辑
:param param: 请求参数
:return: 经过mcp加工后的ai回答
"""
logger.info(f"--> user origin query: {param}")
messages = [
{"role": "system", "content": self.default_system_prompt},
{"role": "system", "content": self.deal_customer_permission(param.customer_token)},
{"role": "system", "content": f"如果要使用到当前时间,请使用{datetime.now()}"},
{"role": "user", "content": param.message}
]
logger.info(f"--> messages: {messages}")
logger.info(f"--> model: {self.model_name}")
# 调用ai
ai_response = self.client.chat.completions.create(
model=self.model_name,
messages=messages,
tools=self.available_tools
)
logger.info(f"--> ai response: {ai_response}")
final_text = []
chat_completion_message = ai_response.choices[0].message
final_text.append(chat_completion_message.content) if chat_completion_message.content else None
# 防止死循环
loop = 0
# 只要还存在工具调用,就循环下去
while chat_completion_message.tool_calls and loop < config["tool_calls_deep"]:
loop = loop + 1
logger.info(f"----> Available tools: {chat_completion_message.tool_calls}")
# 可能ai一次选取了多个工具,这里循环处理
for tool_call in chat_completion_message.tool_calls:
tool_name = tool_call.function.name
tool_args = json.loads(tool_call.function.arguments)
logger.info(f"------> start to call tool...")
logger.info(f"------> tool_name: {tool_name}, tool_args: {tool_args}")
result = await self.session.call_tool(tool_name, tool_args)
logger.info(f"------> call result: {result}")
messages.append(ChatCompletionAssistantMessageParam(
role="assistant",
tool_calls=[ChatCompletionMessageToolCallParam(
id=tool_call.id,
type="function",
function=Function(
name=tool_name,
arguments=json.dumps(tool_args)
)
)]
))
messages.append(ChatCompletionToolMessageParam(
role="tool",
tool_call_id=tool_call.id,
content=str(result.content)
))
ai_response = self.client.chat.completions.create(
model=self.model_name,
messages=messages,
tools=self.available_tools
)
logger.info(f"----> ai response: {ai_response}")
chat_completion_message = ai_response.choices[0].message
final_text.append(chat_completion_message.content) if chat_completion_message.content else None
return "\n".join(final_text)
async def handle_query(self, param: QueryParam):
try:
return await self.process_query(param)
except Exception as e:
logger.exception(e)
async def cleanup(self):
"""Clean up resources."""
await self.exit_stack.aclose()
client = McpClient()
async def user_query(param: QueryParam):
try:
if not client.connected:
await client.connect_to_server()
await client.read_mcp()
result = await client.handle_query(param)
logger.info(f"Final return: {result}")
return result
except Exception as e:
logger.exception(e)
import requests
import yaml
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("BME-MCP")
with open("config.yaml", "r", encoding="utf-8") as yml_file:
config = yaml.safe_load(yml_file)
base_url = config["remote"]["base_url"]
bme_screen_service = base_url["bme-screen-service"]
def deal_request_exception(response):
"""
处理response的异常信息
:param response: 响应体
:return: json数据
"""
response.raise_for_status()
response_json = response.json()
if isinstance(response_json, dict) and "success" in response_json and not response_json["success"]:
raise Exception(response_json.get("msg"))
return response_json
def get_size_limit(size: int, max_size=20, min_size=0) -> int:
"""
处理数据量边界
:param size: 数据量
:param max_size: 最大边界
:param min_size: 最小边界
:return:
"""
size = max_size if size > max_size else size
size = min_size if size < min_size else size
return size
@mcp.tool()
def get_air_pm_10_month_focus(customer_id: int) -> []:
"""
根据客户id获取本月重点关注的空气质量微站(PM10标准)
:param customer_id: 客户id
:return: 返回结构中的字段名解释:
rank:排行,
deviceNo:设备编号,
deviceName:设备名称
"""
response = requests.get(bme_screen_service + "/monitor/getMonitorRankingTitle", {
"customerId": customer_id,
"type": "1", # 随便传,该参数当前无用
"order": "1", # 随便传,该参数当前无用
})
response_data = deal_request_exception(response).get("data", {}).get("title", [])
return [{
"rank": data.get("rank"),
"deviceNo": data.get("deviceNo"),
"deviceName": data.get("deviceName")
} for data in response_data]
@mcp.tool()
def get_air_pm_rank(customer_id: int, order: int=1, statistic_type: int=1) -> []:
"""
根据客户id获取空气质量微站排行(包含PM2.5和PM10标准)
:param customer_id: 客户id
:param order: 排序规则:按统计区间的类型进行排序,升序排序传1,降序排序传2,默认为1
:param statistic_type: 统计区间:按小时统计传1,按日统计传2,要按月统计传3,默认为1
:return: 返回结构中的字段名解释:
PM10:PM10的数据集,
PM2.5:PM2.5的数据集,
avgValue:平均值 单位为μg/m³,
deviceNo:设备编号,
deviceName:设备名称,
"""
response = requests.get(bme_screen_service + "/monitor/getMonitorRanking", {
"customerId": customer_id,
"type": statistic_type,
"order": order,
})
response_data = deal_request_exception(response).get("data", {})
result = {
"PM10": [{
"avgValue": pm10.get("avg_value"),
"deviceNo": pm10.get("deviceNo"),
"deviceName": pm10.get("deviceName")
} for pm10 in response_data.get("pm10lists")],
"PM2.5": [{
"avgValue": pm2_5.get("avg_value"),
"deviceNo": pm2_5.get("deviceNo"),
"deviceName": pm2_5.get("deviceName")
} for pm2_5 in response_data.get("pm25lists")],
}
return result
@mcp.tool()
def get_tsp_rank(customer_id: int, order: int=1, statistic_type: int=1, size: int=10) -> []:
"""
根据客户id获取TSP监测排行
:param customer_id: 客户id
:param order: 排序规则:按统计区间的类型进行排序,升序排序传1,降序排序传2,默认为1
:param statistic_type: 统计区间:按小时统计传1,按日统计传2,要按月统计传3,默认为1
:param size: 要取多少条数据,比如按小时升序取前两条,则传2,默认为10
:return: 返回结构中的字段名解释:
total: 区域总数
data: 详细数据
data->area: 区域名称
data->devicecount: 关联设备数
data->realtimePollutionIndex: 实时污染指数
data->hourRank: 小时排名
data->dayRank: 日排名
data->monthRank: 月排名
"""
response = requests.get(bme_screen_service + "/monitor/getMonitorTSPRanking", {
"customerId": customer_id,
"order": order,
"pageNo": 1,
"pageSize": get_size_limit(size),
"type": statistic_type,
})
response_data = deal_request_exception(response).get("data", {})
result = {
"total": response_data.get("areaCount", 0),
"data": [{
"area": data.get("area"),
"devicecount": data.get("devicecount"),
"realtimePollutionIndex": f"{data.get('tdc')} {data.get('tspUnit')}",
"hourRank": data.get("hourRank"),
"dayRank": data.get("dayRank"),
"monthRank": data.get("monthRank"),
} for data in response_data.get("page", {}).get("records")],
}
return result
@mcp.tool()
def get_governance_process_statistics(customer_id: int) -> {}:
"""
根据客户id获取今天治理过程记录统计数据
:param customer_id: 客户id
:return: 返回结构中的字段名解释:
timeCount: 今日雾炮平均开启时长(小时)
allCount: 今日污染发生次数(次)
openCount: 今日雾炮开启次数(次)
"""
response = requests.get(bme_screen_service + "/eq/selectEquCount", {"customerId": customer_id})
response_data = deal_request_exception(response).get("data", {})
result = {
"timeCount": response_data.get("timeCount", 0),
"allCount": response_data.get("allCount", 0),
"openCount": response_data.get("openCount", 0),
}
return result
@mcp.tool()
def get_governance_process_records(customer_id: int, instruct_type: str, device_name: str, start_time: str,
end_time: str, size: int) -> []:
"""
根据客户id查询治理过程全记录列表
:param customer_id: 客户id
:param instruct_type: 触发机制:智能联动为0,手动开启为1,全部为空
:param device_name: 设备名称
:param start_time: 开始时间,格式为yyyy-MM-dd HH:mm:ss,如:2025-04-27 17:30:00,默认为七天前的时间
:param end_time: 结束时间,格式为yyyy-MM-dd HH:mm:ss,如:2025-04-27 17:30:00,默认为当前时间
:param size: 查询数量,默认10
:return: 返回结构中的字段名解释:
total:总数(由于该接口是分页查询,所以这里返回总数)
data:数据集
data->createTime: 时间
data->deviceName: 设备
data->position: 位置
data->instructType: 触发机制,0为智能联动,1为手动开启
data->describe: 治理过程描述
"""
response = requests.get(bme_screen_service + "/eq/selectEquPage", {
"customerId": customer_id,
"InstructType": instruct_type,
"deviceName": device_name,
"StatcDate": start_time,
"StopDate": end_time,
"pageNo": 1,
"pageSize": get_size_limit(size),
})
response_data = deal_request_exception(response).get("data", {})
result = {
"total": response_data.get("total", 0),
"data": [{
"createTime": data.get("createTime"),
"deviceName": data.get("deviceName"),
"position": data.get("position"),
"instructType": data.get("instructType"),
"describe": data.get("describe"),
} for data in response_data.get("records")],
}
return result
@mcp.resource("api://customers")
def get_all_available_customer() -> []:
"""
获取所有的客户信息
:return: 客户信息数据
"""
response = requests.get(bme_screen_service + "/reportgen/customer")
response_data = deal_request_exception(response).get("data", [])
return [{
"customerId": data.get("customerId"),
"customerName": data.get("customerName"),
"customerFullname": data.get("customerFullname")
} for data in response_data]
@mcp.prompt(name="default_system_prompt")
def get_default_system_prompt() -> str:
"""
默认的系统提示词
:return: 默认的系统提示词
"""
return (
"你可以结合一系列的工具(tool)来回答用户的问题。\n"
"以下是你应该始终遵循的规则:\n"
"1.始终传入类型正确的参数,如果从输入中没有解析到参数,则取工具对应参数描述的默认值,如果也没有默认值,则结束流程并告知用户信息不全。\n"
"2.只在需要时调用工具,如果你不需要额外信息,不要调用搜索代理,尽量自己解决任务。\n"
"3.如果不需要调用工具,直接回答问题即可。\n"
"4.永远不要用完全相同的参数重新进行之前的工具调用。\n"
"5.如果要使用到客户信息,当用名字查找时,优先匹配客户全称,其次匹配客户名,再考虑使用部分匹配,最后考虑读音相近的名字,都没找到则结束流程并告知用户客户不存在。\n"
)
if __name__ == '__main__':
mcp.run(transport="stdio")
[project]
name = "bme-mcp"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
"mcp[cli]>=1.6.0",
]
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment