diff --git a/.env.example b/.env.example
index 396e0bb..ea3d4fc 100644
--- a/.env.example
+++ b/.env.example
@@ -5,6 +5,8 @@ SLACK_SIGNING_SECRET="xxxx"
 
 DYNAMODB_TABLE_NAME="gurumi-ai-bot-context"
 
+KNOWLEDGE_BASE_ID="None"
+
 MODEL_ID_TEXT="anthropic.claude-3-5-sonnet-20240620-v1:0"
 MODEL_ID_IMAGE="stability.stable-diffusion-xl-v1"
 
diff --git a/.github/workflows/4-sync-notion.yml.stop b/.github/workflows/4-sync-notion.yml.stop
new file mode 100644
index 0000000..4d0c74c
--- /dev/null
+++ b/.github/workflows/4-sync-notion.yml.stop
@@ -0,0 +1,48 @@
+name: sync-notion
+
+on:
+  # push:
+  #   branches:
+  #     - main
+
+  schedule:
+    - cron: "0 20 * * 0-4"
+
+env:
+  NOTION_PAGE_NAME: "notion"
+  NOTION_PAGE_ID: "0c7c08203a9b4435a4ca07b6454151d7"
+
+  AWS_DEST_PATH: ${{ vars.AWS_DEST_PATH }}
+
+jobs:
+  sync:
+    runs-on: ubuntu-latest
+
+    steps:
+      - name: Checkout 🛎️
+        uses: actions/checkout@v4
+        with:
+          fetch-depth: 0
+
+      - name: Setup Python 3.9 🐍
+        uses: actions/setup-python@v4
+        with:
+          python-version: 3.9
+
+      - name: Setup Dependencies
+        run: pip install python-notion-exporter
+
+      - name: Run Notion Exporter
+        env:
+          NOTION_TOKEN: ${{ secrets.NOTION_TOKEN }}
+          NOTION_FILE_TOKEN: ${{ secrets.NOTION_FILE_TOKEN }}
+        run: |
+          python bin/notion_exporter.py
+
+      - name: Sync to AWS S3 Data Source
+        run: |
+          aws s3 sync build/ ${{ env.AWS_DEST_PATH }} --delete
+        env:
+          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
+          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
+          AWS_REGION: "us-east-1"
diff --git a/.github/workflows/6-start-ingestion.yml.stop b/.github/workflows/6-start-ingestion.yml.stop
new file mode 100644
index 0000000..762c3c5
--- /dev/null
+++ b/.github/workflows/6-start-ingestion.yml.stop
@@ -0,0 +1,38 @@
+name: start-ingestion-job
+
+on:
+  # push:
+  #   branches:
+  #     - main
+
+  schedule:
+    - cron: "0 21 * * 0-4"
+
+env:
+  KNOWLEDGE_BASE_ID: ${{ vars.KNOWLEDGE_BASE_ID }}
+  DATA_SOURCE_ID: ${{ vars.DATA_SOURCE_ID }}
+
+jobs:
+  ingestion:
+    runs-on: ubuntu-latest
+
+    steps:
+      - name: Checkout 🛎️
+        uses: actions/checkout@v4
+        with:
+          fetch-depth: 0
+
+      - name: Setup Python 3.9 🐍
+        uses: actions/setup-python@v4
+        with:
+          python-version: 3.9
+
+      - name: Sync to AWS Bedrock Knowledge Base
+        run: |
+          aws bedrock-agent start-ingestion-job \
+            --knowledge-base-id ${{ env.KNOWLEDGE_BASE_ID }} \
+            --data-source-id ${{ env.DATA_SOURCE_ID }}
+        env:
+          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
+          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
+          AWS_REGION: "us-east-1"
diff --git a/.github/workflows/push.yml b/.github/workflows/push.yml
index f9df8c0..35aecdd 100644
--- a/.github/workflows/push.yml
+++ b/.github/workflows/push.yml
@@ -9,8 +9,7 @@ env:
   ALLOWED_CHANNEL_IDS: ${{ vars.ALLOWED_CHANNEL_IDS }}
   BOT_CURSOR: ${{ vars.BOT_CURSOR }}
   DYNAMODB_TABLE_NAME: ${{ vars.DYNAMODB_TABLE_NAME }}
-  ENABLE_IMAGE: ${{ vars.ENABLE_IMAGE }}
-  KB_ID: ${{ vars.KB_ID }}
+  KNOWLEDGE_BASE_ID: ${{ vars.KNOWLEDGE_BASE_ID }}
   MODEL_ID_IMAGE: ${{ vars.MODEL_ID_IMAGE }}
   MODEL_ID_TEXT: ${{ vars.MODEL_ID_TEXT }}
   SYSTEM_MESSAGE: ${{ vars.SYSTEM_MESSAGE }}
@@ -52,8 +51,7 @@ jobs:
           echo "ALLOWED_CHANNEL_IDS=${ALLOWED_CHANNEL_IDS}" >> .env
           echo "BOT_CURSOR=${BOT_CURSOR}" >> .env
           echo "DYNAMODB_TABLE_NAME=${DYNAMODB_TABLE_NAME}" >> .env
-          echo "ENABLE_IMAGE=${ENABLE_IMAGE}" >> .env
-          echo "KB_ID=${KB_ID}" >> .env
+          echo "KNOWLEDGE_BASE_ID=${KNOWLEDGE_BASE_ID}" >> .env
           echo "MODEL_ID_IMAGE=${MODEL_ID_IMAGE}" >> .env
           echo "MODEL_ID_TEXT=${MODEL_ID_TEXT}" >> .env
"SLACK_BOT_TOKEN=${SLACK_BOT_TOKEN}" >> .env diff --git a/.gitignore b/.gitignore index 8151dcd..8dfc142 100644 --- a/.gitignore +++ b/.gitignore @@ -1,33 +1,44 @@ -# Distribution / packaging -.Python -env/ -build/ -develop-eggs/ -dist/ -downloads/ -eggs/ -.eggs/ -lib/ -lib64/ -parts/ -sdist/ -var/ -*.egg-info/ -.installed.cfg -*.egg - -# NodeJS dependencies -node_modules/ - -# Serverless runtime cache -.serverless_sdk - -# Serverless directories -.serverless - -# Local configuration -.env +# Windows image file caches +Thumbs.db +ehthumbs.db + +# Folder config file +Desktop.ini +# Recycle Bin used on file shares +$RECYCLE.BIN/ + +# Windows shortcuts +*.lnk + +# Mac .DS_Store +# JetBrains +.idea/ +*.iml + +# Eclipse +.settings/ +.metadata/ + +# Build +target/ build/ +dist/ + +# Temp +*.pid +*.log +*.tmp + +# python +venv +*.pyc +staticfiles +.env +db.sqlite3 +__pycache__ + +# node +node_modules diff --git a/bin/.env.example b/bin/.env.example new file mode 100644 index 0000000..9600e30 --- /dev/null +++ b/bin/.env.example @@ -0,0 +1,5 @@ +NOTION_TOKEN = "v02%3Auser_token_or_cookies" +NOTION_FILE_TOKEN = "v02%3Afile_token" + +NOTION_PAGE_ID = "0c7c08203a9b4435a4ca07b6454151d7" +NOTION_PAGE_NAME = "notion" diff --git a/bin/README.md b/bin/README.md new file mode 100644 index 0000000..8351ff0 --- /dev/null +++ b/bin/README.md @@ -0,0 +1,19 @@ +# bin + +## Install + +```bash +$ brew install python@3.9 + +$ python -m pip install --upgrade -r requirements.txt +``` + +## Test + +```bash +python notion_exporter.py +``` + +## References + +* diff --git a/bin/__init__.py b/bin/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/bin/notion_exporter.py b/bin/notion_exporter.py new file mode 100644 index 0000000..e1216e4 --- /dev/null +++ b/bin/notion_exporter.py @@ -0,0 +1,26 @@ +import os + +from python_notion_exporter import NotionExporter, ExportType, ViewExportType + + +NOTION_TOKEN = os.getenv("NOTION_TOKEN") +NOTION_FILE_TOKEN = os.getenv("NOTION_FILE_TOKEN") + +NOTION_PAGE_ID = os.getenv("NOTION_PAGE_ID", "0c7c08203a9b4435a4ca07b6454151d7") +NOTION_PAGE_NAME = os.getenv("NOTION_PAGE_NAME", "demo") + + +if __name__ == "__main__": + exporter = NotionExporter( + token_v2=NOTION_TOKEN, + file_token=NOTION_FILE_TOKEN, + pages={NOTION_PAGE_NAME: NOTION_PAGE_ID}, + export_directory="build", + flatten_export_file_tree=True, + export_type=ExportType.MARKDOWN, + current_view_export_type=ViewExportType.CURRENT_VIEW, + include_files=False, + recursive=True, + export_name=NOTION_PAGE_NAME, + ) + exporter.process() diff --git a/bin/python_notion_exporter.py b/bin/python_notion_exporter.py new file mode 100644 index 0000000..b1561ce --- /dev/null +++ b/bin/python_notion_exporter.py @@ -0,0 +1,312 @@ +import concurrent +import json +import logging +import multiprocessing +import os +import shutil +import time +import requests + +from concurrent.futures import ThreadPoolExecutor +from datetime import datetime + +from tqdm import tqdm + + +class ExportType: + """Represent the different types of export formats.""" + + MARKDOWN = "markdown" + HTML = "html" + PDF = "pdf" + + +class ViewExportType: + """Represent the different view types for export.""" + + CURRENT_VIEW = "currentView" + ALL = "all" + + +class NotionExporter: + """Class to handle exporting Notion content.""" + + def __init__( + self, + token_v2: str, + file_token: str, + pages: dict, + export_directory: str = None, + flatten_export_file_tree: bool = True, + export_type: ExportType = ExportType.MARKDOWN, + current_view_export_type: 
diff --git a/bin/__init__.py b/bin/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/bin/notion_exporter.py b/bin/notion_exporter.py
new file mode 100644
index 0000000..e1216e4
--- /dev/null
+++ b/bin/notion_exporter.py
@@ -0,0 +1,26 @@
+import os
+
+from python_notion_exporter import NotionExporter, ExportType, ViewExportType
+
+
+NOTION_TOKEN = os.getenv("NOTION_TOKEN")
+NOTION_FILE_TOKEN = os.getenv("NOTION_FILE_TOKEN")
+
+NOTION_PAGE_ID = os.getenv("NOTION_PAGE_ID", "0c7c08203a9b4435a4ca07b6454151d7")
+NOTION_PAGE_NAME = os.getenv("NOTION_PAGE_NAME", "demo")
+
+
+if __name__ == "__main__":
+    exporter = NotionExporter(
+        token_v2=NOTION_TOKEN,
+        file_token=NOTION_FILE_TOKEN,
+        pages={NOTION_PAGE_NAME: NOTION_PAGE_ID},
+        export_directory="build",
+        flatten_export_file_tree=True,
+        export_type=ExportType.MARKDOWN,
+        current_view_export_type=ViewExportType.CURRENT_VIEW,
+        include_files=False,
+        recursive=True,
+        export_name=NOTION_PAGE_NAME,
+    )
+    exporter.process()
diff --git a/bin/python_notion_exporter.py b/bin/python_notion_exporter.py
new file mode 100644
index 0000000..b1561ce
--- /dev/null
+++ b/bin/python_notion_exporter.py
@@ -0,0 +1,312 @@
+import concurrent
+import json
+import logging
+import multiprocessing
+import os
+import shutil
+import time
+import requests
+
+from concurrent.futures import ThreadPoolExecutor
+from datetime import datetime
+
+from tqdm import tqdm
+
+
+class ExportType:
+    """Represent the different types of export formats."""
+
+    MARKDOWN = "markdown"
+    HTML = "html"
+    PDF = "pdf"
+
+
+class ViewExportType:
+    """Represent the different view types for export."""
+
+    CURRENT_VIEW = "currentView"
+    ALL = "all"
+
+
+class NotionExporter:
+    """Class to handle exporting Notion content."""
+
+    def __init__(
+        self,
+        token_v2: str,
+        file_token: str,
+        pages: dict,
+        export_directory: str = None,
+        flatten_export_file_tree: bool = True,
+        export_type: ExportType = ExportType.MARKDOWN,
+        current_view_export_type: ViewExportType = ViewExportType.CURRENT_VIEW,
+        include_files: bool = False,
+        recursive: bool = True,
+        workers: int = multiprocessing.cpu_count(),
+        export_name: str = None,
+    ):
+        """
+        Initializes the NotionExporter class.
+
+        Args:
+            token_v2 (str): The user's Notion V2 token.
+            file_token (str): The user's file token for Notion.
+            pages (dict): Dictionary of pages to be exported.
+            export_directory (str, optional): Directory where exports will be saved. Defaults to the current directory.
+            flatten_export_file_tree (bool, optional): If True, flattens the export file tree. Defaults to True.
+            export_type (ExportType, optional): Type of export (e.g., MARKDOWN, HTML, PDF). Defaults to MARKDOWN.
+            current_view_export_type (ViewExportType, optional): Type of view export (e.g., CURRENT_VIEW, ALL). Defaults to CURRENT_VIEW.
+            include_files (bool, optional): If True, includes files in the export. Defaults to False.
+            recursive (bool, optional): If True, exports will be recursive. Defaults to True.
+            workers (int, optional): Number of worker threads for exporting. Defaults to the number of CPUs available.
+            export_name (str, optional): Name of the export. Defaults to the current date and time.
+        """
+
+        self.export_name = (
+            f"export-{datetime.now().strftime('%Y-%m-%d-%H-%M-%S')}"
+            if not export_name
+            else export_name
+        )
+        self.token_v2 = token_v2
+        self.file_token = file_token
+        self.include_files = include_files
+        self.recursive = recursive
+        self.pages = pages
+        self.current_view_export_type = current_view_export_type
+        self.flatten_export_file_tree = flatten_export_file_tree
+        self.export_type = export_type
+        self.export_directory = f"{export_directory}/" if export_directory else ""
+        self.download_headers = {
+            "content-type": "application/json",
+            "cookie": f"file_token={self.file_token};",
+        }
+        self.query_headers = {
+            "content-type": "application/json",
+            "cookie": f"token_v2={self.token_v2};",
+        }
+        self.workers = workers
+        os.makedirs(f"{self.export_directory}{self.export_name}", exist_ok=True)
+
+    def _to_uuid_format(self, input_string: str) -> str:
+        """
+        Converts a string to UUID format.
+
+        Args:
+            input_string (str): The input string.
+
+        Returns:
+            str: The string in UUID format.
+        """
+        if (
+            "-" == input_string[8]
+            and "-" == input_string[13]
+            and "-" == input_string[18]
+            and "-" == input_string[23]
+        ):
+            return input_string
+        return f"{input_string[:8]}-{input_string[8:12]}-{input_string[12:16]}-{input_string[16:20]}-{input_string[20:]}"
+
+    def _get_format_options(
+        self, export_type: ExportType, include_files: bool = False
+    ) -> dict:
+        """
+        Retrieves format options based on the export type and whether to include files.
+
+        Args:
+            export_type (ExportType): Type of export (e.g., MARKDOWN, HTML, PDF).
+            include_files (bool, optional): If True, includes files in the export. Defaults to False.
+
+        Returns:
+            dict: A dictionary containing format options.
+        """
+        format_options = {}
+        if export_type == ExportType.PDF:
+            format_options["pdfFormat"] = "Letter"
+
+        if not include_files:
+            format_options["includeContents"] = "no_files"
+
+        return format_options
+
+    def _export(self, page_id: str) -> str:
+        """
+        Initiates the export of a Notion page.
+
+        Args:
+            page_id (str): The ID of the Notion page.
+
+        Returns:
+            str: The task ID of the initiated export.
+        """
+ """ + url = "https://www.notion.so/api/v3/enqueueTask" + page_id = self._to_uuid_format(input_string=page_id) + export_options = { + "exportType": self.export_type, + "locale": "en", + "timeZone": "Europe/London", + "collectionViewExportType": self.current_view_export_type, + "flattenExportFiletree": self.flatten_export_file_tree, + } + + # Update the exportOptions with format-specific options + export_options.update( + self._get_format_options( + export_type=self.export_type, include_files=self.include_files + ) + ) + + payload = json.dumps( + { + "task": { + "eventName": "exportBlock", + "request": { + "block": { + "id": page_id, + }, + "recursive": self.recursive, + "exportOptions": export_options, + }, + } + } + ) + + response = requests.request( + "POST", url, headers=self.query_headers, data=payload + ).json() + return response["taskId"] + + def _get_status(self, task_id: str) -> dict: + """ + Fetches the status of an export task. + + Args: + task_id (str): The ID of the export task. + + Returns: + dict: A dictionary containing details about the task status. + """ + url = "https://www.notion.so/api/v3/getTasks" + + payload = json.dumps({"taskIds": [task_id]}) + + response = requests.request( + "POST", url, headers=self.query_headers, data=payload + ).json() + + if not response["results"]: + # print(response) + return {"state": "failure", "error": "No results found."} + + return response["results"][0] + + def _download(self, url: str): + """ + Downloads an exported file from a given URL. + + Args: + url (str): The URL of the exported file. + """ + response = requests.request("GET", url, headers=self.download_headers) + file_name = url.split("/")[-1][100:] + with open( + f"{self.export_directory}{self.export_name}/{file_name}", + "wb", + ) as f: + f.write(response.content) + + def _process_page(self, page_details: tuple) -> dict: + """ + Processes an individual Notion page for export. + + Args: + page_details (tuple): Tuple containing the name and ID of the Notion page. + + Returns: + dict: Details about the export status and any errors. + """ + name, id = page_details + task_id = self._export(id) + + status, state, error, pages_exported = self._wait_for_export_completion( + task_id=task_id + ) + if state == "failure": + logging.error(f"Export failed for {name} with error: {error}") + return {"state": state, "name": name, "error": error} + + export_url = status.get("status", {}).get("exportURL") + if export_url: + self._download(export_url) + else: + logging.warning(f"Failed to get exportURL for {name}") + + return { + "state": state, + "name": name, + "exportURL": export_url, + "pagesExported": pages_exported, + } + + def _wait_for_export_completion(self, task_id: str) -> tuple[dict, str, str, int]: + """ + Waits until a given export task completes or fails. + + Args: + task_id (str): The ID of the export task. + + Returns: + tuple: A tuple containing the status, state, error, and number of pages exported. + """ + while True: + status = self._get_status(task_id) + + if not status: + time.sleep(5) + continue + state = status.get("state") + error = status.get("error") + if state == "failure" or status.get("status", {}).get("exportURL"): + return ( + status, + state, + error, + status.get("status", {}).get("pagesExported"), + ) + time.sleep(5) + + def _unpack(self): + """ + Unpacks and saves exported content from zip archives. 
+ """ + directory_path = f"{self.export_directory}{self.export_name}" + for file in os.listdir(directory_path): + if file.endswith(".zip"): + full_file_path = os.path.join(directory_path, file) + shutil.unpack_archive(full_file_path, directory_path, "zip") + os.remove(full_file_path) + + def process(self): + """ + Processes and exports all provided Notion pages. + """ + logging.info(f"Exporting {len(self.pages)} pages...") + + with ThreadPoolExecutor(max_workers=self.workers) as executor: + with tqdm(total=len(self.pages), dynamic_ncols=True) as pbar: + futures = { + executor.submit(self._process_page, item): item + for item in self.pages.items() + } + for future in concurrent.futures.as_completed(futures): + result = future.result() + if result["state"] == "failure": + continue + name = result["name"] + pagesExported = result["pagesExported"] + + pbar.set_postfix_str( + f"Exporting {name}... {pagesExported} pages already exported" + ) + pbar.update(1) + + self._unpack() diff --git a/bin/requirements.txt b/bin/requirements.txt new file mode 100644 index 0000000..8388061 --- /dev/null +++ b/bin/requirements.txt @@ -0,0 +1,4 @@ +boto3 +python-notion-exporter +requests +tqdm diff --git a/handler.py b/handler.py index b34c4df..33feeae 100644 --- a/handler.py +++ b/handler.py @@ -5,9 +5,6 @@ import re import sys import time -import base64 -import requests -import io from botocore.client import Config @@ -24,40 +21,33 @@ SLACK_SIGNING_SECRET = os.environ["SLACK_SIGNING_SECRET"] # Keep track of conversation history by thread and user -DYNAMODB_TABLE_NAME = os.environ.get("DYNAMODB_TABLE_NAME", "gurumi-ai-bot-context") +DYNAMODB_TABLE_NAME = os.environ.get("DYNAMODB_TABLE_NAME", "gurumi-bot-context") # Amazon Bedrock Knowledge Base ID -KB_ID = os.environ.get("KB_ID", "None") +KNOWLEDGE_BASE_ID = os.environ.get("KNOWLEDGE_BASE_ID", "None") -# Amazon Bedrock Model ID -MODEL_ID_TEXT = os.environ.get("MODEL_ID_TEXT", "anthropic.claude-3") -MODEL_ID_IMAGE = os.environ.get("MODEL_ID_IMAGE", "stability.stable-diffusion-xl") +KB_RETRIEVE_COUNT = int(os.environ.get("KB_RETRIEVE_COUNT", 5)) +# Amazon Bedrock Model ID ANTHROPIC_VERSION = os.environ.get("ANTHROPIC_VERSION", "bedrock-2023-05-31") ANTHROPIC_TOKENS = int(os.environ.get("ANTHROPIC_TOKENS", 1024)) +MODEL_ID_TEXT = os.environ.get("MODEL_ID_TEXT", "anthropic.claude-3") +MODEL_ID_IMAGE = os.environ.get("MODEL_ID_IMAGE", "stability.stable-diffusion-xl") + # Set up the allowed channel ID ALLOWED_CHANNEL_IDS = os.environ.get("ALLOWED_CHANNEL_IDS", "None") -ENABLE_IMAGE = os.environ.get("ENABLE_IMAGE", "False") - # Set up System messages SYSTEM_MESSAGE = os.environ.get("SYSTEM_MESSAGE", "None") MAX_LEN_SLACK = int(os.environ.get("MAX_LEN_SLACK", 3000)) MAX_LEN_BEDROCK = int(os.environ.get("MAX_LEN_BEDROCK", 4000)) -KEYWARD_IMAGE = "๊ทธ๋ ค์ค˜" - +MSG_KNOWLEDGE = "์ง€์‹ ๊ธฐ๋ฐ˜ ๊ฒ€์ƒ‰ ์ค‘... " + BOT_CURSOR MSG_PREVIOUS = "์ด์ „ ๋Œ€ํ™” ๋‚ด์šฉ ํ™•์ธ ์ค‘... " + BOT_CURSOR -MSG_IMAGE_DESCRIBE = "์ด๋ฏธ์ง€ ๊ฐ์ƒ ์ค‘... " + BOT_CURSOR -MSG_IMAGE_GENERATE = "์ด๋ฏธ์ง€ ์ƒ์„ฑ ์ค€๋น„ ์ค‘... " + BOT_CURSOR -MSG_IMAGE_DRAW = "์ด๋ฏธ์ง€ ๊ทธ๋ฆฌ๋Š” ์ค‘... " + BOT_CURSOR MSG_RESPONSE = "์‘๋‹ต ๊ธฐ๋‹ค๋ฆฌ๋Š” ์ค‘... " + BOT_CURSOR -COMMAND_DESCRIBE = "Describe the image in great detail as if viewing a photo." -COMMAND_GENERATE = "Convert the above sentence into a command for stable-diffusion to generate an image within 1000 characters. Just give me a prompt." 
-
 CONVERSION_ARRAY = [
     ["**", "*"],
     # ["#### ", "🔸 "],
@@ -176,6 +166,58 @@ def chat_update(say, channel, thread_ts, latest_ts, message="", continue_thread=
     return message, latest_ts
 
 
+# Get thread messages using conversations.replies API method
+def conversations_replies(channel, ts, client_msg_id):
+    contexts = []
+
+    try:
+        response = app.client.conversations_replies(channel=channel, ts=ts)
+
+        print("conversations_replies: {}".format(response))
+
+        if not response.get("ok"):
+            print(
+                "conversations_replies: {}".format(
+                    "Failed to retrieve thread messages."
+                )
+            )
+
+        messages = response.get("messages", [])
+        messages.reverse()
+        messages.pop(0)  # remove the first message
+
+        for message in messages:
+            if message.get("client_msg_id", "") == client_msg_id:
+                continue
+
+            role = "user"
+            if message.get("bot_id", "") != "":
+                role = "assistant"
+
+            contexts.append(
+                {
+                    "role": role,
+                    "content": message.get("text", ""),
+                }
+            )
+
+            # print("conversations_replies: messages size: {}".format(sys.getsizeof(messages)))
+
+            if sys.getsizeof(contexts) > MAX_LEN_BEDROCK:
+                contexts.pop(0)  # remove the oldest message
+                break
+
+        contexts.reverse()
+
+    except Exception as e:
+        print("conversations_replies: Error: {}".format(e))
+
+    print("conversations_replies: getsizeof: {}".format(sys.getsizeof(contexts)))
+    # print("conversations_replies: {}".format(contexts))
+
+    return contexts
+
+
 def invoke_knowledge_base(content):
     """
     Invokes the Amazon Bedrock Knowledge Base to retrieve information using the input
@@ -185,33 +227,38 @@
     provided in the request body.
 
     :return: The retrieved contexts from the knowledge base.
     """
+    contexts = []
+
+    if KNOWLEDGE_BASE_ID == "None":
+        return contexts
+
     try:
         response = bedrock_agent_client.retrieve(
             retrievalQuery={"text": content},
-            knowledgeBaseId=KB_ID,
+            knowledgeBaseId=KNOWLEDGE_BASE_ID,
             retrievalConfiguration={
                 "vectorSearchConfiguration": {
-                    "numberOfResults": 3,
+                    "numberOfResults": KB_RETRIEVE_COUNT,
                     # "overrideSearchType": "HYBRID", # optional
                 }
             },
         )
 
-        retrievalResults = response["retrievalResults"]
+        results = response["retrievalResults"]
 
-        contexts = []
-        for retrievedResult in retrievalResults:
-            contexts.append(retrievedResult["content"]["text"])
-
-        return contexts
+        for result in results:
+            contexts.append(result["content"]["text"])
 
     except Exception as e:
        print("invoke_knowledge_base: Error: {}".format(e))
 
-        raise e
+    print("invoke_knowledge_base: {}".format(contexts))
+
+    return contexts
 
 
-def invoke_claude_3(content):
+def invoke_claude_3(prompt):
     """
     Invokes Anthropic Claude 3 Sonnet to run an inference using the input
     provided in the request body.
@@ -227,14 +274,11 @@
         "messages": [
             {
                 "role": "user",
-                "content": content,
+                "content": [{"type": "text", "text": prompt}],
             },
         ],
     }
 
-    if SYSTEM_MESSAGE != "None":
-        body["system"] = SYSTEM_MESSAGE
-
     response = bedrock.invoke_model(
         modelId=MODEL_ID_TEXT,
         body=json.dumps(body),
@@ -258,263 +302,82 @@
         raise e
 
 
-def invoke_stable_diffusion(prompt, seed=0, style_preset="photographic"):
-    """
-    Invokes the Stability.ai Stable Diffusion XL model to create an image using
-    the input provided in the request body.
-
-    :param prompt: The prompt that you want Stable Diffusion to use for image generation.
-    :param seed: Random noise seed (omit this option or use 0 for a random seed)
-    :param style_preset: Pass in a style preset to guide the image model towards
-                         a particular style.
-    :return: Base64-encoded inference response from the model.
- """ - - try: - # The different model providers have individual request and response formats. - # For the format, ranges, and available style_presets of Stable Diffusion models refer to: - # https://docs.aws.amazon.com/bedrock/latest/userguide/model-parameters-stability-diffusion.html - - body = { - "text_prompts": [{"text": prompt}], - "seed": seed, - "cfg_scale": 10, - "steps": 30, - "samples": 1, - } - - if style_preset: - body["style_preset"] = style_preset - - response = bedrock.invoke_model( - modelId=MODEL_ID_IMAGE, - body=json.dumps(body), - ) - - body = json.loads(response["body"].read()) - - base64_image = body.get("artifacts")[0].get("base64") - base64_bytes = base64_image.encode("ascii") - - image = base64.b64decode(base64_bytes) - - return image - - except Exception as e: - print("invoke_stable_diffusion: Error: {}".format(e)) - - raise e - - -# Get thread messages using conversations.replies API method -def conversations_replies(channel, ts, client_msg_id): - messages = [] - - try: - response = app.client.conversations_replies(channel=channel, ts=ts) - - print("conversations_replies: {}".format(response)) - - if not response.get("ok"): - print( - "conversations_replies: {}".format( - "Failed to retrieve thread messages." - ) - ) - - res_messages = response.get("messages", []) - res_messages.reverse() - res_messages.pop(0) # remove the first message - - for message in res_messages: - if message.get("client_msg_id", "") == client_msg_id: - continue - - role = "user" - if message.get("bot_id", "") != "": - role = "assistant" - - messages.append( - { - "role": role, - "content": message.get("text", ""), - } - ) - - # print("conversations_replies: messages size: {}".format(sys.getsizeof(messages))) - - if sys.getsizeof(messages) > MAX_LEN_BEDROCK: - messages.pop(0) # remove the oldest message - break - - messages.reverse() - - except Exception as e: - print("conversations_replies: {}".format(e)) - - print("conversations_replies: {}".format(messages)) - - return messages - - # Handle the chatgpt conversation -def conversation(say: Say, thread_ts, content, channel, user, client_msg_id): - print("conversation: {}".format(json.dumps(content))) +def conversation(say: Say, thread_ts, query, channel, client_msg_id): + print("conversation: query: {}".format(query)) # Keep track of the latest message timestamp result = say(text=BOT_CURSOR, thread_ts=thread_ts) latest_ts = result["ts"] - prompt = content[0]["text"] - - type = "text" - if ENABLE_IMAGE == "True" and KEYWARD_IMAGE in prompt: - type = "image" - - print("conversation: {}".format(type)) - prompts = [] + prompts.append( + "Human: You are a advisor AI system, and provides answers to questions by using fact based and statistical information when possible." + ) + prompts.append( + "If you don't know the answer, just say that you don't know, don't try to make up an answer." 
+    )
 
-    try:
-        # Get the thread messages
-        if thread_ts != None:
-            chat_update(say, channel, thread_ts, latest_ts, MSG_PREVIOUS)
-
-            replies = conversations_replies(channel, thread_ts, client_msg_id)
-
-            prompts = [
-                reply["content"] for reply in replies if reply["content"].strip()
-            ]
-
-        # Get the image from the message
-        if type == "image" and len(content) > 1:
-            chat_update(say, channel, thread_ts, latest_ts, MSG_IMAGE_DESCRIBE)
-
-            content[0]["text"] = COMMAND_DESCRIBE
-
-            # Send the prompt to Bedrock
-            message = invoke_claude_3(content)
-
-            prompts.append(message)
-
-        if KB_ID != "None":
-            chat_update(say, channel, thread_ts, latest_ts, MSG_RESPONSE)
+    if SYSTEM_MESSAGE != "None":
+        prompts.append(SYSTEM_MESSAGE)
 
-            # Get the knowledge base contexts
-            contexts = invoke_knowledge_base(prompt)
-
-            prompts.extend(contexts)
+    try:
+        # Get the knowledge base contexts
+        if KNOWLEDGE_BASE_ID != "None":
+            chat_update(say, channel, thread_ts, latest_ts, MSG_KNOWLEDGE)
 
-        if prompt:
-            prompts.append(prompt)
+            contexts = invoke_knowledge_base(query)
 
-        if type == "image":
-            chat_update(say, channel, thread_ts, latest_ts, MSG_IMAGE_GENERATE)
+            prompts.append(
+                "Use the following pieces of information to provide a concise answer to the question enclosed in <question> tags."
+            )
+            prompts.append("<context>")
+            for reply in contexts:
+                prompts.append(reply)
+            prompts.append("</context>")
+        else:
+            # Get the previous conversation contexts
+            if thread_ts != None:
+                chat_update(say, channel, thread_ts, latest_ts, MSG_PREVIOUS)
 
-            prompts.append(COMMAND_GENERATE)
+                contexts = conversations_replies(channel, thread_ts, client_msg_id)
 
-            prompt = "\n\n\n".join(prompts)
+                prompts.append("<context>")
+                for context in contexts:
+                    prompts.append(context["content"])
+                prompts.append("</context>")
 
-            content = []
-            content.append({"type": "text", "text": prompt})
+        # Add the question to the prompts
+        prompts.append("")
+        prompts.append("<question>")
+        prompts.append(query)
+        prompts.append("</question>")
+        prompts.append("")
 
-            # Send the prompt to Bedrock
-            message = invoke_claude_3(content)
+        # prompts.append("The response should be specific and use statistics or numbers when possible.")
+        prompts.append("Assistant:")
 
-            chat_update(say, channel, thread_ts, latest_ts, MSG_IMAGE_DRAW)
+        # Combine the prompts
+        prompt = "\n".join(prompts)
 
-            image = invoke_stable_diffusion(message)
+        # print("conversation: prompt: {}".format(prompt))
 
-            if image:
-                # Update the message in Slack
-                chat_update(say, channel, thread_ts, latest_ts, message)
+        chat_update(say, channel, thread_ts, latest_ts, MSG_RESPONSE)
 
-                # Send the image to Slack
-                app.client.files_upload_v2(
-                    channels=channel,
-                    thread_ts=thread_ts,
-                    file=io.BytesIO(image),
-                    filename="image.jpg",
-                    title="Generated Image",
-                    initial_comment="Here is the generated image.",
-                )
-        else:
-            chat_update(say, channel, thread_ts, latest_ts, MSG_RESPONSE)
+        # Send the prompt to Bedrock
+        message = invoke_claude_3(prompt)
 
-            prompt = "\n\n\n".join(prompts)
+        print("conversation: message: {}".format(message))
 
-            content[0]["text"] = prompt
-
-            # Send the prompt to Bedrock
-            message = invoke_claude_3(content)
-
-            # Update the message in Slack
-            chat_update(say, channel, thread_ts, latest_ts, message)
+        # Update the message in Slack
+        chat_update(say, channel, thread_ts, latest_ts, message)
 
     except Exception as e:
-        print("conversation: Error: {}".format(e))
+        print("conversation: error: {}".format(e))
 
         chat_update(say, channel, thread_ts, latest_ts, f"```{e}```")
 
 
-# Get image from URL
-def get_image_from_url(image_url, token=None):
-    headers = {}
-    if token:
-        headers["Authorization"] = f"Bearer {token}"
-
-    response = requests.get(image_url, headers=headers)
-
-    if response.status_code == 200:
-        return response.content
-    else:
-        print("Failed to fetch image: {}".format(image_url))
-
-    return None
-
-
-# Get image from Slack
-def get_image_from_slack(image_url):
-    return get_image_from_url(image_url, SLACK_BOT_TOKEN)
-
-
-# Get encoded image from Slack
-def get_encoded_image_from_slack(image_url):
-    image = get_image_from_slack(image_url)
-
-    if image:
-        return base64.b64encode(image).decode("utf-8")
-
-    return None
-
-
-# Extract content from the message
-def content_from_message(prompt, event):
-    content = []
-    content.append({"type": "text", "text": prompt})
-
-    if "files" in event:
-        files = event.get("files", [])
-        for file in files:
-            mimetype = file["mimetype"]
-            if mimetype.startswith("image"):
-                image_url = file.get("url_private")
-                base64_image = get_encoded_image_from_slack(image_url)
-                if base64_image:
-                    content.append(
-                        {
-                            "type": "image",
-                            "source": {
-                                "type": "base64",
-                                "media_type": mimetype,
-                                "data": base64_image,
-                            },
-                        }
-                    )
-
-    return content
-
-
 # Handle the app_mention event
 @app.event("app_mention")
 def handle_mention(body: dict, say: Say):
@@ -526,23 +389,23 @@ def handle_mention(body: dict, say: Say):
     # # Ignore messages from the bot itself
     # return
 
+    thread_ts = event["thread_ts"] if "thread_ts" in event else event["ts"]
+
     channel = event["channel"]
+    client_msg_id = event["client_msg_id"]
 
     if ALLOWED_CHANNEL_IDS != "None":
         allowed_channel_ids = ALLOWED_CHANNEL_IDS.split(",")
         if channel not in allowed_channel_ids:
-            # say("Sorry, I'm not allowed to respond in this channel.")
+            say(
+                text="Sorry, I'm not allowed to respond in this channel.",
+                thread_ts=thread_ts,
+            )
             return
 
-    thread_ts = event["thread_ts"] if "thread_ts" in event else event["ts"]
-    user = event["user"]
-    client_msg_id = event["client_msg_id"]
-
     prompt = re.sub(f"<@{bot_id}>", "", event["text"]).strip()
 
-    content = content_from_message(prompt, event)
-
-    conversation(say, thread_ts, content, channel, user, client_msg_id)
+    conversation(say, thread_ts, prompt, channel, client_msg_id)
 
 
 # Handle the DM (direct message) event
@@ -557,15 +420,20 @@ def handle_message(body: dict, say: Say):
         return
 
     channel = event["channel"]
-    user = event["user"]
     client_msg_id = event["client_msg_id"]
 
     prompt = event["text"].strip()
 
-    content = content_from_message(prompt, event)
-
-    # Use thread_ts=None for regular messages, and user ID for DMs
-    conversation(say, None, content, channel, user, client_msg_id)
+    conversation(say, None, prompt, channel, client_msg_id)
+
+
+def success():
+    return {
+        "statusCode": 200,
+        "headers": {"Content-type": "application/json"},
+        "body": json.dumps({"status": "Success"}),
+    }
 
 
 # Handle the Lambda function
@@ -584,22 +452,14 @@ def lambda_handler(event, context):
 
     # Duplicate execution prevention
     if "event" not in body or "client_msg_id" not in body["event"]:
-        return {
-            "statusCode": 200,
-            "headers": {"Content-type": "application/json"},
-            "body": json.dumps({"status": "Success"}),
-        }
+        return success()
 
     # Get the context from DynamoDB
     token = body["event"]["client_msg_id"]
     prompt = get_context(token, body["event"]["user"])
 
     if prompt != "":
-        return {
-            "statusCode": 200,
-            "headers": {"Content-type": "application/json"},
-            "body": json.dumps({"status": "Success"}),
-        }
+        return success()
 
     # Put the context in DynamoDB
     put_context(token, body["event"]["user"], body["event"]["text"])
diff --git a/serverless.yml b/serverless.yml
index 0b0c1e8..b3b4ad8 100644
--- a/serverless.yml
+++ b/serverless.yml
@@ -7,17 +7,18 @@ provider:
   # memorySize: 5120
   timeout: 600
   environment:
-    DYNAMODB_TABLE_NAME: gurumi-ai-bot-context
+    BASE_NAME: gurumi-ai-bot
   iamRoleStatements:
     - Effect: Allow
       Action:
         - dynamodb:*
       Resource:
-        - "arn:aws:dynamodb:*:*:table/${self:provider.environment.DYNAMODB_TABLE_NAME}"
+        - "arn:aws:dynamodb:*:*:table/${self:provider.environment.BASE_NAME}-*"
     - Effect: Allow
      Action:
         - bedrock:Retrieve
       Resource:
+        # - "arn:aws:bedrock:*:*:knowledge-base/${self:provider.environment.BASE_NAME}-*"
         - "arn:aws:bedrock:*:*:knowledge-base/*"
     - Effect: Allow
       Action:
@@ -39,7 +40,7 @@ resources:
     DynamoDBTable:
       Type: AWS::DynamoDB::Table
       Properties:
-        TableName: ${self:provider.environment.DYNAMODB_TABLE_NAME}
+        TableName: ${self:provider.environment.BASE_NAME}-context
         AttributeDefinitions:
           - AttributeName: id
             AttributeType: S
@@ -52,6 +53,15 @@ resources:
         TimeToLiveSpecification:
          AttributeName: expire_at
          Enabled: true
+      # S3Bucket:
+      #   Type: AWS::S3::Bucket
+      #   Properties:
+      #     BucketName:
+      #       Fn::Sub: ${self:provider.environment.BASE_NAME}-${AWS::AccountId}
+      # KnowledgeBase:
+      #   Type: AWS::Bedrock::KnowledgeBase
+      #   Properties:
+      #     Name: ${self:provider.environment.BASE_NAME}-knowledge-base
 
 plugins:
   - serverless-python-requirements