@@ -1,5 +1,4 @@
import asyncio
- import re
from typing import AsyncIterator

import async_timeout
@@ -11,26 +10,20 @@
from langchain_core.language_models.chat_models import BaseChatModel
from pydantic import BaseModel

- from backend.constants import (
-     DATA_POINT_FQN_METADATA_KEY,
-     DATA_POINT_HASH_METADATA_KEY,
-     DATA_POINT_SIGNED_URL_METADATA_KEY,
- )
+ from backend.constants import DATA_POINT_FQN_METADATA_KEY, DATA_POINT_HASH_METADATA_KEY
from backend.logger import logger
from backend.modules.metadata_store.client import get_client
from backend.modules.model_gateway.model_gateway import model_gateway
from backend.modules.query_controllers.types import *
from backend.modules.vector_db.client import VECTOR_STORE_CLIENT
from backend.settings import settings
- from backend.utils import _get_read_signed_url


class BaseQueryController:
    required_metadata = [
        "_id",
        DATA_POINT_FQN_METADATA_KEY,
        DATA_POINT_HASH_METADATA_KEY,
-         DATA_POINT_SIGNED_URL_METADATA_KEY,
        "_data_source_fqn",
        "filename",
        "collection_name",
@@ -162,33 +155,23 @@ async def _get_retriever(self, vector_store, retriever_name, retriever_config):
        raise HTTPException(status_code=404, detail="Retriever not found")
        return retriever

-     def _enrich_context_for_stream_response(
-         self, docs, artifact_repo_cache, signed_url_cache
-     ):
+     def _enrich_context_for_stream_response(self, docs):
        """
        Enrich the context for the stream response
        """
-         enriched_docs = []
-
-         for doc in docs:
-             # Enrich the original doc with signed URL
-             enriched_doc = self._enrich_metadata_with_signed_url(
-                 doc, artifact_repo_cache, signed_url_cache
-             )

-             # Create a new Document with only the required metadata
-             enriched_docs.append(
-                 Document(
-                     page_content=enriched_doc.page_content,
-                     metadata={
-                         key: enriched_doc.metadata[key]
-                         for key in self.required_metadata
-                         if key in enriched_doc.metadata
-                     },
-                 )
+         # Create a new Document with only the required metadata
+         return [
+             Document(
+                 page_content=doc.page_content,
+                 metadata={
+                     key: doc.metadata[key]
+                     for key in self.required_metadata
+                     if key in doc.metadata
+                 },
            )
-
-         return enriched_docs
+             for doc in docs
+         ]

    def _enrich_context_for_non_stream_response(self, outputs):
        """
@@ -197,53 +180,7 @@ def _enrich_context_for_non_stream_response(self, outputs):
        if "context" not in outputs:
            return []

-         # Cache to store the artifact repo paths for the current request
-         artifact_repo_cache = {}
-         # Cache to store the signed urls for the files for the current request
-         signed_url_cache = {}
-
-         return [
-             self._enrich_metadata_with_signed_url(
-                 doc, artifact_repo_cache, signed_url_cache
-             )
-             for doc in outputs["context"]
-         ]
-
-     def _enrich_metadata_with_signed_url(
-         self, doc, artifact_repo_cache, signed_url_cache=None
-     ):
-         """
-         Enrich the metadata with the signed url
-         """
-         fqn_with_source = doc.metadata.get(DATA_POINT_FQN_METADATA_KEY)
-
-         # Return if FQN is not present or if it's already in the cache
-         if not fqn_with_source or fqn_with_source in signed_url_cache:
-             return doc
-
-         # Use a single regex to extract both data-dir FQN and file path
-         match = re.search(r"(data-dir:[^:]+).*?(files/.+)$", fqn_with_source)
-
-         # Return if the regex does not match
-         if not match:
-             return doc
-
-         # Extract the data-dir FQN and the file path from the FQN with source
-         data_dir_fqn, file_path = match.groups()
-
-         # Generate a signed url for the file
-         signed_url = _get_read_signed_url(
-             fqn=data_dir_fqn,
-             file_path=file_path,
-             cache=artifact_repo_cache,
-         )
-
-         # Add the signed url to the metadata if it's not None
-         if signed_url:
-             doc.metadata[DATA_POINT_SIGNED_URL_METADATA_KEY] = signed_url[0].signed_url
-             signed_url_cache[fqn_with_source] = signed_url
-
-         return doc
+         return [doc for doc in outputs["context"]]

    def _intent_summary_search(self, query: str):
        url = f"https://api.search.brave.com/res/v1/web/search?q={query}&summary=1"
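For anyone tracing what was deleted: the old `_enrich_metadata_with_signed_url` pulled two groups out of the data-point FQN with one regex, then exchanged them for a signed URL via `_get_read_signed_url`. A minimal sketch of just the parsing step, using the exact regex from the removed code but an invented FQN value (real values were stored under `DATA_POINT_FQN_METADATA_KEY`):

```python
import re

# Invented example value; the real FQN format comes from the ingestion pipeline.
fqn_with_source = "data-dir:my-docs::files/reports/q1.pdf"

match = re.search(r"(data-dir:[^:]+).*?(files/.+)$", fqn_with_source)
if match:
    data_dir_fqn, file_path = match.groups()
    print(data_dir_fqn)  # data-dir:my-docs
    print(file_path)     # files/reports/q1.pdf
```

With this helper gone, clients that need a download link will have to resolve it themselves; the response now carries only the FQN and hash.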
@@ -291,17 +228,13 @@ async def _sse_wrap(self, gen):
    async def _stream_answer(self, rag_chain, query) -> AsyncIterator[BaseModel]:
        async with async_timeout.timeout(GENERATION_TIMEOUT_SEC):
            try:
-                 # Caches to store the artifact repo paths and signed urls for the current request
-                 artifact_repo_cache = {}
-                 # Cache to store the signed urls for the files for the current request
-                 signed_url_cache = {}
                # Process each chunk of the stream
                async for chunk in rag_chain.astream(query):
-                     # If the chunk has the context key, enrich the context with the signed urls
+                     # If the chunk has the context key, enrich the context of the chunk
                    if "context" in chunk:
                        yield Docs(
                            content=self._enrich_context_for_stream_response(
-                                 chunk["context"], artifact_repo_cache, signed_url_cache
+                                 chunk["context"]
                            )
                        )
                # If the chunk has the answer key, yield the answer
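The streaming path itself is unchanged apart from dropping the two caches: chunks from `rag_chain.astream(query)` are dicts keyed by `"context"` or `"answer"`, consumed under an `async_timeout` guard. A runnable sketch of that consumption pattern, with a stub standing in for the chain and an assumed timeout value (`GENERATION_TIMEOUT_SEC` is defined elsewhere in this module):

```python
import asyncio
import async_timeout

GENERATION_TIMEOUT_SEC = 60  # assumed value, for illustration only

async def fake_chain(query):
    # Stub for rag_chain.astream(query): retrieved context first, then answer tokens.
    yield {"context": ["<retrieved documents>"]}
    for token in ["Paris", " is", " the", " capital."]:
        yield {"answer": token}

async def stream_answer(query):
    async with async_timeout.timeout(GENERATION_TIMEOUT_SEC):
        async for chunk in fake_chain(query):
            if "context" in chunk:
                print("docs:", chunk["context"])
            elif "answer" in chunk:
                print("token:", chunk["answer"])

asyncio.run(stream_answer("What is the capital of France?"))
```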