dify+MCP多应用,构建灵活的AI应用生态系统 - 肖祥 - 博客园

来源: dify+MCP多应用,构建灵活的AI应用生态系统 – 肖祥 – 博客园

一、概述

前面几篇文章写很多MCP应用,基本上一个dify工作流使用一个MCP应用。

那么一个dify工作流,同时使用多个MCP应用,是否可以呢?答案是可以的。

 

先来看一下效果图

说明:

这里使用了问题分类器,用来判断用户的问题,应该调用哪个MCP应用

AGENT1~4,分别对应一个MCP应用,例如:public-ip-mcp-server,mySQL8-mcp-server,desensitization-mcp-server,searxng-mcp-server

针对mySQL查询输出的内容,会进行脱敏处理。

二、问题分类器

定义

通过定义分类描述,问题分类器能够根据用户输入,使用 LLM 推理与之相匹配的分类并输出分类结果,向下游节点提供更加精确的信息。

场景

常见的使用情景包括客服对话意图分类、产品评价分类、邮件批量分类等。

在一个典型的产品客服问答场景中,问题分类器可以作为知识库检索的前置步骤,对用户输入问题意图进行分类处理,分类后导向下游不同的知识库查询相关的内容,以精确回复用户的问题。

设置

对于比较精确的条件,一般使用条件分支。但是对于我这种场景,条件比较模糊,所以需要使用问题分类器

 

这里定义了3个分类:

公网ip相关问题
mysql 数据库相关查询,涉及学生、教师、成绩、班级、课程等
其他问题

效果如下:

说明:

公网ip相关问题,会直接调用MCP应用public-ip-mcp-server

mysql相关问题,会调用MCP应用mysql8-mcp-server

其他问题,会调用MCP应用searxng-mcp-server,这个是一个联网搜索引擎,你可以理解为百度,想搜什么都可以。

三、环境说明

dify版本

这里使用的是最新版本1.4.0,如果你的版本没有这么高,1.3.0以上版本也可以。

mcp插件

确保已经安装了以下插件:

Agent 策略(支持 MCP 工具)

MCP SSE / StreamableHTTP

确保插件版本,已经升级到最新版本

mcp应用

这里的所有MCP应用,统一使用Streamable HTTP模式,全部部署在k8s里面。

当然,使用docker运行也是可以的。

mcp插件设置

点击插件MCP SSE / StreamableHTTP,输入MCP 服务配置

完整内容如下:

复制代码
{
    "mysql8-mcp-server": {
        "transport": "streamable_http",
        "url": "http://mysql8-mcp-server-svc.mcp:9000/mcp/",
        "headers": {},
        "timeout": 60
    },
    "desensitization-mcp-server": {
        "transport": "streamable_http",
        "url": "http://desensitization-mcp-server-svc.mcp:9000/mcp/",
        "headers": {},
        "timeout": 60
    },
    "public-ip-mcp-server": {
        "transport": "streamable_http",
        "url": "http://public-ip-mcp-server-svc.mcp:9000/mcp/",
        "headers": {},
        "timeout": 60
    },
    "searxng-mcp-server": {
        "transport": "streamable_http",
        "url": "http://searxng-mcp-server-svc.mcp:9000/mcp/",
        "headers": {},
        "timeout": 60
    }
}
复制代码

注意:这里的url使用的是k8s内部地址,如果使用的是docker方式运行,请根据实际情况修改。

四、public-ip-mcp-server设置

public-ip-mcp-server核心代码如下:

server.py

复制代码
from fastmcp import FastMCP
import json
import requests

mcp = FastMCP("public-ip-address")


@mcp.tool()
def get_public_ip_address() -> str:
    """
    获取公网ip地址
    返回:
        str: 当前网络的公网ip地址
    """
    response = requests.get("http://ip-api.com/json")
    content = json.loads(response.text)
    return content["query"]


if __name__ == "__main__":
    mcp.run(transport="streamable-http", host="0.0.0.0", port=9000, path="/mcp")
复制代码

Agent配置

Agent 1详细配置如下:

MCP服务配置

复制代码
{
  "public-ip-mcp-server": {
        "transport": "streamable_http",
        "url": "http://public-ip-mcp-server-svc.mcp:9000/mcp/",
        "headers": {},
        "timeout": 60
    }
}
复制代码

指令

使用MCP工具,获取服务器公网ip

 

最后直接回复,注意选择变量Agent1 text

 

五、mysql8-mcp-server设置

核心代码

mysql8-mcp-server核心代码如下:

server.py

复制代码
from fastmcp import FastMCP
from mysql.connector import connect, Error
import os

mcp = FastMCP("operateMysql")


def get_db_config():
    """从环境变量获取数据库配置信息

    返回:
        dict: 包含数据库连接所需的配置信息
        - host: 数据库主机地址
        - port: 数据库端口
        - user: 数据库用户名
        - password: 数据库密码
        - database: 数据库名称

    异常:
        ValueError: 当必需的配置信息缺失时抛出
    """

    config = {
        "host": os.getenv("MYSQL_HOST", "localhost"),
        "port": int(os.getenv("MYSQL_PORT", "3306")),
        "user": os.getenv("MYSQL_USER"),
        "password": os.getenv("MYSQL_PASSWORD"),
        "database": os.getenv("MYSQL_DATABASE"),
    }
    print(config)
    if not all(
        [
            config["host"],
            config["port"],
            config["user"],
            config["password"],
            config["database"],
        ]
    ):
        raise ValueError("缺少必需的数据库配置")

    return config


@mcp.tool()
def execute_sql(query: str) -> list:
    """执行SQL查询语句

    参数:
        query (str): 要执行的SQL语句,支持多条语句以分号分隔

    返回:
        list: 包含查询结果的TextContent列表
        - 对于SELECT查询:返回CSV格式的结果,包含列名和数据
        - 对于SHOW TABLES:返回数据库中的所有表名
        - 对于其他查询:返回执行状态和影响行数
        - 多条语句的结果以"---"分隔

    异常:
        Error: 当数据库连接或查询执行失败时抛出
    """
    config = get_db_config()
    try:
        with connect(**config) as conn:
            with conn.cursor() as cursor:
                statements = [stmt.strip() for stmt in query.split(";") if stmt.strip()]
                results = []

                for statement in statements:
                    try:
                        cursor.execute(statement)

                        # 检查语句是否返回了结果集 (SELECT, SHOW, EXPLAIN, etc.)
                        if cursor.description:
                            columns = [desc[0] for desc in cursor.description]
                            rows = cursor.fetchall()

                            # 将每一行的数据转换为字符串,特殊处理None值
                            formatted_rows = []
                            for row in rows:
                                formatted_row = [
                                    "NULL" if value is None else str(value)
                                    for value in row
                                ]
                                formatted_rows.append(",".join(formatted_row))

                            # 将列名和数据合并为CSV格式
                            results.append(
                                "\n".join([",".join(columns)] + formatted_rows)
                            )

                        # 如果语句没有返回结果集 (INSERT, UPDATE, DELETE, etc.)
                        else:
                            conn.commit()  # 只有在非查询语句时才提交
                            results.append(f"查询执行成功。影响行数: {cursor.rowcount}")

                    except Error as stmt_error:
                        # 单条语句执行出错时,记录错误并继续执行
                        results.append(
                            f"执行语句 '{statement}' 出错: {str(stmt_error)}"
                        )
                        # 可以在这里选择是否继续执行后续语句,目前是继续

                return ["\n---\n".join(results)]

    except Error as e:
        print(f"执行SQL '{query}' 时出错: {e}")
        return [f"执行查询时出错: {str(e)}"]


@mcp.tool()
def get_table_name(text: str) -> list:
    """根据表的中文注释搜索数据库中的表名

    参数:
        text (str): 要搜索的表中文注释关键词

    返回:
        list: 包含查询结果的TextContent列表
        - 返回匹配的表名、数据库名和表注释信息
        - 结果以CSV格式返回,包含列名和数据
    """
    config = get_db_config()
    sql = "SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_COMMENT "
    sql += f"FROM information_schema.TABLES WHERE TABLE_SCHEMA = '{config['database']}' AND TABLE_COMMENT LIKE '%{text}%';"
    return execute_sql(sql)


@mcp.tool()
def get_table_desc(text: str) -> list:
    """获取指定表的字段结构信息

    参数:
        text (str): 要查询的表名,多个表名以逗号分隔

    返回:
        list: 包含查询结果的列表
        - 返回表的字段名、字段注释等信息
        - 结果按表名和字段顺序排序
        - 结果以CSV格式返回,包含列名和数据
    """
    config = get_db_config()
    # 将输入的表名按逗号分割成列表
    table_names = [name.strip() for name in text.split(",")]
    # 构建IN条件
    table_condition = "','".join(table_names)
    sql = "SELECT TABLE_NAME, COLUMN_NAME, COLUMN_COMMENT "
    sql += (
        f"FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = '{config['database']}' "
    )
    sql += f"AND TABLE_NAME IN ('{table_condition}') ORDER BY TABLE_NAME, ORDINAL_POSITION;"
    return execute_sql(sql)


@mcp.tool()
def get_lock_tables() -> list:
    """
    获取当前mysql服务器InnoDB 的行级锁

    返回:
        list: 包含查询结果的TextContent列表
    """
    sql = """SELECT
    p2.`HOST` AS 被阻塞方host,
    p2.`USER` AS 被阻塞方用户,
    r.trx_id AS 被阻塞方事务id,
    r.trx_mysql_thread_id AS 被阻塞方线程号,
    TIMESTAMPDIFF(SECOND, r.trx_wait_started, CURRENT_TIMESTAMP) AS 等待时间,
    r.trx_query AS 被阻塞的查询,
    l.OBJECT_NAME AS 阻塞方锁住的表,
    m.LOCK_MODE AS 被阻塞方的锁模式,
    m.LOCK_TYPE AS '被阻塞方的锁类型(表锁还是行锁)',
    m.INDEX_NAME AS 被阻塞方锁住的索引,
    m.OBJECT_SCHEMA AS 被阻塞方锁对象的数据库名,
    m.OBJECT_NAME AS 被阻塞方锁对象的表名,
    m.LOCK_DATA AS 被阻塞方事务锁定记录的主键值,
    p.`HOST` AS 阻塞方主机,
    p.`USER` AS 阻塞方用户,
    b.trx_id AS 阻塞方事务id,
    b.trx_mysql_thread_id AS 阻塞方线程号,
    b.trx_query AS 阻塞方查询,
    l.LOCK_MODE AS 阻塞方的锁模式,
    l.LOCK_TYPE AS '阻塞方的锁类型(表锁还是行锁)',
    l.INDEX_NAME AS 阻塞方锁住的索引,
    l.OBJECT_SCHEMA AS 阻塞方锁对象的数据库名,
    l.OBJECT_NAME AS 阻塞方锁对象的表名,
    l.LOCK_DATA AS 阻塞方事务锁定记录的主键值,
    IF(p.COMMAND = 'Sleep', CONCAT(p.TIME, ' 秒'), 0) AS 阻塞方事务空闲的时间
    FROM performance_schema.data_lock_waits w
    INNER JOIN performance_schema.data_locks l ON w.BLOCKING_ENGINE_LOCK_ID = l.ENGINE_LOCK_ID
    INNER JOIN performance_schema.data_locks m ON w.REQUESTING_ENGINE_LOCK_ID = m.ENGINE_LOCK_ID
    INNER JOIN information_schema.INNODB_TRX b ON b.trx_id = w.BLOCKING_ENGINE_TRANSACTION_ID
    INNER JOIN information_schema.INNODB_TRX r ON r.trx_id = w.REQUESTING_ENGINE_TRANSACTION_ID
    INNER JOIN information_schema.PROCESSLIST p ON p.ID = b.trx_mysql_thread_id
    INNER JOIN information_schema.PROCESSLIST p2 ON p2.ID = r.trx_mysql_thread_id
    ORDER BY 等待时间 DESC;"""

    return execute_sql(sql)


if __name__ == "__main__":
    mcp.run(transport="streamable-http", host="0.0.0.0", port=9000, path="/mcp")
复制代码

Agent配置

Agent 2详细配置如下:

 

MCP服务配置

复制代码
{
    "mysql8-mcp-server": {
        "transport": "streamable_http",
        "url": "http://mysql8-mcp-server-svc.mcp:9000/mcp/",
        "timeout": 60
    }
}
复制代码

指令

 View Code

 

六、desensitization-mcp-server设置

核心代码

desensitization-mcp-server核心代码如下:

server.py

 View Code

Agent配置

Agent 4详细配置如下:

注意:查询要选择变量Agent 2 text

 

MCP服务配置

复制代码
{ 
  "desensitization-mcp-server": {
        "transport": "streamable_http",
        "url": "http://desensitization-mcp-server-svc.mcp:9000/mcp/",
        "headers": {},
        "timeout": 60
    }
}
复制代码

指令

使用MCP工具,对文本进行脱敏处理

 

最后直接回复,注意选择变量Agent4 text

 

七、searxng-mcp-server设置

核心代码

searxng-mcp-server核心代码如下:

server.py

复制代码
from fastmcp import FastMCP
import requests
import os

mcp = FastMCP("searxng")


@mcp.tool()
def search(query: str) -> str:
    """
    搜索关键字,调用searxng的API接口
    参数:
        query (str): 要搜索的关键词
    返回:
        str: 查询结果
    """
    api_server = os.getenv("API_SERVER", None)
    if not api_server:
        print("缺少必需的API_SERVER配置")
        raise ValueError("缺少必需的API_SERVER配置")

    # API URL
    url = "%s/search?q=%s&format=json" % (api_server, query)
    print(url)

    try:
        # 发送GET请求
        response = requests.get(url)

        # 检查请求是否成功
        if response.status_code == 200:
            # 将响应内容解析为JSON
            data = response.json()
            # print("JSON内容:")
            # print(data,type(data))
            result_list = []
            for i in data["results"]:
                # print(i["content"])
                result_list.append(i["content"])
            content = "\n".join(result_list)
            # print(content)
            return content
        else:
            print(f"请求失败,状态码: {response.status_code}")
            return False

    except requests.exceptions.RequestException as e:
        print(f"请求过程中发生错误: {e}")
        return False


if __name__ == "__main__":
    mcp.run(transport="streamable-http", host="0.0.0.0", port=9000, path="/mcp")
复制代码

Agent配置

Agent 3详细配置如下:

MCP服务配置

复制代码
{
  "searxng-mcp-server": {
        "transport": "streamable_http",
        "url": "http://searxng-mcp-server-svc.mcp:9000/mcp/",
        "headers": {},
        "timeout": 60
    }
}
复制代码

指令

复制代码
## 技能
### 技能1:使用MCP工具进行联网搜索,获取到的相关内容进行总结分析
## 限制
- 如果没有相关内容,再进行联网搜索
- 你的回答应严格针对分析任务。使用结构化语言,逐步思考
- 使用的语言应和用户提问的语言相同
- 搜索的关键词必须和用户提问的内容一致
复制代码

 

最后直接回复,注意选择变量Agent3 text

八、dify测试

点击右上角的预览按钮

公网ip多少

注意:这里可以看到绿色的连接线条,可以清晰的看到工作流的走向,它确实是按照我预期的方向在走。

 

李华的老师,查询一下个人详细信息

可以看到通过问题分类器,分别走向AGENT 2,AGENT 4,最终得到的答案,是进行了脱敏处理。

上海今天天气如何

这里直接联网搜索答案了

 

赞(0) 打赏
分享到: 更多 (0)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏