Skip to content

Commit

Permalink
feat: v0.1版本,支持接入问答助手、工作流、文本生成应用,且支持对话中用户上下文。
Browse files Browse the repository at this point in the history
  • Loading branch information
zfanswer committed Aug 19, 2024
1 parent 8716e25 commit b3bca5e
Show file tree
Hide file tree
Showing 19 changed files with 564 additions and 0 deletions.
7 changes: 7 additions & 0 deletions .bots.yaml.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
bots:
- name: 问答助手
dingtalk_app_client_id: <your-dingtalk-app-client-id>
dingtalk_app_client_secret: <your-dingtalk-app-client-secret>
dify_app_type: <chatbot or completion or workflow>
dify_app_api_key: <your-dify-api-key-per-app>
handler: DifyAiCardBotHandler
10 changes: 10 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
tests/
tests/*.*
!tests/.coveragerc
docs/
docker/
.aliyun/
venv/
.env
.bots.yaml
.git/
11 changes: 11 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# app config
LOG_LEVEL=INFO
DEFAULT_MAX_WORKERS=2

# dify service config
DIFY_OPEN_API_URL="https://api.dify.ai/v1"
# 用户各自上下文维持时间,默认 15 minutes,只对chatbot app有效
DIFY_CONVERSATION_REMAIN_TIME=15

# dingtalk config
DINGTALK_AI_CARD_TEMPLATE_ID="<your-dingtalk-ai-card-temp-id>"
16 changes: 16 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
FROM python:3.10-slim-bookworm AS base
LABEL authors="zfanswer"

WORKDIR /app
COPY . /app

# 安装系统依赖
RUN apt-get update && apt-get install -y --no-install-recommends build-essential \
&& rm -rf /var/lib/apt/lists/*

# 安装python依赖
RUN pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple \
&& pip install --no-cache-dir -r requirements.txt \
&& rm -rf /root/.cache/pip

CMD ["python", "app.py"]
62 changes: 62 additions & 0 deletions app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# __author__ = 'zfanswer'
import sys
from concurrent.futures import ThreadPoolExecutor

import dingtalk_stream
from dingtalk_stream import CallbackHandler
from loguru import logger

from configs import DIFY_OPEN_API_URL, LOG_LEVEL, load_bots_config, DEFAULT_MAX_WORKERS
from core.dify_client import ChatClient, CompletionClient, WorkflowClient
from core.handlers import HandlerFactory

logger.remove()
logger.add(sys.stdout, level=LOG_LEVEL)


def start_dingtalk_stream_client(app_client_id: str, app_client_secret: str, callback_handler: CallbackHandler):
credential = dingtalk_stream.Credential(app_client_id, app_client_secret)
client = dingtalk_stream.DingTalkStreamClient(credential, logger)
# client.register_all_event_handler(event_handler())
client.register_callback_handler(dingtalk_stream.ChatbotMessage.TOPIC, callback_handler)
client.start_forever()


def run():
bots_conf = load_bots_config()
bots_cnt = len(bots_conf["bots"])
max_workers_num = 0
for bot in bots_conf["bots"]:
max_workers_num += bot.get("max_workers", DEFAULT_MAX_WORKERS)
logger.info(f"待启动机器人数量:{bots_cnt}, 预计使用最大线程数:{max_workers_num}")
with ThreadPoolExecutor(max_workers=max_workers_num) as executor:
futures = []
for i, bot in enumerate(bots_conf["bots"]):
logger.info(f"启动第{i+1}个机器人:{bot['name']}")
logger.debug(bot)
bot_worker_num = bot.get("max_workers", DEFAULT_MAX_WORKERS)
bot_app_client_id = bot["dingtalk_app_client_id"]
bot_app_client_secret = bot["dingtalk_app_client_secret"]
# 根据app类型,使用不同的dify api client
if bot["dify_app_type"].lower() == "chatbot":
bot_dify_client = ChatClient(api_key=bot["dify_app_api_key"], base_url=DIFY_OPEN_API_URL)
elif bot["dify_app_type"].lower() == "completion":
bot_dify_client = CompletionClient(api_key=bot["dify_app_api_key"], base_url=DIFY_OPEN_API_URL)
elif bot["dify_app_type"].lower() == "workflow":
bot_dify_client = WorkflowClient(api_key=bot["dify_app_api_key"], base_url=DIFY_OPEN_API_URL)
else:
raise ValueError(f"不支持的机器人类型:{bot['dify_app_type']}")
# bot_dify_client = ChatClient(api_key=bot["dify_app_api_key"], base_url=DIFY_OPEN_API_URL)
handler_params = {"dify_api_client": bot_dify_client}
bot_handler = HandlerFactory.create_handler(bot["handler"], **handler_params)
for _ in range(bot_worker_num):
futures.append(executor.submit(start_dingtalk_stream_client, bot_app_client_id, bot_app_client_secret, bot_handler))
# 等待所有线程完成
for future in futures:
future.result()


if __name__ == "__main__":
run()
37 changes: 37 additions & 0 deletions configs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# __author__ = 'zfanswer'

import os

import yaml
from dotenv import load_dotenv
from loguru import logger

load_dotenv(override=False, verbose=True)

try:
# app config
LOG_LEVEL = os.getenv("LOG_LEVEL", default="INFO")
DEFAULT_MAX_WORKERS = int(os.getenv("DEFAULT_MAX_WORKERS", default=2))
# dify service config
DIFY_OPEN_API_URL = os.getenv("DIFY_OPEN_API_URL", default="https://api.dify.ai/v1")
except (TypeError, ValueError) as e:
logger.error(f"Error converting environment variable: {e}")
raise e


def load_bots_config():
"""
load bots config from file
:return:
"""
with open(".bots.yaml", "r") as f:
bots_conf = yaml.safe_load(f)
return bots_conf


if __name__ == "__main__":
# 使用示例
print(os.getenv("DIFY_OPEN_API_URL"))
print(type(DEFAULT_MAX_WORKERS))
3 changes: 3 additions & 0 deletions core/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# __author__ = 'zfanswer'
52 changes: 52 additions & 0 deletions core/cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# __author__ = 'zfanswer'
import time


class Cache:
def __init__(self, expiry_time=60):
self.cache = {} # 普通的字典来存储数据
self.expiry_time = expiry_time

def _is_expired(self, key):
return time.time() - self.cache[key][1] > self.expiry_time

def set(self, key, value):
# 插入新的key-value,并存储当前时间戳
self.cache[key] = (value, time.time())

def get(self, key):
if key in self.cache:
if not self._is_expired(key):
return self.cache[key][0] # 返回值
else:
del self.cache[key] # 如果过期,删除该key
return None

def cleanup(self):
# 清除过期的缓存
keys_to_delete = []
for key in list(self.cache):
if self._is_expired(key):
keys_to_delete.append(key)
for key in keys_to_delete:
del self.cache[key]

def __str__(self):
# 用于查看缓存内容
return str({k: v[0] for k, v in self.cache.items()})


if __name__ == "__main__":
# 使用示例
cache = Cache(expiry_time=5) # 设置过期时间为5秒

cache.set("a", 1)
cache.set("b", 2)

time.sleep(3)
print(cache.get("a")) # 输出: 1

time.sleep(3)
print(cache.get("a")) # 输出: None, 因为 "a" 已经过期
125 changes: 125 additions & 0 deletions core/dify_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import requests


class DifyClient:
def __init__(self, api_key, base_url: str = "https://api.dify.ai/v1"):
self.api_key = api_key
self.base_url = base_url

def query(self, *args, **kwargs):
# interface for subclasses to implement the api call entry point
raise NotImplementedError("Subclasses must implement this method.")

def _send_request(self, method, endpoint, json=None, params=None, stream=False):
headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"}
url = f"{self.base_url}{endpoint}"
response = requests.request(method, url, json=json, params=params, headers=headers, stream=stream)
return response

def _send_request_with_files(self, method, endpoint, data, files):
headers = {"Authorization": f"Bearer {self.api_key}"}
url = f"{self.base_url}{endpoint}"
response = requests.request(method, url, data=data, headers=headers, files=files)
return response

def message_feedback(self, message_id, rating, user):
data = {"rating": rating, "user": user}
return self._send_request("POST", f"/messages/{message_id}/feedbacks", data)

def get_application_parameters(self, user):
params = {"user": user}
return self._send_request("GET", "/parameters", params=params)

def file_upload(self, user, files):
data = {"user": user}
return self._send_request_with_files("POST", "/files/upload", data=data, files=files)


class ChatClient(DifyClient):

def query(self, inputs, query, user, response_mode="blocking", files=None, conversation_id=None, auto_generate_name=False):
return self.create_chat_messages(inputs, query, user, response_mode, conversation_id, files, auto_generate_name)

def create_chat_messages(
self, inputs, query, user, response_mode="blocking", conversation_id=None, files=None, auto_generate_name=False
):
data = {
"inputs": inputs,
"query": query,
"user": user,
"response_mode": response_mode,
"files": files,
"auto_generate_name": auto_generate_name,
}
if conversation_id:
data["conversation_id"] = conversation_id
streaming = True if response_mode == "streaming" else False
return self._send_request("POST", "/chat-messages", data, stream=streaming)

def get_conversation_messages(self, user, conversation_id=None, first_id=None, limit=None):
params = {"user": user}
if conversation_id:
params["conversation_id"] = conversation_id
if first_id:
params["first_id"] = first_id
if limit:
params["limit"] = limit
return self._send_request("GET", "/messages", params=params)

def get_conversations(self, user, last_id=None, limit=None, pinned=None):
params = {"user": user, "last_id": last_id, "limit": limit, "pinned": pinned}
return self._send_request("GET", "/conversations", params=params)

def rename_conversation(self, conversation_id, name, user):
data = {"name": name, "user": user}
return self._send_request("POST", f"/conversations/{conversation_id}/name", data)


class CompletionClient(DifyClient):
def query(self, inputs, query, user, response_mode="blocking", files=None, **kwargs):
inputs["query"] = query
return self.create_completion_messages(inputs, user, response_mode, files)

def create_completion_messages(self, inputs, user, response_mode="blocking", files=None):
data = {"inputs": inputs, "response_mode": response_mode, "user": user, "files": files}
streaming = True if response_mode == "streaming" else False
return self._send_request("POST", "/completion-messages", data, stream=streaming)


class WorkflowClient(DifyClient):
def query(self, inputs, query, user, response_mode="blocking", files=None, **kwargs):
inputs["query"] = query
return self.workflow_run(inputs, user, response_mode, files)

def workflow_run(self, inputs, user, response_mode="blocking", files=None):
data = {"inputs": inputs, "response_mode": response_mode, "user": user, "files": files}
streaming = True if response_mode == "streaming" else False
return self._send_request("POST", "/workflows/run", data, stream=streaming)


if __name__ == "__main__":
client = ChatClient(api_key="app-xxx", base_url="http://192.168.250.64/v1")
# client = WorkflowClient(api_key="app-xxx", base_url="http://192.168.250.64/v1")
# client = CompletionClient(api_key="app-xxx", base_url="http://192.168.250.64/v1")
# 测试 streaming 模式
query = "每月几号发工资?"
user = "user"
response_mode = "streaming"
conversation_id = None
files = None
inputs = {}

inputs["query"] = query
response = client.query(inputs, query, user, response_mode, files, conversation_id=conversation_id)

# 处理sse流式返回
from sseclient import SSEClient
import json

if response.status_code != 200:
print(response.text)
exit(1)
sse_client = SSEClient(response)
for event in sse_client.events():
# print(event.data)
print(json.loads(event.data))
Loading

0 comments on commit b3bca5e

Please sign in to comment.