diff --git a/python/pyproject.toml b/python/pyproject.toml index c3044451cb76..675933c65c98 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -139,7 +139,7 @@ qdrant = [ redis = [ "redis[hiredis] >= 6,< 8", "types-redis ~= 4.6.0.20240425", - "redisvl ~= 0.4" + "redisvl ~= 0.5" ] realtime = [ "websockets >= 13, < 16", diff --git a/python/semantic_kernel/connectors/memory_stores/redis/redis_memory_store.py b/python/semantic_kernel/connectors/memory_stores/redis/redis_memory_store.py index 1f68fc268028..e2df3d4700fa 100644 --- a/python/semantic_kernel/connectors/memory_stores/redis/redis_memory_store.py +++ b/python/semantic_kernel/connectors/memory_stores/redis/redis_memory_store.py @@ -129,7 +129,7 @@ async def create_collection(self, collection_name: str) -> None: if await self.does_collection_exist(collection_name): logger.info(f'Collection "{collection_name}" already exists.') else: - index_def = IndexDefinition(prefix=f"{collection_name}:", index_type=IndexType.HASH) + index_def = IndexDefinition(prefix=[f"{collection_name}:"], index_type=IndexType.HASH) schema = ( TextField(name="key"), TextField(name="metadata"), diff --git a/python/semantic_kernel/connectors/redis.py b/python/semantic_kernel/connectors/redis.py index 575624895aca..acb72ad24a63 100644 --- a/python/semantic_kernel/connectors/redis.py +++ b/python/semantic_kernel/connectors/redis.py @@ -21,6 +21,7 @@ from redisvl.query.filter import FilterExpression, Num, Tag, Text from redisvl.query.query import BaseQuery, VectorQuery from redisvl.redis.utils import array_to_buffer, buffer_to_array, convert_bytes +from redisvl.schema import IndexSchema as _RedisVLIndexSchema from redisvl.schema import StorageType from semantic_kernel.connectors.ai.embedding_generator_base import EmbeddingGeneratorBase @@ -278,7 +279,7 @@ async def ensure_collection_exists(self, **kwargs) -> None: raise VectorStoreOperationException("Invalid index type supplied.") fields = _definition_to_redis_fields(self.definition, self.collection_type) index_definition = IndexDefinition( - prefix=f"{self.collection_name}:", index_type=INDEX_TYPE_MAP[self.collection_type] + prefix=[f"{self.collection_name}:"], index_type=INDEX_TYPE_MAP[self.collection_type] ) await self.redis_database.ft(self.collection_name).create_index(fields, definition=index_definition, **kwargs) @@ -321,7 +322,10 @@ async def _inner_search( results = await self.redis_database.ft(self.collection_name).search( # type: ignore query=query.query, query_params=query.params ) - processed = process_results(results, query, STORAGE_TYPE_MAP[self.collection_type]) + schema = _RedisVLIndexSchema.from_dict({ + "index": {"name": self.collection_name, "storage_type": STORAGE_TYPE_MAP[self.collection_type].value} + }) + processed = process_results(results, query, schema) return KernelSearchResults( results=self._get_vector_search_results_from_results(desync_list(processed)), total_count=results.total, @@ -616,8 +620,9 @@ def _deserialize_store_models_to_dicts( case FieldTypes.KEY: rec[field.name] = self._unget_redis_key(rec[field.name]) case "vector": - dtype = DATATYPE_MAP_VECTOR[field.type_ or "default"] - rec[field.name] = buffer_to_array(rec[field.name], dtype) + if field.name in rec: + dtype = DATATYPE_MAP_VECTOR[field.type_ or "default"] + rec[field.name] = buffer_to_array(rec[field.name], dtype) results.append(rec) return results @@ -706,7 +711,7 @@ def _add_key(self, key: TKey, record: dict[str, Any]) -> dict[str, Any]: @override async def _inner_delete(self, keys: Sequence[str], **kwargs: Any) -> None: - await asyncio.gather(*[self.redis_database.json().delete(key, **kwargs) for key in keys]) + await asyncio.gather(*[self.redis_database.json().delete(self._get_redis_key(key), **kwargs) for key in keys]) @override def _serialize_dicts_to_store_models( diff --git a/python/tests/integration/memory/test_redis_vector_store.py b/python/tests/integration/memory/test_redis_vector_store.py new file mode 100644 index 000000000000..3757b68ff13f --- /dev/null +++ b/python/tests/integration/memory/test_redis_vector_store.py @@ -0,0 +1,351 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Extended Redis connector integration tests. + +These supplement the single-record round-trip covered by +``test_vector_store.py`` and exercise the rest of the public surface +(`RedisStore.list_collection_names`, vector search with filters, batch +CRUD, `include_vectors`, manual index creation, and no-prefix mode) +to validate end-to-end compatibility with Redis and with Valkey + +valkey-search. + +All tests require a running Redis/Valkey server reachable via +``REDIS_CONNECTION_STRING``. +""" + +import asyncio +import contextlib +from dataclasses import dataclass, field +from typing import Annotated +from uuid import uuid4 + +import pytest + +from semantic_kernel.connectors.redis import ( + RedisHashsetCollection, + RedisJsonCollection, + RedisStore, +) +from semantic_kernel.data.vector import VectorStoreField, vectorstoremodel +from semantic_kernel.exceptions import VectorStoreInitializationException + +# Vector search was broken on main due to redisvl 0.5+ API change and a +# missing guard in the hashset deserializer. Fixed by updating +# _inner_search to pass IndexSchema and guarding buffer_to_array. +# See: https://github.com/microsoft/semantic-kernel/issues/13896 + + +@vectorstoremodel +@dataclass +class CoverageModel: + """Shared record shape for all coverage tests.""" + + vector: Annotated[ + list[float] | None, + VectorStoreField( + "vector", + index_kind="hnsw", + dimensions=5, + distance_function="cosine_similarity", + type="float", + ), + ] = None + id: Annotated[str, VectorStoreField("key", type="str")] = field(default_factory=lambda: str(uuid4())) + content: Annotated[str, VectorStoreField("data", type="str", is_full_text_indexed=True)] = "content" + + +def _records() -> list[CoverageModel]: + return [ + CoverageModel(id="cov-1", content="alpha", vector=[0.1, 0.2, 0.3, 0.4, 0.5]), + CoverageModel(id="cov-2", content="beta", vector=[0.2, 0.3, 0.4, 0.5, 0.6]), + CoverageModel(id="cov-3", content="gamma", vector=[0.9, 0.8, 0.7, 0.6, 0.5]), + ] + + +async def _collect(results): + """Consume KernelSearchResults.results into a list.""" + return [r async for r in results.results] + + +async def _wait_for_indexing(collection, vector, expected_count, *, timeout=5.0, interval=0.1): + """Poll search until the index has caught up with the expected record count. + + Avoids flaky ``asyncio.sleep`` waits by retrying until the index returns + at least ``expected_count`` results or the timeout is reached. + """ + deadline = asyncio.get_event_loop().time() + timeout + while True: + results = await _collect(await collection.search(vector=vector, top=expected_count)) + if len(results) >= expected_count: + return + if asyncio.get_event_loop().time() >= deadline: + pytest.fail(f"Indexing did not complete within {timeout}s: expected {expected_count}, got {len(results)}") + await asyncio.sleep(interval) + + +@pytest.fixture +def collection_cls(request): + """Parametrized fixture selecting the concrete collection class.""" + return request.param + + +@pytest.fixture +async def collection(collection_cls): + """Yields a freshly-created collection; cleans the index up at teardown. + + Uses ``prefix_collection_name_to_key_names=True`` so each parametrized + run has its own keyspace and hashset/json tests do not collide on + raw keys. + """ + name = f"sk_cov_{uuid4().hex[:8]}" + try: + col = collection_cls( + record_type=CoverageModel, + collection_name=name, + prefix_collection_name_to_key_names=True, + ) + except (VectorStoreInitializationException, ConnectionError) as exc: + pytest.xfail(f"Failed to connect to store: {exc}") + + async with col: + try: + await col.ensure_collection_deleted() + await col.ensure_collection_exists() + yield col + finally: + with contextlib.suppress(Exception): + await col.ensure_collection_deleted() + + +_COLLECTION_CLASSES = [ + pytest.param(RedisHashsetCollection, id="hashset"), + pytest.param(RedisJsonCollection, id="json"), +] + + +@pytest.mark.parametrize("collection_cls", _COLLECTION_CLASSES, indirect=True) +class TestRedisCoverage: + async def test_collection_exists_lifecycle(self, collection): + """collection_exists tracks ensure_collection_exists / _deleted.""" + assert await collection.collection_exists() is True + await collection.ensure_collection_deleted() + assert await collection.collection_exists() is False + await collection.ensure_collection_exists() + assert await collection.collection_exists() is True + + async def test_list_collection_names_includes_created(self, collection): + """RedisStore.list_collection_names surfaces the created index via FT._LIST.""" + try: + store = RedisStore() + except (VectorStoreInitializationException, ConnectionError) as exc: + pytest.xfail(f"Failed to connect to store: {exc}") + try: + names = await store.list_collection_names() + assert collection.collection_name in names + finally: + await store.redis_database.aclose() + + async def test_batch_upsert_get_delete(self, collection): + """Multi-record upsert, get, and delete round-trip.""" + records = _records() + await collection.upsert(records) + + fetched = await collection.get([r.id for r in records]) + assert fetched is not None + assert {r.id for r in fetched} == {r.id for r in records} + + await collection.delete([r.id for r in records]) + after = await collection.get([r.id for r in records]) + assert not after + + async def test_get_include_vectors(self, collection): + """get with include_vectors=True returns the vector, False hides it.""" + [first, *_] = _records() + await collection.upsert([first]) + + with_vec = await collection.get(first.id, include_vectors=True) + without_vec = await collection.get(first.id, include_vectors=False) + + assert with_vec is not None + assert without_vec is not None + assert with_vec.vector is not None + assert without_vec.vector is None + + async def test_vector_search_basic(self, collection): + """FT.SEARCH with an HNSW query returns results ordered by distance.""" + records = _records() + await collection.upsert(records) + await _wait_for_indexing(collection, [0.1, 0.2, 0.3, 0.4, 0.5], 3) + + results = await _collect(await collection.search(vector=[0.1, 0.2, 0.3, 0.4, 0.5], top=3)) + assert len(results) == 3 + assert results[0].record.id == "cov-1" + + async def test_vector_search_top_skip(self, collection): + """top/skip paging works end-to-end.""" + await collection.upsert(_records()) + await _wait_for_indexing(collection, [0.1, 0.2, 0.3, 0.4, 0.5], 3) + + page1 = await _collect(await collection.search(vector=[0.1, 0.2, 0.3, 0.4, 0.5], top=2, skip=0)) + page2 = await _collect(await collection.search(vector=[0.1, 0.2, 0.3, 0.4, 0.5], top=2, skip=2)) + assert len(page1) == 2 + assert len(page2) == 1 + seen = {r.record.id for r in page1} | {r.record.id for r in page2} + assert seen == {"cov-1", "cov-2", "cov-3"} + + async def test_vector_search_with_tag_filter(self, collection): + """Lambda filter on a text field is translated and honoured.""" + await collection.upsert(_records()) + await _wait_for_indexing(collection, [0.1, 0.2, 0.3, 0.4, 0.5], 3) + + results = await _collect( + await collection.search( + vector=[0.1, 0.2, 0.3, 0.4, 0.5], + top=5, + filter=lambda r: r.content == "beta", + ) + ) + assert len(results) == 1 + assert results[0].record.id == "cov-2" + + async def test_vector_search_include_vectors(self, collection): + """include_vectors toggles whether the vector is returned on search hits.""" + await collection.upsert(_records()) + await _wait_for_indexing(collection, [0.1, 0.2, 0.3, 0.4, 0.5], 3) + + with_vec = await _collect( + await collection.search(vector=[0.1, 0.2, 0.3, 0.4, 0.5], top=1, include_vectors=True) + ) + without_vec = await _collect( + await collection.search(vector=[0.1, 0.2, 0.3, 0.4, 0.5], top=1, include_vectors=False) + ) + assert with_vec[0].record.vector is not None + assert without_vec[0].record.vector is None + + +class TestRedisCoverageNoPrefix: + """prefix_collection_name_to_key_names=False should round-trip by raw key.""" + + @pytest.mark.parametrize( + "collection_cls", + [ + pytest.param(RedisHashsetCollection, id="hashset"), + pytest.param(RedisJsonCollection, id="json"), + ], + ) + async def test_upsert_get_delete_without_prefix(self, collection_cls): + name = f"sk_cov_np_{uuid4().hex[:8]}" + try: + col = collection_cls( + record_type=CoverageModel, + collection_name=name, + prefix_collection_name_to_key_names=False, + ) + except (VectorStoreInitializationException, ConnectionError) as exc: + pytest.xfail(f"Failed to connect to store: {exc}") + + async with col: + await col.ensure_collection_deleted() + await col.ensure_collection_exists() + try: + rec = CoverageModel( + id=f"np-{uuid4().hex[:6]}", + content="alpha", + vector=[0.1, 0.2, 0.3, 0.4, 0.5], + ) + await col.upsert([rec]) + fetched = await col.get(rec.id) + assert fetched is not None + assert fetched.id == rec.id + await col.delete(rec.id) + assert not await col.get(rec.id) + finally: + await col.ensure_collection_deleted() + + +@pytest.mark.parametrize("collection_cls", _COLLECTION_CLASSES, indirect=True) +class TestRedisCoverageExtended: + """Extra coverage for paths not exercised by TestRedisCoverage.""" + + async def test_ensure_collection_exists_with_explicit_index(self, collection): + """ensure_collection_exists(index_definition=..., fields=...) uses the provided override.""" + from redis.commands.search.field import TextField, VectorField + from redis.commands.search.index_definition import IndexDefinition, IndexType + + await collection.ensure_collection_deleted() + assert await collection.collection_exists() is False + + index_type = IndexType.JSON if isinstance(collection, RedisJsonCollection) else IndexType.HASH + content_field = ( + TextField("$.content", as_name="content") if index_type == IndexType.JSON else TextField("content") + ) + vector_field = ( + VectorField( + "$.vector", + "HNSW", + {"TYPE": "FLOAT32", "DIM": 5, "DISTANCE_METRIC": "COSINE"}, + as_name="vector", + ) + if index_type == IndexType.JSON + else VectorField( + "vector", + "HNSW", + {"TYPE": "FLOAT32", "DIM": 5, "DISTANCE_METRIC": "COSINE"}, + ) + ) + await collection.ensure_collection_exists( + index_definition=IndexDefinition(prefix=[f"{collection.collection_name}:"], index_type=index_type), + fields=[content_field, vector_field], + ) + assert await collection.collection_exists() is True + + async def test_ensure_collection_exists_invalid_index_definition(self, collection): + """Passing a non-IndexDefinition with fields should raise.""" + from semantic_kernel.exceptions import VectorStoreOperationException + + with pytest.raises(VectorStoreOperationException, match="Invalid index type supplied."): + await collection.ensure_collection_exists(index_definition="not-an-IndexDefinition", fields=["content"]) + + async def test_vector_search_not_equal_filter(self, collection): + await collection.upsert(_records()) + await _wait_for_indexing(collection, [0.1, 0.2, 0.3, 0.4, 0.5], 3) + results = await _collect( + await collection.search( + vector=[0.1, 0.2, 0.3, 0.4, 0.5], + top=5, + filter=lambda r: r.content != "alpha", + ) + ) + ids = {r.record.id for r in results} + assert ids == {"cov-2", "cov-3"} + + async def test_vector_search_and_filter(self, collection): + await collection.upsert(_records()) + await _wait_for_indexing(collection, [0.1, 0.2, 0.3, 0.4, 0.5], 3) + results = await _collect( + await collection.search( + vector=[0.1, 0.2, 0.3, 0.4, 0.5], + top=5, + filter=lambda r: (r.content != "alpha") and (r.content != "gamma"), + ) + ) + assert {r.record.id for r in results} == {"cov-2"} + + async def test_vector_search_or_filter(self, collection): + await collection.upsert(_records()) + await _wait_for_indexing(collection, [0.1, 0.2, 0.3, 0.4, 0.5], 3) + results = await _collect( + await collection.search( + vector=[0.1, 0.2, 0.3, 0.4, 0.5], + top=5, + filter=lambda r: (r.content == "alpha") or (r.content == "gamma"), + ) + ) + assert {r.record.id for r in results} == {"cov-1", "cov-3"} + + async def test_get_without_keys_not_implemented(self, collection): + """get with no keys should raise NotImplementedError via the connector.""" + from semantic_kernel.data.vector import GetFilteredRecordOptions + + with pytest.raises(NotImplementedError): + await collection._inner_get(keys=None, options=GetFilteredRecordOptions()) diff --git a/python/tests/unit/connectors/memory/test_redis_store.py b/python/tests/unit/connectors/memory/test_redis_store.py index e779ad945a97..b619fd91e2ee 100644 --- a/python/tests/unit/connectors/memory/test_redis_store.py +++ b/python/tests/unit/connectors/memory/test_redis_store.py @@ -276,6 +276,18 @@ async def test_delete(collection_hash, collection_json, type_): await collection._inner_delete(["id1"]) +async def test_delete_with_prefix_json(collection_with_prefix_json, mock_delete_json): + """Verify JSON delete prefixes keys when prefix_collection_name_to_key_names=True.""" + await collection_with_prefix_json._inner_delete(["id1"]) + mock_delete_json.assert_called_once_with("test:id1") + + +async def test_delete_with_prefix_hash(collection_with_prefix_hash, mock_delete_hash): + """Verify hashset delete prefixes keys when prefix_collection_name_to_key_names=True.""" + await collection_with_prefix_hash._inner_delete(["id1"]) + mock_delete_hash.assert_called_once_with("test:id1") + + async def test_collection_exists(collection_hash, mock_collection_exists): await collection_hash.collection_exists() @@ -295,14 +307,66 @@ async def test_create_index(collection_hash, mock_ensure_collection_exists): await collection_hash.ensure_collection_exists() +async def test_create_index_prefix_is_list(collection_hash, mock_ensure_collection_exists): + """Verify prefix is passed as a list, not a string (#13894).""" + await collection_hash.ensure_collection_exists() + mock_ensure_collection_exists.assert_called_once() + definition = mock_ensure_collection_exists.call_args.kwargs.get("definition") + assert definition is not None + prefix_idx = definition.args.index("PREFIX") + assert definition.args[prefix_idx + 1] == 1 + assert definition.args[prefix_idx + 2] == f"{collection_hash.collection_name}:" + + async def test_create_index_manual(collection_hash, mock_ensure_collection_exists): from redis.commands.search.index_definition import IndexDefinition, IndexType fields = ["fields"] - index_definition = IndexDefinition(prefix="test:", index_type=IndexType.HASH) + index_definition = IndexDefinition(prefix=["test:"], index_type=IndexType.HASH) await collection_hash.ensure_collection_exists(index_definition=index_definition, fields=fields) async def test_create_index_fail(collection_hash, mock_ensure_collection_exists): with raises(VectorStoreOperationException, match="Invalid index type supplied."): await collection_hash.ensure_collection_exists(index_definition="index_definition", fields="fields") + + +def test_deserialize_hashset_skips_missing_vector_field(collection_hash): + # Simulate search results with include_vectors=False: vector key is absent. + records = [{"id": "id1", "content": "hello"}] + result = collection_hash._deserialize_store_models_to_dicts(records) + assert len(result) == 1 + assert result[0]["id"] == "id1" + assert result[0]["content"] == "hello" + assert "vector" not in result[0] + + +@mark.parametrize("type_", ["hashset", "json"]) +async def test_inner_search_passes_index_schema_to_process_results(collection_hash, collection_json, type_): + from unittest.mock import MagicMock + + from redisvl.schema import IndexSchema, StorageType + + from semantic_kernel.data.vector import SearchType, VectorSearchOptions + + collection = collection_hash if type_ == "hashset" else collection_json + expected_storage = StorageType.HASH if type_ == "hashset" else StorageType.JSON + + mock_results = MagicMock() + mock_results.docs = [] + mock_results.total = 0 + + with ( + patch("redis.commands.search.AsyncSearch.search", new=AsyncMock(return_value=mock_results)), + patch("semantic_kernel.connectors.redis.process_results", return_value=[]) as mock_process, + ): + await collection._inner_search( + search_type=SearchType.VECTOR, + options=VectorSearchOptions(vector_property_name="vector", top=3), + vector=[1.0, 2.0, 3.0, 4.0, 5.0], + ) + + mock_process.assert_called_once() + _results_arg, _query_arg, schema_arg = mock_process.call_args.args + assert isinstance(schema_arg, IndexSchema), "process_results must receive an IndexSchema, not a StorageType" + assert schema_arg.index.storage_type == expected_storage diff --git a/python/uv.lock b/python/uv.lock index 430794738a60..4734213419d3 100644 --- a/python/uv.lock +++ b/python/uv.lock @@ -6481,13 +6481,13 @@ requires-dist = [ { name = "azure-cosmos", marker = "extra == 'azure'", specifier = "~=4.7" }, { name = "azure-identity", specifier = ">=1.13" }, { name = "azure-search-documents", marker = "extra == 'azure'", specifier = ">=11.6.0b4" }, - { name = "boto3", marker = "extra == 'aws'", specifier = ">=1.36.4,<1.41.0" }, + { name = "boto3", marker = "extra == 'aws'", specifier = ">=1.36.4,<1.43.0" }, { name = "chromadb", marker = "extra == 'chroma'", specifier = ">=0.5,<1.4" }, { name = "cloudevents", specifier = "~=1.0" }, { name = "defusedxml", specifier = "~=0.7" }, { name = "faiss-cpu", marker = "extra == 'faiss'", specifier = ">=1.10.0" }, { name = "google-cloud-aiplatform", marker = "extra == 'google'", specifier = ">=1.114,<1.134" }, - { name = "google-genai", marker = "extra == 'google'", specifier = "~=1.51.0" }, + { name = "google-genai", marker = "extra == 'google'", specifier = ">=1.51,<1.75" }, { name = "ipykernel", marker = "extra == 'notebooks'", specifier = ">=6.29,<8.0" }, { name = "jinja2", specifier = "~=3.1" }, { name = "mcp", specifier = ">=1.26.0" }, @@ -6515,14 +6515,14 @@ requires-dist = [ { name = "psycopg", extras = ["binary", "pool"], marker = "extra == 'postgres'", specifier = "~=3.2" }, { name = "pyarrow", marker = "extra == 'usearch'", specifier = ">=12.0,<22.0" }, { name = "pybars4", specifier = "~=0.9" }, - { name = "pydantic", specifier = ">=2.0,!=2.10.0,!=2.10.1,!=2.10.2,!=2.10.3,<2.13" }, + { name = "pydantic", specifier = ">=2.0,!=2.10.0,!=2.10.1,!=2.10.2,!=2.10.3,<2.14" }, { name = "pydantic-settings", specifier = "~=2.0" }, { name = "pymilvus", marker = "extra == 'milvus'", specifier = ">=2.3,<2.7" }, { name = "pymongo", marker = "extra == 'mongo'", specifier = ">=4.8.0,<4.16" }, { name = "pyodbc", marker = "extra == 'sql'", specifier = ">=5.2" }, { name = "qdrant-client", marker = "extra == 'qdrant'", specifier = "~=1.9" }, { name = "redis", extras = ["hiredis"], marker = "extra == 'redis'", specifier = ">=6,<8" }, - { name = "redisvl", marker = "extra == 'redis'", specifier = "~=0.4" }, + { name = "redisvl", marker = "extra == 'redis'", specifier = "~=0.5" }, { name = "scipy", specifier = ">=1.15.1" }, { name = "sentence-transformers", marker = "extra == 'hugging-face'", specifier = ">=2.2,<6.0" }, { name = "torch", marker = "extra == 'hugging-face'", specifier = "==2.8.0" },