diff --git a/examples/pipelines/adaptive-rag/Dockerfile b/examples/pipelines/adaptive-rag/Dockerfile
index 8ff0ad7..2e2ca8a 100644
--- a/examples/pipelines/adaptive-rag/Dockerfile
+++ b/examples/pipelines/adaptive-rag/Dockerfile
@@ -3,9 +3,12 @@ FROM pathwaycom/pathway:latest
 WORKDIR /app
 
 RUN apt-get update \
-    && apt-get install -y python3-opencv \
+    && apt-get install -y python3-opencv tesseract-ocr-eng \
    && rm -rf /var/lib/apt/lists/* /var/cache/apt/archives/*
 
+COPY requirements.txt .
+RUN pip install -U --no-cache-dir -r requirements.txt
+
 COPY . .
 
 EXPOSE 8000
diff --git a/examples/pipelines/adaptive-rag/app.py b/examples/pipelines/adaptive-rag/app.py
index a35a1bf..68f224c 100644
--- a/examples/pipelines/adaptive-rag/app.py
+++ b/examples/pipelines/adaptive-rag/app.py
@@ -2,71 +2,43 @@
 
 import pathway as pw
 from dotenv import load_dotenv
-from pathway.udfs import DiskCache, ExponentialBackoffRetryStrategy
-from pathway.xpacks.llm import embedders, llms, parsers, splitters
-from pathway.xpacks.llm.question_answering import AdaptiveRAGQuestionAnswerer
-from pathway.xpacks.llm.vector_store import VectorStoreServer
+from pathway.xpacks.llm.question_answering import SummaryQuestionAnswerer
+from pathway.xpacks.llm.servers import QASummaryRestServer
+from pydantic import BaseModel, ConfigDict, InstanceOf
 
 # To use advanced features with Pathway Scale, get your free license key from
 # https://pathway.com/features and paste it below.
 # To use Pathway Community, comment out the line below.
 pw.set_license_key("demo-license-key-with-telemetry")
 
-load_dotenv()
-
 logging.basicConfig(
     level=logging.INFO,
     format="%(asctime)s %(name)s %(levelname)s %(message)s",
     datefmt="%Y-%m-%d %H:%M:%S",
 )
 
+load_dotenv()
 
-if __name__ == "__main__":
-    path = "./data"
-
-    my_folder = pw.io.fs.read(
-        path=path,
-        format="binary",
-        with_metadata=True,
-    )
-
-    sources = [
-        my_folder
-    ]  # define the inputs (local folders, google drive, sharepoint, ...)
-
-    DEFAULT_GPT_MODEL = "gpt-3.5-turbo"
-
-    chat = llms.OpenAIChat(
-        model=DEFAULT_GPT_MODEL,
-        retry_strategy=ExponentialBackoffRetryStrategy(max_retries=6),
-        cache_strategy=DiskCache(),
-        temperature=0.0,
-    )
-    app_host = "0.0.0.0"
-    app_port = 8000
+class App(BaseModel):
+    question_answerer: InstanceOf[SummaryQuestionAnswerer]
+    host: str = "0.0.0.0"
+    port: int = 8000
 
-    parser = parsers.ParseUnstructured()
-    text_splitter = splitters.TokenCountSplitter(max_tokens=400)
-    embedder = embedders.OpenAIEmbedder(cache_strategy=DiskCache())
+    with_cache: bool = True
+    terminate_on_error: bool = False
 
-    vector_server = VectorStoreServer(
-        *sources,
-        embedder=embedder,
-        splitter=text_splitter,
-        parser=parser,
-    )
+    def run(self) -> None:
+        server = QASummaryRestServer(self.host, self.port, self.question_answerer)
+        server.run(
+            with_cache=self.with_cache, terminate_on_error=self.terminate_on_error
+        )
 
-    app = AdaptiveRAGQuestionAnswerer(
-        llm=chat,
-        indexer=vector_server,
-        default_llm_name=DEFAULT_GPT_MODEL,
-        n_starting_documents=2,
-        factor=2,
-        max_iterations=4,
-        strict_prompt=True,
-    )
+    model_config = ConfigDict(extra="forbid")
 
-    app.build_server(host=app_host, port=app_port)
-    app.run_server(with_cache=True)
+
+if __name__ == "__main__":
+    with open("app.yaml") as f:
+        config = pw.load_yaml(f)
+    app = App(**config)
+    app.run()
diff --git a/examples/pipelines/adaptive-rag/app.yaml b/examples/pipelines/adaptive-rag/app.yaml
new file mode 100644
index 0000000..e62536d
--- /dev/null
+++ b/examples/pipelines/adaptive-rag/app.yaml
@@ -0,0 +1,72 @@
+$sources:
+  - !pw.io.fs.read
+    path: data
+    format: binary
+    with_metadata: true
+
+  # - !pw.xpacks.connectors.sharepoint.read
+  #   url: $SHAREPOINT_URL
+  #   tenant: $SHAREPOINT_TENANT
+  #   client_id: $SHAREPOINT_CLIENT_ID
+  #   cert_path: sharepointcert.pem
+  #   thumbprint: $SHAREPOINT_THUMBPRINT
+  #   root_path: $SHAREPOINT_ROOT
+  #   with_metadata: true
+  #   refresh_interval: 30
+
+  # - !pw.io.gdrive.read
+  #   object_id: $DRIVE_ID
+  #   service_user_credentials_file: gdrive_indexer.json
+  #   name_pattern:
+  #     - "*.pdf"
+  #     - "*.pptx"
+  #   object_size_limit: null
+  #   with_metadata: true
+  #   refresh_interval: 30
+
+$llm: !pw.xpacks.llm.llms.OpenAIChat
+  model: "gpt-3.5-turbo"
+  retry_strategy: !pw.udfs.ExponentialBackoffRetryStrategy
+    max_retries: 6
+  cache_strategy: !pw.udfs.DiskCache
+  temperature: 0.05
+  capacity: 8
+
+$embedder: !pw.xpacks.llm.embedders.OpenAIEmbedder
+  model: "text-embedding-ada-002"
+  cache_strategy: !pw.udfs.DiskCache
+
+$splitter: !pw.xpacks.llm.splitters.TokenCountSplitter
+  max_tokens: 400
+
+$parser: !pw.xpacks.llm.parsers.ParseUnstructured
+
+$retriever_factory: !pw.stdlib.indexing.BruteForceKnnFactory
+  reserved_space: 1000
+  embedder: $embedder
+  metric: !pw.internals.yaml_loader.import_object
+    path: pw.stdlib.indexing.BruteForceKnnMetricKind.COS
+  dimensions: 1536
+
+
+$document_store: !pw.xpacks.llm.document_store.DocumentStore
+  docs: $sources
+  parser: $parser
+  splitter: $splitter
+  retriever_factory: $retriever_factory
+
+question_answerer: !pw.xpacks.llm.question_answering.AdaptiveRAGQuestionAnswerer
+  llm: $llm
+  indexer: $document_store
+  n_starting_documents: 2
+  factor: 2
+  max_iterations: 4
+  strict_prompt: true
+
+
+# Change host and port by uncommenting these lines
+# host: "0.0.0.0"
+# port: 8000
+
+# with_cache: true
+# terminate_on_error: false
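With the adaptive-rag template now driven entirely by `app.yaml`, a quick way to smoke-test the running pipeline is to hit its REST endpoint. A minimal sketch, assuming the default `0.0.0.0:8000` from the `App` model and the `/v1/pw_ai_answer` route used by the question-answering templates (see the gpt_4o_multimodal_rag README further down):

```python
# Smoke test for the adaptive-rag server; host/port are the defaults from
# app.py, and /v1/pw_ai_answer is the answer route these templates expose.
import requests

response = requests.post(
    "http://0.0.0.0:8000/v1/pw_ai_answer",
    json={"prompt": "What is the start date of the contract?"},
    timeout=60,
)
response.raise_for_status()
print(response.json())
```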
diff --git a/examples/pipelines/adaptive-rag/requirements.txt b/examples/pipelines/adaptive-rag/requirements.txt
new file mode 100644
index 0000000..3324f44
--- /dev/null
+++ b/examples/pipelines/adaptive-rag/requirements.txt
@@ -0,0 +1 @@
+python-dotenv==1.0.1
diff --git a/examples/pipelines/demo-document-indexing/.env b/examples/pipelines/demo-document-indexing/.env
deleted file mode 100644
index 69269f2..0000000
--- a/examples/pipelines/demo-document-indexing/.env
+++ /dev/null
@@ -1 +0,0 @@
-OPENAI_API_KEY=sk-*******
diff --git a/examples/pipelines/demo-document-indexing/Dockerfile b/examples/pipelines/demo-document-indexing/Dockerfile
index b5b23a9..2e2ca8a 100644
--- a/examples/pipelines/demo-document-indexing/Dockerfile
+++ b/examples/pipelines/demo-document-indexing/Dockerfile
@@ -3,15 +3,14 @@ FROM pathwaycom/pathway:latest
 WORKDIR /app
 
 RUN apt-get update \
-    && apt-get install -y python3-opencv \
+    && apt-get install -y python3-opencv tesseract-ocr-eng \
    && rm -rf /var/lib/apt/lists/* /var/cache/apt/archives/*
 
 COPY requirements.txt .
-
-RUN pip install --pre -U --no-cache-dir -r requirements.txt
+RUN pip install -U --no-cache-dir -r requirements.txt
 
 COPY . .
 
 EXPOSE 8000
 
-CMD ["python", "main.py"]
+CMD ["python", "app.py"]
diff --git a/examples/pipelines/demo-document-indexing/app.py b/examples/pipelines/demo-document-indexing/app.py
new file mode 100644
index 0000000..cee2cfb
--- /dev/null
+++ b/examples/pipelines/demo-document-indexing/app.py
@@ -0,0 +1,44 @@
+import logging
+
+import pathway as pw
+from dotenv import load_dotenv
+from pathway.xpacks.llm.document_store import DocumentStore
+from pathway.xpacks.llm.servers import DocumentStoreServer
+from pydantic import BaseModel, ConfigDict, InstanceOf
+
+# To use advanced features with Pathway Scale, get your free license key from
+# https://pathway.com/features and paste it below.
+# To use Pathway Community, comment out the line below.
+pw.set_license_key("demo-license-key-with-telemetry")
+
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s %(name)s %(levelname)s %(message)s",
+    datefmt="%Y-%m-%d %H:%M:%S",
+)
+
+load_dotenv()
+
+
+class App(BaseModel):
+    document_store: InstanceOf[DocumentStore]
+    host: str = "0.0.0.0"
+    port: int = 8000
+
+    with_cache: bool = True
+    terminate_on_error: bool = False
+
+    def run(self) -> None:
+        server = DocumentStoreServer(self.host, self.port, self.document_store)
+        server.run(
+            with_cache=self.with_cache, terminate_on_error=self.terminate_on_error
+        )
+
+    model_config = ConfigDict(extra="forbid")
+
+
+if __name__ == "__main__":
+    with open("app.yaml") as f:
+        config = pw.load_yaml(f)
+    app = App(**config)
+    app.run()
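Every rewritten `app.py` follows the same pattern: `pw.load_yaml` materializes the `!pw.*`-tagged entries of `app.yaml` into live objects and resolves `$`-prefixed references, and the Pydantic `App` model validates the result. A sketch for inspecting what the loader produces, assuming it is run from a pipeline directory containing an `app.yaml`:

```python
# Sketch: inspect the objects pw.load_yaml builds from app.yaml.
# Keys without a leading $ are what the App model receives; the
# $-prefixed entries are internal references consumed during loading.
import pathway as pw

with open("app.yaml") as f:
    config = pw.load_yaml(f)

for key, value in config.items():
    print(f"{key}: {type(value).__name__}")
```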
diff --git a/examples/pipelines/demo-document-indexing/app.yaml b/examples/pipelines/demo-document-indexing/app.yaml
new file mode 100644
index 0000000..4f0f696
--- /dev/null
+++ b/examples/pipelines/demo-document-indexing/app.yaml
@@ -0,0 +1,67 @@
+$sources:
+  - !pw.io.fs.read
+    path: files-for-indexing
+    format: binary
+    with_metadata: true
+
+  # - !pw.xpacks.connectors.sharepoint.read
+  #   url: $SHAREPOINT_URL
+  #   tenant: $SHAREPOINT_TENANT
+  #   client_id: $SHAREPOINT_CLIENT_ID
+  #   cert_path: sharepointcert.pem
+  #   thumbprint: $SHAREPOINT_THUMBPRINT
+  #   root_path: $SHAREPOINT_ROOT
+  #   with_metadata: true
+  #   refresh_interval: 30
+
+  # - !pw.io.gdrive.read
+  #   object_id: $DRIVE_ID
+  #   service_user_credentials_file: gdrive_indexer.json
+  #   name_pattern:
+  #     - "*.pdf"
+  #     - "*.pptx"
+  #   object_size_limit: null
+  #   with_metadata: true
+  #   refresh_interval: 30
+
+$llm: !pw.xpacks.llm.llms.OpenAIChat
+  model: "gpt-3.5-turbo"
+  retry_strategy: !pw.udfs.ExponentialBackoffRetryStrategy
+    max_retries: 6
+  cache_strategy: !pw.udfs.DiskCache
+  temperature: 0.05
+  capacity: 8
+
+$embedding_model: "mixedbread-ai/mxbai-embed-large-v1"
+
+$embedder: !pw.xpacks.llm.embedders.SentenceTransformerEmbedder
+  model: $embedding_model
+  call_kwargs:
+    show_progress_bar: False
+
+$splitter: !pw.xpacks.llm.splitters.TokenCountSplitter
+  max_tokens: 400
+
+$parser: !pw.xpacks.llm.parsers.ParseUnstructured
+
+$retriever_factory: !pw.stdlib.indexing.BruteForceKnnFactory
+  reserved_space: 1000
+  embedder: $embedder
+  metric: !pw.internals.yaml_loader.import_object
+    path: pw.stdlib.indexing.BruteForceKnnMetricKind.COS
+  dimensions: 1536
+
+
+document_store: !pw.xpacks.llm.document_store.DocumentStore
+  docs: $sources
+  parser: $parser
+  splitter: $splitter
+  retriever_factory: $retriever_factory
+
+
+# Change host and port by uncommenting these lines
+# host: "0.0.0.0"
+# port: 8000
+
+# with_cache: true
+# terminate_on_error: false
diff --git a/examples/pipelines/demo-document-indexing/docker-compose.yml b/examples/pipelines/demo-document-indexing/docker-compose.yml
index b0bfd32..3a44dca 100644
--- a/examples/pipelines/demo-document-indexing/docker-compose.yml
+++ b/examples/pipelines/demo-document-indexing/docker-compose.yml
@@ -4,7 +4,7 @@ services:
     build:
       context: .
     ports:
-      - "8001:8000"
+      - "8000:8000"
     environment:
      OPENAI_API_KEY: "${OPENAI_API_KEY}"
     volumes:
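The `DocumentStoreServer` that replaces `main.py` (deleted below) keeps the REST surface named in the old docstring: `/v1/retrieve`, `/v1/statistics`, and `/v1/inputs`. A hedged example of querying the index once the compose service is up on the newly aligned port 8000; the `query`/`k` payload matches the curl example in the demo-question-answering README, while the empty `/v1/statistics` body is an assumption:

```python
# Query the indexing endpoints listed in the deleted main.py docstring:
# /v1/retrieve, /v1/statistics, /v1/inputs.
import requests

BASE = "http://0.0.0.0:8000"

docs = requests.post(
    f"{BASE}/v1/retrieve",
    json={"query": "What is the start date of the contract?", "k": 2},
    timeout=60,
)
print(docs.json())

# Index health: indexed file count, last update, etc. (empty body assumed).
print(requests.post(f"{BASE}/v1/statistics", json={}, timeout=60).json())
```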
diff --git a/examples/pipelines/demo-document-indexing/main.py b/examples/pipelines/demo-document-indexing/main.py
deleted file mode 100644
index 9b33eb9..0000000
--- a/examples/pipelines/demo-document-indexing/main.py
+++ /dev/null
@@ -1,108 +0,0 @@
-import argparse
-import os
-import sys
-
-import pathway as pw
-import pathway.io.fs as io_fs
-import pathway.io.gdrive as io_gdrive
-import yaml
-from dotenv import load_dotenv
-from pathway.xpacks.llm import embedders, parsers, splitters, vector_store
-
-# To use advanced features with Pathway Scale, get your free license key from
-# https://pathway.com/features and paste it below.
-# To use Pathway Community, comment out the line below.
-pw.set_license_key("demo-license-key-with-telemetry")
-
-
-def data_sources(source_configs):
-    sources = []
-    for source_config in source_configs:
-        if source_config["kind"] == "local":
-            source = io_fs.read(
-                **source_config["config"],
-                format="binary",
-                with_metadata=True,
-            )
-            sources.append(source)
-        elif source_config["kind"] == "gdrive":
-            source = io_gdrive.read(
-                **source_config["config"],
-                with_metadata=True,
-            )
-            sources.append(source)
-        elif source_config["kind"] == "sharepoint":
-            try:
-                import pathway.xpacks.connectors.sharepoint as io_sp
-
-                source = io_sp.read(**source_config["config"], with_metadata=True)
-                sources.append(source)
-            except ImportError:
-                print(
-                    "The Pathway Sharepoint connector is part of the commercial offering, "
-                    "please contact us for a commercial license."
-                )
-                sys.exit(1)
-
-    return sources
-
-
-if __name__ == "__main__":
-    load_dotenv()
-    argparser = argparse.ArgumentParser(
-        prog="Pathway realtime document indexing demo",
-        description="""
-        This is the demo of real-time indexing of the documents from various data sources.
-        It runs a simple web server that is capable of answering queries on the endpoints
-        /v1/retrieve, /v1/statistics, /v1/inputs. Please refer to the "Test the REST endpoint"
-        section at Hosted Pipelines website: https://cloud.pathway.com.
-
-        Currently, it supports several data sources: the local one, Google Drive, and,
-        in a commercial offering, Microsoft SharePoint.
-
-        For the demo, you need to store your Open AI key in the OPENAI_API_KEY environment variable,
-        the easiest way is to add it to the .env file.
-        """,
-    )
-    argparser.add_argument(
-        "--host",
-        help="Host that will be used for running the web server.",
-        default="0.0.0.0",
-    )
-    argparser.add_argument(
-        "--port",
-        help="Port that will be used by the web server.",
-        type=int,
-        default=8000,
-    )
-    argparser.add_argument(
-        "--sources-config",
-        help="Path to sources configuration file",
-        default="sources_configuration.yaml",
-    )
-    args = argparser.parse_args()
-    with open(args.sources_config) as config_f:
-        configuration = yaml.safe_load(config_f)
-
-    openai_api_key = os.environ.get("OPENAI_API_KEY")
-    if openai_api_key is None:
-        print(
-            "Please set OPENAI_API_KEY either as a configuration variable, or in the .env file"
-        )
-        sys.exit(1)
-
-    splitter = splitters.TokenCountSplitter(max_tokens=200)
-    embedder = embedders.OpenAIEmbedder(api_key=openai_api_key)
-    parser = parsers.ParseUnstructured()
-    vs_server = vector_store.VectorStoreServer(
-        *data_sources(configuration["sources"]),
-        embedder=embedder,
-        splitter=splitter,
-        parser=parser,
-    )
-    vs_server.run_server(
-        host=args.host,
-        port=args.port,
-        threaded=False,
-        with_cache=True,
-    )
diff --git a/examples/pipelines/demo-document-indexing/requirements.txt b/examples/pipelines/demo-document-indexing/requirements.txt
index fb30529..3324f44 100644
--- a/examples/pipelines/demo-document-indexing/requirements.txt
+++ b/examples/pipelines/demo-document-indexing/requirements.txt
@@ -1,2 +1 @@
 python-dotenv==1.0.1
-mpmath==1.3.0
diff --git a/examples/pipelines/demo-document-indexing/sources_configuration.yaml b/examples/pipelines/demo-document-indexing/sources_configuration.yaml
deleted file mode 100644
index 9ad5742..0000000
--- a/examples/pipelines/demo-document-indexing/sources_configuration.yaml
+++ /dev/null
@@ -1,30 +0,0 @@
-sources:
-  - local_files:
-    kind: local
-    config:
-      # Please refer to
-      # https://pathway.com/developers/api-docs/pathway-io/fs#pathway.io.fs.read
-      # for options definition
-      path: "files-for-indexing/"
-  # - google_drive_folder:
-  #   kind: gdrive
-  #   config:
-  #     # Please refer to
-  #     # https://pathway.com/developers/api-docs/pathway-io/gdrive#pathway.io.gdrive.read
-  #     # for options definition
-  #     # Please follow https://pathway.com/developers/user-guide/connectors/gdrive-connector/#setting-up-google-drive
-  #     # for instructions on getting credentials
-  #     object_id: "1cULDv2OaViJBmOfG5WB0oWcgayNrGtVs" # folder used in the managed demo
-  #     service_user_credentials_file: SERVICE_CREDENTIALS
-  #     refresh_interval: 5
-  # - sharepoint_folder:
-  #   kind: sharepoint
-  #   config:
-  #     # The sharepoint is part of our commercial offering, please contact us to use it
-  #     root_path: ROOT_PATH
-  #     url: SHAREPOINT_URL
-  #     tenant: SHAREPOINT_TENANT
-  #     client_id: SHAREPOINT_CLIENT_ID
-  #     cert_path: SHAREPOINT.pem
-  #     thumbprint: SHAREPOINT_THUMBPRINT
-  #     refresh_interval: 5
diff --git a/examples/pipelines/demo-question-answering/Dockerfile b/examples/pipelines/demo-question-answering/Dockerfile
index a168de5..2e2ca8a 100644
--- a/examples/pipelines/demo-question-answering/Dockerfile
+++ b/examples/pipelines/demo-question-answering/Dockerfile
@@ -3,7 +3,7 @@ FROM pathwaycom/pathway:latest
 WORKDIR /app
 
 RUN apt-get update \
-    && apt-get install -y python3-opencv \
+    && apt-get install -y python3-opencv tesseract-ocr-eng \
    && rm -rf /var/lib/apt/lists/* /var/cache/apt/archives/*
 
 COPY requirements.txt .
diff --git a/examples/pipelines/demo-question-answering/README.md b/examples/pipelines/demo-question-answering/README.md
index 21b65a4..e8a8637 100644
--- a/examples/pipelines/demo-question-answering/README.md
+++ b/examples/pipelines/demo-question-answering/README.md
@@ -281,7 +281,7 @@ curl -X 'POST' \
   -H 'accept: */*' \
   -H 'Content-Type: application/json' \
   -d '{
-  "query": "string",
+  "query": "What is the start date of the contract?",
   "k": 2
 }'
 ```
diff --git a/examples/pipelines/demo-question-answering/app.py b/examples/pipelines/demo-question-answering/app.py
index 12108ac..68f224c 100644
--- a/examples/pipelines/demo-question-answering/app.py
+++ b/examples/pipelines/demo-question-answering/app.py
@@ -2,11 +2,9 @@
 
 import pathway as pw
 from dotenv import load_dotenv
-from pathway.xpacks import llm
-from pathway.xpacks.llm.question_answering import BaseRAGQuestionAnswerer
-from pathway.xpacks.llm.vector_store import VectorStoreServer
+from pathway.xpacks.llm.question_answering import SummaryQuestionAnswerer
+from pathway.xpacks.llm.servers import QASummaryRestServer
 from pydantic import BaseModel, ConfigDict, InstanceOf
-from typing_extensions import TypedDict
 
 # To use advanced features with Pathway Scale, get your free license key from
 # https://pathway.com/features and paste it below.
@@ -21,41 +19,26 @@ load_dotenv()
 
-host_config = TypedDict("host_config", {"host": str, "port": int})
-
 
 class App(BaseModel):
-    llm: InstanceOf[pw.UDF]
-    embedder: InstanceOf[llm.embedders.BaseEmbedder]
-    splitter: InstanceOf[pw.UDF]
-    parser: InstanceOf[pw.UDF]
-
-    sources: list[InstanceOf[pw.Table]]
-
-    host_config: host_config
+    question_answerer: InstanceOf[SummaryQuestionAnswerer]
+    host: str = "0.0.0.0"
+    port: int = 8000
 
-    def run(self, config_file: str = "config.yaml") -> None:
-        # Unpack host and port from config
-        host, port = self.host_config["host"], self.host_config["port"]
+    with_cache: bool = True
+    terminate_on_error: bool = False
 
-        doc_store = VectorStoreServer(
-            *self.sources,
-            embedder=self.embedder,
-            splitter=self.splitter,
-            parser=self.parser,
+    def run(self) -> None:
+        server = QASummaryRestServer(self.host, self.port, self.question_answerer)
+        server.run(
+            with_cache=self.with_cache, terminate_on_error=self.terminate_on_error
         )
 
-        rag_app = BaseRAGQuestionAnswerer(llm=self.llm, indexer=doc_store)
-
-        rag_app.build_server(host=host, port=port)
-
-        rag_app.run_server(with_cache=True, terminate_on_error=False)
-
     model_config = ConfigDict(extra="forbid")
 
 
 if __name__ == "__main__":
-    with open("config.yaml") as f:
+    with open("app.yaml") as f:
         config = pw.load_yaml(f)
     app = App(**config)
     app.run()
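The move from `BaseRAGQuestionAnswerer` + `VectorStoreServer` to a `SummaryQuestionAnswerer` served by `QASummaryRestServer` also exposes summarization next to question answering. A sketch of calling it; the `/v1/pw_ai_summary` path and its `text_list` payload are assumptions based on the question-answering docs, so adjust them if your version differs:

```python
# Assumed summarization route of QASummaryRestServer: /v1/pw_ai_summary
# with a text_list payload; verify against your pathway version.
import requests

response = requests.post(
    "http://0.0.0.0:8000/v1/pw_ai_summary",
    json={"text_list": [
        "Pathway indexes documents in real time.",
        "Queries are answered over the live index.",
    ]},
    timeout=120,
)
print(response.json())
```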
model: "gpt-3.5-turbo" + retry_strategy: !pw.udfs.ExponentialBackoffRetryStrategy + max_retries: 6 + cache_strategy: !pw.udfs.DiskCache + temperature: 0.05 + capacity: 8 + +$embedder: !pw.xpacks.llm.embedders.OpenAIEmbedder + model: "text-embedding-ada-002" + cache_strategy: !pw.udfs.DiskCache + +$splitter: !pw.xpacks.llm.splitters.TokenCountSplitter + max_tokens: 400 + +$parser: !pw.xpacks.llm.parsers.ParseUnstructured + +$retriever_factory: !pw.stdlib.indexing.BruteForceKnnFactory + reserved_space: 1000 + embedder: $embedder + metric: !pw.internals.yaml_loader.import_object + path: pw.stdlib.indexing.BruteForceKnnMetricKind.COS + dimensions: 1536 + + +$document_store: !pw.xpacks.llm.document_store.DocumentStore + docs: $sources + parser: $parser + splitter: $splitter + retriever_factory: $retriever_factory + +question_answerer: !pw.xpacks.llm.question_answering.BaseRAGQuestionAnswerer + llm: $llm + indexer: $document_store + + +# Change host and port by uncommenting these files +# host: "0.0.0.0" +# port: 8000 + +# with_cache: true +# terminate_on_error: false diff --git a/examples/pipelines/gpt_4o_multimodal_rag/README.md b/examples/pipelines/gpt_4o_multimodal_rag/README.md index 31f04f3..2061fcb 100644 --- a/examples/pipelines/gpt_4o_multimodal_rag/README.md +++ b/examples/pipelines/gpt_4o_multimodal_rag/README.md @@ -162,17 +162,16 @@ curl -X 'POST' 'http://0.0.0.0:8000/v1/pw_ai_answer' -H 'accept: */*' -H ' Looking good! -## Modifying the code +## Modifying the pipeline -In the main function of `app.py`, we define: -- input folders +This template is easily configurable in the `app.yaml` file. In there you can define: +- input sources - LLM - embedder - index - host and port to run the app -- run options (caching, cache folder) -You can modify any of the components by checking the options from the imported modules: `from pathway.xpacks.llm import embedders, llms, parsers, splitters`. +You can modify any of the components by checking the options from the [Pathway LLM xpack](https://pathway.com/developers/api-docs/pathway-xpacks-llm). It is also possible to easily create new components by extending the [`pw.UDF`](https://pathway.com/developers/user-guide/data-transformation/user-defined-functions) class and implementing the `__wrapped__` function. diff --git a/examples/pipelines/gpt_4o_multimodal_rag/app.py b/examples/pipelines/gpt_4o_multimodal_rag/app.py index 6731d40..623c38a 100644 --- a/examples/pipelines/gpt_4o_multimodal_rag/app.py +++ b/examples/pipelines/gpt_4o_multimodal_rag/app.py @@ -10,113 +10,45 @@ os.environ["TESSDATA_PREFIX"] = tesseract_dir # fix for tesseract ocr break -import sys - -import click import pathway as pw -import pathway.io.fs as io_fs -import pathway.io.gdrive as io_gdrive -import yaml from dotenv import load_dotenv -from pathway.udfs import DiskCache, ExponentialBackoffRetryStrategy -from pathway.xpacks.llm import embedders, llms, prompts # , parsers, splitters -from pathway.xpacks.llm.parsers import OpenParse -from pathway.xpacks.llm.question_answering import BaseRAGQuestionAnswerer -from pathway.xpacks.llm.vector_store import VectorStoreServer +from pathway.xpacks.llm.question_answering import SummaryQuestionAnswerer +from pathway.xpacks.llm.servers import QASummaryRestServer +from pydantic import BaseModel, ConfigDict, InstanceOf # To use advanced features with Pathway Scale, get your free license key from # https://pathway.com/features and paste it below. # To use Pathway Community, comment out the line below. 
diff --git a/examples/pipelines/gpt_4o_multimodal_rag/app.py b/examples/pipelines/gpt_4o_multimodal_rag/app.py
index 6731d40..623c38a 100644
--- a/examples/pipelines/gpt_4o_multimodal_rag/app.py
+++ b/examples/pipelines/gpt_4o_multimodal_rag/app.py
@@ -10,113 +10,45 @@
 os.environ["TESSDATA_PREFIX"] = tesseract_dir  # fix for tesseract ocr break
 
-import sys
-
-import click
 import pathway as pw
-import pathway.io.fs as io_fs
-import pathway.io.gdrive as io_gdrive
-import yaml
 from dotenv import load_dotenv
-from pathway.udfs import DiskCache, ExponentialBackoffRetryStrategy
-from pathway.xpacks.llm import embedders, llms, prompts  # , parsers, splitters
-from pathway.xpacks.llm.parsers import OpenParse
-from pathway.xpacks.llm.question_answering import BaseRAGQuestionAnswerer
-from pathway.xpacks.llm.vector_store import VectorStoreServer
+from pathway.xpacks.llm.question_answering import SummaryQuestionAnswerer
+from pathway.xpacks.llm.servers import QASummaryRestServer
+from pydantic import BaseModel, ConfigDict, InstanceOf
 
 # To use advanced features with Pathway Scale, get your free license key from
 # https://pathway.com/features and paste it below.
 # To use Pathway Community, comment out the line below.
 pw.set_license_key("demo-license-key-with-telemetry")
 
-load_dotenv()
-
 logging.basicConfig(
     level=logging.INFO,
     format="%(asctime)s %(name)s %(levelname)s %(message)s",
     datefmt="%Y-%m-%d %H:%M:%S",
 )
 
+load_dotenv()
 
-def data_sources(source_configs) -> list[pw.Table]:
-    sources = []
-    for source_config in source_configs:
-        if source_config["kind"] == "local":
-            source = pw.io.fs.read(
-                **source_config["config"],
-                format="binary",
-                with_metadata=True,
-            )
-            sources.append(source)
-        elif source_config["kind"] == "gdrive":
-            source = pw.io.gdrive.read(
-                **source_config["config"],
-                with_metadata=True,
-            )
-            sources.append(source)
-        elif source_config["kind"] == "sharepoint":
-            try:
-                import pathway.xpacks.connectors.sharepoint as io_sp
-
-                source = io_sp.read(**source_config["config"], with_metadata=True)
-                sources.append(source)
-            except ImportError:
-                print(
-                    "The Pathway Sharepoint connector is part of the commercial offering, "
-                    "please contact us for a commercial license."
-                )
-                sys.exit(1)
-
-    return sources
+class App(BaseModel):
+    question_answerer: InstanceOf[SummaryQuestionAnswerer]
+    host: str = "0.0.0.0"
+    port: int = 8000
 
-@click.command()
-@click.option("--config_file", default="config.yaml", help="Config file to be used.")
-def run(
-    config_file: str = "config.yaml",
-):
-    with open(config_file) as config_f:
-        configuration = yaml.safe_load(config_f)
+    with_cache: bool = True
+    terminate_on_error: bool = False
 
-    openai_api_key = os.environ.get("OPENAI_API_KEY")
-    if openai_api_key is None:
-        print(
-            "Please set OPENAI_API_KEY either as a configuration variable, or in the .env file"
+    def run(self) -> None:
+        server = QASummaryRestServer(self.host, self.port, self.question_answerer)
+        server.run(
+            with_cache=self.with_cache, terminate_on_error=self.terminate_on_error
         )
-        sys.exit(1)
-
-    sources = data_sources(configuration["sources"])
-
-    chat = llms.OpenAIChat(
-        model="gpt-4o",
-        retry_strategy=ExponentialBackoffRetryStrategy(max_retries=6),
-        cache_strategy=DiskCache(),
-        temperature=0.0,
-    )
-
-    parser = OpenParse()
-    embedder = embedders.OpenAIEmbedder(cache_strategy=DiskCache())
-
-    doc_store = VectorStoreServer(
-        *sources,
-        embedder=embedder,
-        splitter=None,  # OpenParse parser handles the chunking
-        parser=parser,
-    )
-
-    app = BaseRAGQuestionAnswerer(
-        llm=chat,
-        indexer=doc_store,
-        search_topk=6,
-        short_prompt_template=prompts.prompt_qa,
-    )
-
-    host_config = configuration["host_config"]
-    host, port = host_config["host"], host_config["port"]
-    app.build_server(host=host, port=port)
-    app.run_server(with_cache=True, terminate_on_error=False)
+    model_config = ConfigDict(extra="forbid")
 
 
 if __name__ == "__main__":
-    run()
+    with open("app.yaml") as f:
+        config = pw.load_yaml(f)
+    app = App(**config)
+    app.run()
diff --git a/examples/pipelines/demo-question-answering/config.yaml b/examples/pipelines/gpt_4o_multimodal_rag/app.yaml
similarity index 54%
rename from examples/pipelines/demo-question-answering/config.yaml
rename to examples/pipelines/gpt_4o_multimodal_rag/app.yaml
index ddc010d..42bed4a 100644
--- a/examples/pipelines/demo-question-answering/config.yaml
+++ b/examples/pipelines/gpt_4o_multimodal_rag/app.yaml
@@ -1,25 +1,4 @@
-llm: !pw.xpacks.llm.llms.OpenAIChat
-  model: "gpt-3.5-turbo"
-  retry_strategy: !pw.udfs.ExponentialBackoffRetryStrategy
-    max_retries: 6
-  cache_strategy: !pw.udfs.DiskCache
-  temperature: 0.05
-  capacity: 8
-
-embedder: !pw.xpacks.llm.embedders.OpenAIEmbedder
-  model: "text-embedding-ada-002"
-  cache_strategy: !pw.udfs.DiskCache
-
-host_config:
-  host: "0.0.0.0"
-  port: 16003
-
-splitter: !pw.xpacks.llm.splitters.TokenCountSplitter
-  max_tokens: 400
-
-parser: !pw.xpacks.llm.parsers.ParseUnstructured
-
-sources:
+$sources:
   - !pw.io.fs.read
     path: data
     format: binary
@@ -44,3 +23,42 @@
   #   object_size_limit: null
   #   with_metadata: true
   #   refresh_interval: 30
+
+$llm: !pw.xpacks.llm.llms.OpenAIChat
+  model: "gpt-3.5-turbo"
+  retry_strategy: !pw.udfs.ExponentialBackoffRetryStrategy
+    max_retries: 6
+  cache_strategy: !pw.udfs.DiskCache
+  temperature: 0.05
+  capacity: 8
+
+$embedder: !pw.xpacks.llm.embedders.OpenAIEmbedder
+  model: "text-embedding-ada-002"
+  cache_strategy: !pw.udfs.DiskCache
+
+$parser: !pw.xpacks.llm.parsers.OpenParse
+  cache_strategy: !pw.udfs.DiskCache
+
+$retriever_factory: !pw.stdlib.indexing.BruteForceKnnFactory
+  reserved_space: 1000
+  embedder: $embedder
+  metric: !pw.engine.BruteForceKnnMetricKind.COS
+  dimensions: 1536
+
+
+$document_store: !pw.xpacks.llm.document_store.DocumentStore
+  docs: $sources
+  parser: $parser
+  retriever_factory: $retriever_factory
+
+question_answerer: !pw.xpacks.llm.question_answering.BaseRAGQuestionAnswerer
+  llm: $llm
+  indexer: $document_store
+
+
+# Change host and port by uncommenting these lines
+# host: "0.0.0.0"
+# port: 8000
+
+# with_cache: true
+# terminate_on_error: false
diff --git a/examples/pipelines/gpt_4o_multimodal_rag/config.yaml b/examples/pipelines/gpt_4o_multimodal_rag/config.yaml
deleted file mode 100644
index 31b5482..0000000
--- a/examples/pipelines/gpt_4o_multimodal_rag/config.yaml
+++ /dev/null
@@ -1,34 +0,0 @@
-host_config:
-  host: "0.0.0.0"
-  port: 8000
-sources:
-  - local_files:
-    kind: local
-    config:
-      # Please refer to
-      # https://pathway.com/developers/api-docs/pathway-io/fs#pathway.io.fs.read
-      # for options definition
-      path: "data/"
-  # - google_drive_folder:
-  #   kind: gdrive
-  #   config:
-  #     # Please refer to
-  #     # https://pathway.com/developers/api-docs/pathway-io/gdrive#pathway.io.gdrive.read
-  #     # for options definition
-  #     # Please follow https://pathway.com/developers/user-guide/connectors/gdrive-connector/#setting-up-google-drive
-  #     # for instructions on getting credentials
-  #     object_id: "1fUU13kooK2qFnGOO0FAO0a0RWmtZJhX-" # folder used in the managed demo
-  #     service_user_credentials_file: sources/gdrive-indexer.json
-  #     refresh_interval: 5
-  # - sharepoint_folder:
-  #   kind: sharepoint
-  #   config:
-  #     # The sharepoint is part of our commercial offering, please contact us to use it
-  #     root_path: ROOT_PATH
-  #     url: SHAREPOINT_URL
-  #     tenant: SHAREPOINT_TENANT
-  #     client_id: SHAREPOINT_CLIENT_ID
-  #     cert_path: SHAREPOINT.pem
-  #     thumbprint: SHAREPOINT_THUMBPRINT
-  #     refresh_interval: 5
-
diff --git a/examples/pipelines/private-rag/Dockerfile b/examples/pipelines/private-rag/Dockerfile
index 8ff0ad7..2e2ca8a 100644
--- a/examples/pipelines/private-rag/Dockerfile
+++ b/examples/pipelines/private-rag/Dockerfile
@@ -3,9 +3,12 @@ FROM pathwaycom/pathway:latest
 WORKDIR /app
 
 RUN apt-get update \
-    && apt-get install -y python3-opencv \
+    && apt-get install -y python3-opencv tesseract-ocr-eng \
    && rm -rf /var/lib/apt/lists/* /var/cache/apt/archives/*
 
+COPY requirements.txt .
+RUN pip install -U --no-cache-dir -r requirements.txt
+
 COPY . .
 
 EXPOSE 8000
diff --git a/examples/pipelines/private-rag/app.py b/examples/pipelines/private-rag/app.py
index 36b84a7..68f224c 100644
--- a/examples/pipelines/private-rag/app.py
+++ b/examples/pipelines/private-rag/app.py
@@ -1,79 +1,44 @@
 import logging
-import os
 
 import pathway as pw
 from dotenv import load_dotenv
-from pathway.xpacks.llm import embedders, llms, parsers, splitters
-from pathway.xpacks.llm.question_answering import AdaptiveRAGQuestionAnswerer
-from pathway.xpacks.llm.vector_store import VectorStoreServer
+from pathway.xpacks.llm.question_answering import SummaryQuestionAnswerer
+from pathway.xpacks.llm.servers import QASummaryRestServer
+from pydantic import BaseModel, ConfigDict, InstanceOf
 
 # To use advanced features with Pathway Scale, get your free license key from
 # https://pathway.com/features and paste it below.
 # To use Pathway Community, comment out the line below.
 pw.set_license_key("demo-license-key-with-telemetry")
 
-load_dotenv()
-
 logging.basicConfig(
     level=logging.INFO,
     format="%(asctime)s %(name)s %(levelname)s %(message)s",
     datefmt="%Y-%m-%d %H:%M:%S",
 )
 
+load_dotenv()
 
-if __name__ == "__main__":
-    path = "./data"
-
-    my_folder = pw.io.fs.read(
-        path=path,
-        format="binary",
-        with_metadata=True,
-    )
-
-    sources = [
-        my_folder
-    ]  # define the inputs (local folders, google drive, sharepoint, ...)
-
-    DEFAULT_MODEL = "ollama/mistral"
-    api_base = os.environ.get("LLM_API_BASE", "http://localhost:11434")
-
-    chat = llms.LiteLLMChat(
-        model=DEFAULT_MODEL,
-        temperature=0,
-        top_p=1,
-        api_base=api_base,  # local deployment
-        format="json",  # only available in Ollama local deploy, not usable in Mistral API
-    )
-
-    app_host = "0.0.0.0"
-    app_port = 8000
-
-    parser = parsers.ParseUnstructured()
-    text_splitter = splitters.TokenCountSplitter(max_tokens=400)
-    embedding_model = "avsolatorio/GIST-small-Embedding-v0"
+class App(BaseModel):
+    question_answerer: InstanceOf[SummaryQuestionAnswerer]
+    host: str = "0.0.0.0"
+    port: int = 8000
 
-    embedder = embedders.SentenceTransformerEmbedder(
-        embedding_model, call_kwargs={"show_progress_bar": False}
-    )
+    with_cache: bool = True
+    terminate_on_error: bool = False
 
-    vector_server = VectorStoreServer(
-        *sources,
-        embedder=embedder,
-        splitter=text_splitter,
-        parser=parser,
-    )
+    def run(self) -> None:
+        server = QASummaryRestServer(self.host, self.port, self.question_answerer)
+        server.run(
+            with_cache=self.with_cache, terminate_on_error=self.terminate_on_error
+        )
 
-    app = AdaptiveRAGQuestionAnswerer(
-        llm=chat,
-        indexer=vector_server,
-        default_llm_name=DEFAULT_MODEL,
-        n_starting_documents=2,
-        factor=2,
-        max_iterations=4,
-        strict_prompt=True,
-    )
+    model_config = ConfigDict(extra="forbid")
 
-    app.build_server(host=app_host, port=app_port)
-    app.run_server(with_cache=True)
+
+if __name__ == "__main__":
+    with open("app.yaml") as f:
+        config = pw.load_yaml(f)
+    app = App(**config)
+    app.run()
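The private-rag configuration that follows keeps inference fully local: `LiteLLMChat` points at an Ollama server on `http://localhost:11434`. Before starting the pipeline it is worth confirming that Ollama is up and serving the model; a sketch using Ollama's `/api/tags` listing endpoint:

```python
# Check that Ollama serves the model referenced by app.yaml
# ("ollama/mistral" on the LiteLLM side -> "mistral" in Ollama).
import requests

tags = requests.get("http://localhost:11434/api/tags", timeout=10).json()
models = [m["name"] for m in tags.get("models", [])]
print("available models:", models)
if not any(name.startswith("mistral") for name in models):
    print("model missing; run `ollama pull mistral` first")
```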
diff --git a/examples/pipelines/private-rag/app.yaml b/examples/pipelines/private-rag/app.yaml
new file mode 100644
index 0000000..f6aa160
--- /dev/null
+++ b/examples/pipelines/private-rag/app.yaml
@@ -0,0 +1,78 @@
+$sources:
+  - !pw.io.fs.read
+    path: data
+    format: binary
+    with_metadata: true
+
+  # - !pw.xpacks.connectors.sharepoint.read
+  #   url: $SHAREPOINT_URL
+  #   tenant: $SHAREPOINT_TENANT
+  #   client_id: $SHAREPOINT_CLIENT_ID
+  #   cert_path: sharepointcert.pem
+  #   thumbprint: $SHAREPOINT_THUMBPRINT
+  #   root_path: $SHAREPOINT_ROOT
+  #   with_metadata: true
+  #   refresh_interval: 30
+
+  # - !pw.io.gdrive.read
+  #   object_id: $DRIVE_ID
+  #   service_user_credentials_file: gdrive_indexer.json
+  #   name_pattern:
+  #     - "*.pdf"
+  #     - "*.pptx"
+  #   object_size_limit: null
+  #   with_metadata: true
+  #   refresh_interval: 30
+
+$llm_model: "ollama/mistral"
+
+$llm: !pw.xpacks.llm.llms.LiteLLMChat
+  model: $llm_model
+  retry_strategy: !pw.udfs.ExponentialBackoffRetryStrategy
+    max_retries: 6
+  cache_strategy: !pw.udfs.DiskCache
+  temperature: 0
+  top_p: 1
+  format: "json"  # only available in Ollama local deploy, not usable in Mistral API
+  api_base: "http://localhost:11434"
+
+$embedding_model: "avsolatorio/GIST-small-Embedding-v0"
+
+$embedder: !pw.xpacks.llm.embedders.SentenceTransformerEmbedder
+  model: $embedding_model
+  call_kwargs:
+    show_progress_bar: False
+
+$splitter: !pw.xpacks.llm.splitters.TokenCountSplitter
+  max_tokens: 400
+
+$parser: !pw.xpacks.llm.parsers.ParseUnstructured
+
+$retriever_factory: !pw.stdlib.indexing.BruteForceKnnFactory
+  reserved_space: 1000
+  embedder: $embedder
+  metric: !pw.engine.BruteForceKnnMetricKind.COS
+  dimensions: 1536
+
+
+$document_store: !pw.xpacks.llm.document_store.DocumentStore
+  docs: $sources
+  parser: $parser
+  splitter: $splitter
+  retriever_factory: $retriever_factory
+
+question_answerer: !pw.xpacks.llm.question_answering.AdaptiveRAGQuestionAnswerer
+  llm: $llm
+  indexer: $document_store
+  n_starting_documents: 2
+  factor: 2
+  max_iterations: 4
+  strict_prompt: true
+
+
+# Change host and port by uncommenting these lines
+# host: "0.0.0.0"
+# port: 8000
+
+# with_cache: true
+# terminate_on_error: false
diff --git a/examples/pipelines/private-rag/requirements.txt b/examples/pipelines/private-rag/requirements.txt
new file mode 100644
index 0000000..fb30529
--- /dev/null
+++ b/examples/pipelines/private-rag/requirements.txt
@@ -0,0 +1,2 @@
+python-dotenv==1.0.1
+mpmath==1.3.0
diff --git a/pyproject.toml b/pyproject.toml
index 7462c50..428d5fc 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -81,7 +81,7 @@ profile = "black"
 
 [tool.mypy]
 python_version = "3.10"
-exclude = ["examples/data", "examples/pipelines/adaptive-rag", "examples/pipelines/private-rag"]
+exclude = ["examples/data", "examples/pipelines/adaptive-rag", "examples/pipelines/private-rag", "examples/pipelines/demo-document-indexing"]
 ignore_missing_imports = true
 check_untyped_defs = true
 warn_redundant_casts = true