diff --git a/langchain_couchbase/cache.py b/langchain_couchbase/cache.py index fcb7e18..4579270 100644 --- a/langchain_couchbase/cache.py +++ b/langchain_couchbase/cache.py @@ -11,7 +11,7 @@ import json import logging from datetime import timedelta -from typing import Any, Dict, Optional, Union +from typing import Any, Optional from couchbase.cluster import Cluster from couchbase.search import MatchQuery @@ -21,9 +21,14 @@ from langchain_core.load.load import loads from langchain_core.outputs import Generation +from langchain_couchbase.utils import ( + check_bucket_exists, + check_scope_and_collection_exists, + validate_ttl, +) from langchain_couchbase.vectorstores import CouchbaseSearchVectorStore -logger = logging.getLogger(__file__) +logger = logging.getLogger(__name__) def _hash(_input: str) -> str: @@ -51,7 +56,7 @@ def _dumps_generations(generations: RETURN_VAL_TYPE) -> str: return json.dumps([dumps(_item) for _item in generations]) -def _loads_generations(generations_str: str) -> Union[RETURN_VAL_TYPE, None]: +def _loads_generations(generations_str: str) -> Optional[RETURN_VAL_TYPE]: """ Deserialization of a string into a generic RETURN_VAL_TYPE (i.e. a sequence of `Generation`). @@ -90,16 +95,6 @@ def _loads_generations(generations_str: str) -> Union[RETURN_VAL_TYPE, None]: return None -def _validate_ttl(ttl: Optional[timedelta]) -> None: - """Validate the time to live""" - if not isinstance(ttl, timedelta): - raise ValueError(f"ttl should be of type timedelta but was {type(ttl)}.") - if ttl <= timedelta(seconds=0): - raise ValueError( - f"ttl must be greater than 0 but was {ttl.total_seconds()} seconds." - ) - - class CouchbaseCache(BaseCache): """Couchbase LLM Cache LLM Cache that uses Couchbase as the backend @@ -109,44 +104,6 @@ class CouchbaseCache(BaseCache): LLM = "llm" RETURN_VAL = "return_val" - def _check_bucket_exists(self) -> bool: - """Check if the bucket exists in the linked Couchbase cluster""" - bucket_manager = self._cluster.buckets() - try: - bucket_manager.get_bucket(self._bucket_name) - return True - except Exception: - return False - - def _check_scope_and_collection_exists(self) -> bool: - """Check if the scope and collection exists in the linked Couchbase bucket - Raises a ValueError if either is not found""" - scope_collection_map: Dict[str, Any] = {} - - # Get a list of all scopes in the bucket - for scope in self._bucket.collections().get_all_scopes(): - scope_collection_map[scope.name] = [] - - # Get a list of all the collections in the scope - for collection in scope.collections: - scope_collection_map[scope.name].append(collection.name) - - # Check if the scope exists - if self._scope_name not in scope_collection_map.keys(): - raise ValueError( - f"Scope {self._scope_name} not found in Couchbase " - f"bucket {self._bucket_name}" - ) - - # Check if the collection exists in the scope - if self._collection_name not in scope_collection_map[self._scope_name]: - raise ValueError( - f"Collection {self._collection_name} not found in scope " - f"{self._scope_name} in Couchbase bucket {self._bucket_name}" - ) - - return True - def __init__( self, cluster: Cluster, @@ -154,7 +111,7 @@ def __init__( scope_name: str, collection_name: str, ttl: Optional[timedelta] = None, - **kwargs: Dict[str, Any], + **kwargs: Any, ) -> None: """Initialize the Couchbase LLM Cache Args: @@ -173,24 +130,22 @@ def __init__( ) self._cluster = cluster - self._bucket_name = bucket_name self._scope_name = scope_name self._collection_name = collection_name - self._ttl = None # Check if the bucket exists - if not self._check_bucket_exists(): + if not check_bucket_exists(cluster, bucket_name): raise ValueError( - f"Bucket {self._bucket_name} does not exist. " - " Please create the bucket before searching." + f"Bucket {bucket_name} does not exist. " + "Please create the bucket before searching." ) try: - self._bucket = self._cluster.bucket(self._bucket_name) - self._scope = self._bucket.scope(self._scope_name) - self._collection = self._scope.collection(self._collection_name) + self._bucket = self._cluster.bucket(bucket_name) + self._scope = self._bucket.scope(scope_name) + self._collection = self._scope.collection(collection_name) except Exception as e: raise ValueError( "Error connecting to couchbase. " @@ -198,14 +153,13 @@ def __init__( ) from e # Check if the scope and collection exists. Throws ValueError if they don't - try: - self._check_scope_and_collection_exists() - except Exception as e: - raise e + check_scope_and_collection_exists( + self._bucket, scope_name, collection_name, bucket_name + ) # Check if the time to live is provided and valid if ttl is not None: - _validate_ttl(ttl) + validate_ttl(ttl) self._ttl = ttl def lookup(self, prompt: str, llm_string: str) -> Optional[RETURN_VAL_TYPE]: @@ -231,16 +185,13 @@ def update(self, prompt: str, llm_string: str, return_val: RETURN_VAL_TYPE) -> N } document_key = self._generate_key(prompt, llm_string) try: - if self._ttl: - self._collection.upsert( - key=document_key, - value=doc, - expiry=self._ttl, - ) - else: - self._collection.upsert(key=document_key, value=doc) + self._collection.upsert( + key=document_key, + value=doc, + **({"expiry": self._ttl} if self._ttl else {}), + ) except Exception: - logger.error("Error updating cache") + logger.exception("Error updating cache") def clear(self, **kwargs: Any) -> None: """Clear the cache. @@ -251,7 +202,7 @@ def clear(self, **kwargs: Any) -> None: query = f"DELETE FROM `{self._collection_name}`" self._scope.query(query).execute() except Exception: - logger.error("Error clearing cache. Please check if you have an index.") + logger.exception("Error clearing cache. Please check if you have an index.") class CouchbaseSemanticCache(BaseCache, CouchbaseSearchVectorStore): @@ -300,10 +251,10 @@ def __init__( self._ttl = None # Check if the bucket exists - if not self._check_bucket_exists(): + if not check_bucket_exists(cluster, bucket_name): raise ValueError( - f"Bucket {self._bucket_name} does not exist. " - " Please create the bucket before searching." + f"Bucket {bucket_name} does not exist. " + "Please create the bucket before searching." ) try: @@ -317,15 +268,14 @@ def __init__( ) from e # Check if the scope and collection exists. Throws ValueError if they don't - try: - self._check_scope_and_collection_exists() - except Exception as e: - raise e + check_scope_and_collection_exists( + self._bucket, scope_name, collection_name, bucket_name + ) self.score_threshold = score_threshold if ttl is not None: - _validate_ttl(ttl) + validate_ttl(ttl) self._ttl = ttl # Initialize the vector store @@ -370,7 +320,7 @@ def update(self, prompt: str, llm_string: str, return_val: RETURN_VAL_TYPE) -> N ttl=self._ttl, ) except Exception: - logger.error("Error updating cache") + logger.exception("Error updating cache") def clear(self, **kwargs: Any) -> None: """Clear the cache. @@ -381,4 +331,4 @@ def clear(self, **kwargs: Any) -> None: query = f"DELETE FROM `{self._collection_name}`" self._scope.query(query).execute() except Exception: - logger.error("Error clearing cache. Please check if you have an index.") + logger.exception("Error clearing cache. Please check if you have an index.") diff --git a/langchain_couchbase/chat_message_histories.py b/langchain_couchbase/chat_message_histories.py index 172b79b..dad479c 100644 --- a/langchain_couchbase/chat_message_histories.py +++ b/langchain_couchbase/chat_message_histories.py @@ -4,7 +4,7 @@ import time import uuid from datetime import timedelta -from typing import Any, Dict, List, Optional, Sequence +from typing import List, Optional, Sequence from couchbase.cluster import Cluster from langchain_core.chat_history import BaseChatMessageHistory @@ -14,6 +14,12 @@ messages_from_dict, ) +from langchain_couchbase.utils import ( + check_bucket_exists, + check_scope_and_collection_exists, + validate_ttl, +) + logger = logging.getLogger(__name__) DEFAULT_SESSION_ID_KEY = "session_id" @@ -23,60 +29,11 @@ DEFAULT_BATCH_SIZE = 100 -def _validate_ttl(ttl: Optional[timedelta]) -> None: - """Validate the time to live""" - if not isinstance(ttl, timedelta): - raise ValueError(f"ttl should be of type timedelta but was {type(ttl)}.") - if ttl <= timedelta(seconds=0): - raise ValueError( - f"ttl must be greater than 0 but was {ttl.total_seconds()} seconds." - ) - - class CouchbaseChatMessageHistory(BaseChatMessageHistory): """Couchbase Chat Message History Chat message history that uses Couchbase as the storage """ - def _check_bucket_exists(self) -> bool: - """Check if the bucket exists in the linked Couchbase cluster""" - bucket_manager = self._cluster.buckets() - try: - bucket_manager.get_bucket(self._bucket_name) - return True - except Exception: - return False - - def _check_scope_and_collection_exists(self) -> bool: - """Check if the scope and collection exists in the linked Couchbase bucket - Raises a ValueError if either is not found""" - scope_collection_map: Dict[str, Any] = {} - - # Get a list of all scopes in the bucket - for scope in self._bucket.collections().get_all_scopes(): - scope_collection_map[scope.name] = [] - - # Get a list of all the collections in the scope - for collection in scope.collections: - scope_collection_map[scope.name].append(collection.name) - - # Check if the scope exists - if self._scope_name not in scope_collection_map.keys(): - raise ValueError( - f"Scope {self._scope_name} not found in Couchbase " - f"bucket {self._bucket_name}" - ) - - # Check if the collection exists in the scope - if self._collection_name not in scope_collection_map[self._scope_name]: - raise ValueError( - f"Collection {self._collection_name} not found in scope " - f"{self._scope_name} in Couchbase bucket " - f"{self._bucket_name}" - ) - - return True - def __init__( self, *, @@ -121,10 +78,10 @@ def __init__( self._ttl = None # Check if the bucket exists - if not self._check_bucket_exists(): + if not check_bucket_exists(cluster, bucket_name): raise ValueError( - f"Bucket {self._bucket_name} does not exist. " - " Please create the bucket before searching." + f"Bucket {bucket_name} does not exist. " + "Please create the bucket before searching." ) try: @@ -138,10 +95,9 @@ def __init__( ) from e # Check if the scope and collection exists. Throws ValueError if they don't - try: - self._check_scope_and_collection_exists() - except Exception as e: - raise e + check_scope_and_collection_exists( + self._bucket, scope_name, collection_name, bucket_name + ) self._session_id_key = session_id_key self._message_key = message_key @@ -150,7 +106,7 @@ def __init__( self._ts_key = DEFAULT_TS_KEY if ttl is not None: - _validate_ttl(ttl) + validate_ttl(ttl) self._ttl = ttl # Create an index if it does not exist if requested @@ -165,8 +121,8 @@ def __init__( try: self._scope.query(index_creation_query).execute() - except Exception as e: - logger.error("Error creating index: ", e) + except Exception: + logger.exception("Error creating index") def add_message(self, message: BaseMessage) -> None: """Add a message to the cache""" @@ -196,8 +152,8 @@ def add_message(self, message: BaseMessage) -> None: self._ts_key: timestamp, }, ) - except Exception as e: - logger.error("Error adding message: ", e) + except Exception: + logger.exception("Error adding message") def add_messages(self, messages: Sequence[BaseMessage]) -> None: """Add messages to the cache in a batched manner""" @@ -227,20 +183,20 @@ def add_messages(self, messages: Sequence[BaseMessage]) -> None: self._collection.insert_multi(insert_batch, expiry=self._ttl) else: self._collection.insert_multi(insert_batch) - except Exception as e: - logger.error("Error adding messages: ", e) + except Exception: + logger.exception("Error adding messages") def clear(self) -> None: """Clear the cache""" # Delete all documents in the collection with the session_id clear_query = ( - f"DELETE FROM `{self._collection_name}`" - + f"WHERE {self._session_id_key}=$session_id" + f"DELETE FROM `{self._collection_name}` " + f"WHERE {self._session_id_key}=$session_id" ) try: self._scope.query(clear_query, session_id=self._session_id).execute() - except Exception as e: - logger.error("Error clearing cache: ", e) + except Exception: + logger.exception("Error clearing cache") @property def messages(self) -> List[BaseMessage]: @@ -256,8 +212,8 @@ def messages(self) -> List[BaseMessage]: result = self._scope.query(fetch_query, session_id=self._session_id) for document in result: message_items.append(document[f"{self._message_key}"]) - except Exception as e: - logger.error("Error fetching messages: ", e) + except Exception: + logger.exception("Error fetching messages") return messages_from_dict(message_items) diff --git a/langchain_couchbase/utils.py b/langchain_couchbase/utils.py new file mode 100644 index 0000000..78eacdc --- /dev/null +++ b/langchain_couchbase/utils.py @@ -0,0 +1,77 @@ +"""Shared utility functions for langchain-couchbase.""" + +from datetime import timedelta +from typing import Any, Optional + +from couchbase.cluster import Cluster + + +def check_bucket_exists(cluster: Cluster, bucket_name: str) -> bool: + """Check if the bucket exists in the linked Couchbase cluster. + + Args: + cluster: Couchbase cluster object with active connection. + bucket_name: Name of the bucket to check. + + Returns: + True if the bucket exists, False otherwise. + """ + bucket_manager = cluster.buckets() + try: + bucket_manager.get_bucket(bucket_name) + return True + except Exception: + return False + + +def check_scope_and_collection_exists( + bucket: Any, + scope_name: str, + collection_name: str, + bucket_name: str, +) -> bool: + """Check if the scope and collection exists in the linked Couchbase bucket. + + Uses early-exit pattern for better performance on large clusters. + + Args: + bucket: Couchbase bucket object. + scope_name: Name of the scope to check. + collection_name: Name of the collection to check. + bucket_name: Name of the bucket (for error messages). + + Returns: + True if both scope and collection exist. + + Raises: + ValueError: If scope or collection is not found. + """ + for scope in bucket.collections().get_all_scopes(): + if scope.name == scope_name: + if collection_name in [c.name for c in scope.collections]: + return True + raise ValueError( + f"Collection {collection_name} not found in scope " + f"{scope_name} in Couchbase bucket {bucket_name}" + ) + + raise ValueError( + f"Scope {scope_name} not found in Couchbase bucket {bucket_name}" + ) + + +def validate_ttl(ttl: Optional[timedelta]) -> None: + """Validate the time to live value. + + Args: + ttl: Time to live as a timedelta. + + Raises: + ValueError: If ttl is not a timedelta or is not greater than 0. + """ + if not isinstance(ttl, timedelta): + raise ValueError(f"ttl should be of type timedelta but was {type(ttl)}.") + if ttl <= timedelta(seconds=0): + raise ValueError( + f"ttl must be greater than 0 but was {ttl.total_seconds()} seconds." + ) diff --git a/langchain_couchbase/vectorstores/base_vector_store.py b/langchain_couchbase/vectorstores/base_vector_store.py index 6c67855..a790295 100644 --- a/langchain_couchbase/vectorstores/base_vector_store.py +++ b/langchain_couchbase/vectorstores/base_vector_store.py @@ -6,7 +6,6 @@ from typing import ( TYPE_CHECKING, Any, - Dict, List, Optional, ) @@ -16,6 +15,11 @@ from langchain_core.embeddings import Embeddings from langchain_core.vectorstores import VectorStore +from langchain_couchbase.utils import ( + check_bucket_exists, + check_scope_and_collection_exists, +) + if TYPE_CHECKING: from collections.abc import Iterable @@ -32,44 +36,6 @@ class BaseCouchbaseVectorStore(VectorStore): _default_text_key = "text" _default_embedding_key = "embedding" - def _check_bucket_exists(self) -> bool: - """Check if the bucket exists in the linked Couchbase cluster""" - bucket_manager = self._cluster.buckets() - try: - bucket_manager.get_bucket(self._bucket_name) - return True - except Exception: - return False - - def _check_scope_and_collection_exists(self) -> bool: - """Check if the scope and collection exists in the linked Couchbase bucket - Raises a ValueError if either is not found""" - scope_collection_map: Dict[str, Any] = {} - - # Get a list of all scopes in the bucket - for scope in self._bucket.collections().get_all_scopes(): - scope_collection_map[scope.name] = [] - - # Get a list of all the collections in the scope - for collection in scope.collections: - scope_collection_map[scope.name].append(collection.name) - - # Check if the scope exists - if self._scope_name not in scope_collection_map.keys(): - raise ValueError( - f"Scope {self._scope_name} not found in Couchbase " - f"bucket {self._bucket_name}" - ) - - # Check if the collection exists in the scope - if self._collection_name not in scope_collection_map[self._scope_name]: - raise ValueError( - f"Collection {self._collection_name} not found in scope " - f"{self._scope_name} in Couchbase bucket {self._bucket_name}" - ) - - return True - def __init__( self, cluster: Cluster, @@ -85,19 +51,15 @@ def __init__( Initialize the Couchbase Base Vector Store for data input and output. Args: - cluster (Cluster): couchbase cluster object with active connection. bucket_name (str): name of bucket to store documents in. scope_name (str): name of scope in the bucket to store documents in. collection_name (str): name of collection in the scope to store documents in embedding (Embeddings): embedding function to use. - index_name (str): name of the Search index to use. text_key (optional[str]): key in document to use as text. Set to text by default. embedding_key (optional[str]): key in document to use for the embeddings. Set to embedding by default. - scoped_index (optional[bool]): specify whether the index is a scoped index. - Set to True by default. """ if not isinstance(cluster, Cluster): raise ValueError( @@ -127,10 +89,10 @@ def __init__( self._embedding_key = embedding_key # Check if the bucket exists - if not self._check_bucket_exists(): + if not check_bucket_exists(cluster, bucket_name): raise ValueError( - f"Bucket {self._bucket_name} does not exist. " - " Please create the bucket before searching." + f"Bucket {bucket_name} does not exist. " + "Please create the bucket before searching." ) try: @@ -144,10 +106,9 @@ def __init__( ) from e # Check if the scope and collection exists. Throws ValueError if they don't - try: - self._check_scope_and_collection_exists() - except Exception as e: - raise e + check_scope_and_collection_exists( + self._bucket, scope_name, collection_name, bucket_name + ) def add_texts( self, diff --git a/langchain_couchbase/vectorstores/query_vector_store.py b/langchain_couchbase/vectorstores/query_vector_store.py index 827ec6c..7a40640 100644 --- a/langchain_couchbase/vectorstores/query_vector_store.py +++ b/langchain_couchbase/vectorstores/query_vector_store.py @@ -335,7 +335,10 @@ def similarity_search_with_score_by_vector( f"'{self._distance_metric.value}')" ) - escaped_fields = ", ".join(_escape_field(field) for field in fields) + ", " if fields else "" + if fields: + escaped_fields = ", ".join(_escape_field(field) for field in fields) + ", " + else: + escaped_fields = "" if not where_str: search_query = ( @@ -360,7 +363,7 @@ def similarity_search_with_score_by_vector( # Parse the results for row in search_iter.rows(): - text = row.pop(self._text_key) + text = row.pop(self._text_key, "") id = row.pop("id", "") distance = row.pop("distance", 0) metadata = {} @@ -531,7 +534,8 @@ def create_index( try: INDEX_CREATE_QUERY = ( f"CREATE VECTOR INDEX {index_name} ON {self._collection_name} " - f"({_escape_field(vector_field)} VECTOR) INCLUDE ({escaped_index_fields}) " + f"({_escape_field(vector_field)} VECTOR) " + f"INCLUDE ({escaped_index_fields}) " f"{where_clause} USING GSI WITH {index_params}" ) self._scope.query(INDEX_CREATE_QUERY).execute() diff --git a/langchain_couchbase/vectorstores/search_vector_store.py b/langchain_couchbase/vectorstores/search_vector_store.py index bf25f7b..2215b05 100644 --- a/langchain_couchbase/vectorstores/search_vector_store.py +++ b/langchain_couchbase/vectorstores/search_vector_store.py @@ -210,8 +210,7 @@ def _check_filter(self, filter: SearchQuery) -> bool: Raises a ValueError if the filter is not valid.""" if isinstance(filter, SearchQuery): return True - raise ValueError(f"filter must be a SearchQuery object, got" - f"{type(filter)}") + raise ValueError(f"filter must be a SearchQuery object, got {type(filter)}") def __init__( self, @@ -261,10 +260,7 @@ def __init__( self._scoped_index = scoped_index # Check if the index exists. Throws ValueError if it doesn't - try: - self._check_index_exists() - except Exception as e: - raise e + self._check_index_exists() def _format_metadata(self, row_fields: Dict[str, Any]) -> Dict[str, Any]: """Helper method to format the metadata from the Couchbase Search API. @@ -290,7 +286,7 @@ def similarity_search( self, query: str, k: int = 4, - search_options: Optional[Dict[str, Any]] = {}, + search_options: Optional[Dict[str, Any]] = None, filter: Optional[SearchQuery] = None, **kwargs: Any, ) -> List[Document]: @@ -352,7 +348,7 @@ def similarity_search_with_score_by_vector( self, embedding: List[float], k: int = 4, - search_options: Optional[Dict[str, Any]] = {}, + search_options: Optional[Dict[str, Any]] = None, filter: Optional[SearchQuery] = None, **kwargs: Any, ) -> List[Tuple[Document, float]]: @@ -405,6 +401,8 @@ def similarity_search_with_score_by_vector( - Use ``filter`` for efficient pre-search filtering, especially with large datasets - Both parameters can be used together for complex search scenarios """ # noqa: E501 + if search_options is None: + search_options = {} fields = kwargs.get("fields", ["*"]) @@ -479,7 +477,7 @@ def similarity_search_with_score( self, query: str, k: int = 4, - search_options: Optional[Dict[str, Any]] = {}, + search_options: Optional[Dict[str, Any]] = None, filter: Optional[SearchQuery] = None, **kwargs: Any, ) -> List[Tuple[Document, float]]: @@ -542,7 +540,7 @@ def similarity_search_by_vector( self, embedding: List[float], k: int = 4, - search_options: Optional[Dict[str, Any]] = {}, + search_options: Optional[Dict[str, Any]] = None, filter: Optional[SearchQuery] = None, **kwargs: Any, ) -> List[Document]: