import logging
from asyncio import run
from threading import Thread
+import threading

from fastapi import HTTPException, status
+from langchain_core.documents import Document

from admin_api_lib.api_endpoints.confluence_loader import ConfluenceLoader
from admin_api_lib.api_endpoints.document_deleter import DocumentDeleter
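The new `import threading` works together with the existing `from asyncio import run`: further down, each configured Confluence space gets its own thread, and each thread drives its coroutine with `asyncio.run`, i.e. with a private event loop. A minimal sketch of that pattern, outside the repository (the `load_space` coroutine is a made-up stand-in for the per-space work):

# Sketch only: one coroutine per OS thread, each thread running its own event loop.
import threading
from asyncio import run


async def load_space(name: str) -> None:
    # Stand-in for the real per-space extraction/upload work.
    print(f"loading Confluence space {name}")


threads = [threading.Thread(target=lambda n=n: run(load_space(n))) for n in ("SPACE_A", "SPACE_B")]
for t in threads:
    t.start()
for t in threads:
    t.join()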
@@ -100,10 +102,16 @@ async def aload_from_confluence(self) -> None:
        HTTPException
            If the Confluence loader is not configured or if a load is already in progress.
        """
-        if not (self._settings.url.strip() and self._settings.space_key.strip() and self._settings.token.strip()):
-            raise HTTPException(
-                status.HTTP_501_NOT_IMPLEMENTED, "The confluence loader is not configured! Required fields are missing."
-            )
+        for index in range(len(self._settings.url)):
+            if not (
+                self._settings.url[index].strip()
+                and self._settings.space_key[index].strip()
+                and self._settings.token[index].strip()
+            ):
+                raise HTTPException(
+                    status.HTTP_501_NOT_IMPLEMENTED,
+                    "The confluence loader is not configured! Required fields are missing.",
+                )

        if self._background_thread is not None and self._background_thread.is_alive():
            raise HTTPException(
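The validation loop above assumes the Confluence settings now hold one entry per configured space: `url`, `space_key`, `token` (and `document_name`, used further down) are treated as parallel, equal-length lists indexed together. A hedged illustration of that shape, using a hypothetical stand-in for the real settings class:

# Illustrative stand-in only; the actual ConfluenceSettings class lives elsewhere in the repository.
from dataclasses import dataclass, field


@dataclass
class ConfluenceSettingsSketch:
    url: list[str] = field(default_factory=list)
    space_key: list[str] = field(default_factory=list)
    token: list[str] = field(default_factory=list)
    document_name: list[str] = field(default_factory=list)


settings = ConfluenceSettingsSketch(
    url=["https://example.atlassian.net/wiki", ""],
    space_key=["DOCS", "DEV"],
    token=["token-a", "token-b"],
    document_name=["docs-space", "dev-space"],
)

# Mirrors the check in the diff: every entry needs a non-empty url, space_key and token.
for index in range(len(settings.url)):
    if not (settings.url[index].strip() and settings.space_key[index].strip() and settings.token[index].strip()):
        print(f"entry {index} is missing required Confluence fields")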
@@ -113,51 +121,69 @@ async def aload_from_confluence(self) -> None:
        self._background_thread.start()

    async def _aload_from_confluence(self) -> None:
-        params = self._settings_mapper.map_settings_to_params(self._settings)
-        try:
-            self._key_value_store.upsert(self._settings.document_name, Status.PROCESSING)
-            information_pieces = self._extractor_api.extract_from_confluence_post(params)
-            documents = [self._information_mapper.extractor_information_piece2document(x) for x in information_pieces]
-            chunked_documents = self._chunker.chunk(documents)
-            rag_information_pieces = [
-                self._information_mapper.document2rag_information_piece(doc) for doc in chunked_documents
-            ]
-        except Exception as e:
-            self._key_value_store.upsert(self._settings.document_name, Status.ERROR)
-            logger.error("Error while loading from Confluence: %s", str(e))
-            raise HTTPException(
-                status.HTTP_500_INTERNAL_SERVER_ERROR, f"Error loading from Confluence: {str(e)}"
-            ) from e
-
-        await self._delete_previous_information_pieces()
-        self._key_value_store.upsert(self._settings.document_name, Status.UPLOADING)
-        self._upload_information_pieces(rag_information_pieces)
-
-    async def _delete_previous_information_pieces(self):
+        async def process_confluence(index):
+            logger.info("Loading from Confluence %s", self._settings.url[index])
+            self._sanitize_document_name(index=index)
+
+            params = self._settings_mapper.map_settings_to_params(self._settings, index)
+            try:
+                self._key_value_store.upsert(self._settings.document_name[index], Status.PROCESSING)
+                information_pieces = self._extractor_api.extract_from_confluence_post(params)
+                documents = [
+                    self._information_mapper.extractor_information_piece2document(x) for x in information_pieces
+                ]
+                documents = await self._aenhance_langchain_documents(documents)
+                chunked_documents = self._chunker.chunk(documents)
+                rag_information_pieces = [
+                    self._information_mapper.document2rag_information_piece(doc) for doc in chunked_documents
+                ]
+            except Exception as e:
+                self._key_value_store.upsert(self._settings.document_name[index], Status.ERROR)
+
+                logger.error("Error while loading from Confluence: %s", str(e))
+                raise HTTPException(
+                    status.HTTP_500_INTERNAL_SERVER_ERROR, f"Error loading from Confluence: {str(e)}"
+                ) from e
+
+            await self._delete_previous_information_pieces(index=index)
+            self._key_value_store.upsert(self._settings.document_name[index], Status.UPLOADING)
+            self._upload_information_pieces(rag_information_pieces, index=index)
+
+        threads = []
+        for idx in range(len(self._settings.url)):
+            t = threading.Thread(target=lambda idx=idx: run(process_confluence(idx)))
+            threads.append(t)
+            t.start()
+        for t in threads:
+            t.join()
+
+    async def _delete_previous_information_pieces(self, index=0):
        try:
-            await self._document_deleter.adelete_document(self._settings.document_name)
+            await self._document_deleter.adelete_document(self._settings.document_name[index])
        except HTTPException as e:
            logger.error(
                (
                    "Error while trying to delete documents with id: %s before uploading %s."
                    "NOTE: Still continuing with upload."
                ),
-                self._settings.document_name,
+                self._settings.document_name[index],
                e,
            )

-    def _upload_information_pieces(self, rag_api_documents):
+    def _upload_information_pieces(self, rag_api_documents, index=0):
        try:
            self._rag_api.upload_information_piece(rag_api_documents)
-            self._key_value_store.upsert(self._settings.document_name, Status.READY)
+            self._key_value_store.upsert(self._settings.document_name[index], Status.READY)
            logger.info("Confluence loaded successfully")
        except Exception as e:
-            self._key_value_store.upsert(self._settings.document_name, Status.ERROR)
+            self._key_value_store.upsert(self._settings.document_name[index], Status.ERROR)
            logger.error("Error while uploading Confluence to the database: %s", str(e))
            raise HTTPException(500, f"Error loading from Confluence: {str(e)}") from e

-    def _sanitize_document_name(self) -> None:
-        document_name = self._settings.document_name if self._settings.document_name else self._settings.url
+    def _sanitize_document_name(self, index) -> None:
+        document_name = (
+            self._settings.document_name[index] if self._settings.document_name[index] else self._settings.url[index]
+        )
        document_name = document_name.replace("http://", "").replace("https://", "")

-        self._settings.document_name = sanitize_document_name(document_name)
+        self._settings.document_name[index] = sanitize_document_name(document_name)
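One detail worth noting in the new fan-out is `lambda idx=idx: run(process_confluence(idx))`: Python closures look up `idx` when the lambda is called, not when it is created, so binding it as a default argument freezes the value of the current iteration. A standalone sketch of the difference (no repository code involved):

# Late binding: every lambda resolves idx at call time and sees the final value.
late = [lambda: idx for idx in range(3)]
# Early binding via a default argument: each lambda keeps the idx of its own iteration.
early = [lambda idx=idx: idx for idx in range(3)]

print([f() for f in late])   # [2, 2, 2]
print([f() for f in early])  # [0, 1, 2]

The same behaviour applies whether the deferred call runs in a thread, a task, or a plain list of callables, which is why the diff binds `idx` before handing it to `threading.Thread`.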