Skip to content

Commit 18489da

Browse files
tylerhutchersonSpartee
authored andcommitted
Expand query support (#95)
This PR introduces configurable `dialect` support and a new query method for `pagination` / batching mechanism.
1 parent 582f333 commit 18489da

File tree

5 files changed

+406
-222
lines changed

5 files changed

+406
-222
lines changed

redisvl/index.py

Lines changed: 147 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,17 @@
11
import json
22
from functools import wraps
3-
from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, List, Optional, Union
3+
from typing import (
4+
TYPE_CHECKING,
5+
Any,
6+
AsyncGenerator,
7+
Callable,
8+
Dict,
9+
Generator,
10+
Iterable,
11+
List,
12+
Optional,
13+
Union,
14+
)
415

516
if TYPE_CHECKING:
617
from redis.commands.search.document import Document
@@ -477,26 +488,81 @@ def search(self, *args, **kwargs) -> Union["Result", Any]:
477488
)
478489
return results
479490

491+
def _query(self, query: BaseQuery) -> List[Dict[str, Any]]:
492+
"""Execute a query and process results."""
493+
results = self.search(query.query, query_params=query.params)
494+
# post process the results
495+
return process_results(
496+
results, query=query, storage_type=self.schema.storage_type
497+
)
498+
480499
@check_modules_present("_redis_conn")
481500
@check_index_exists()
482-
def query(self, query: "BaseQuery") -> List[Dict[str, Any]]:
483-
"""Run a query on this index.
501+
def query(self, query: BaseQuery) -> List[Dict[str, Any]]:
502+
"""Execute a query on the index.
484503
485-
This is similar to the search method, but takes a BaseQuery
486-
object directly (does not allow for the usage of a raw
487-
redis query string) and post-processes results of the search.
504+
This method takes a BaseQuery object directly, runs the search, and
505+
handles post-processing of the search.
488506
489507
Args:
490508
query (BaseQuery): The query to run.
491509
492510
Returns:
493511
List[Result]: A list of search results.
512+
513+
Example:
514+
results = index.query(query)
494515
"""
495-
results = self.search(query.query, query_params=query.params)
496-
# post process the results
497-
return process_results(
498-
results, query=query, storage_type=self.schema.storage_type
499-
)
516+
return self._query(query)
517+
518+
@check_modules_present("_redis_conn")
519+
@check_index_exists()
520+
def query_batch(self, query: BaseQuery, batch_size: int = 30) -> Generator:
521+
"""Execute a query on the index with batching.
522+
523+
This method takes a BaseQuery object directly, handles optional paging
524+
support, and post-processing of the search results.
525+
526+
Args:
527+
query (BaseQuery): The query to run.
528+
batch_size (int): The size of batches to return on each iteration.
529+
530+
Returns:
531+
List[Result]: A list of search results.
532+
533+
Raises:
534+
TypeError: If the batch size is not an integer
535+
ValueError: If the batch size is less than or equal to zero.
536+
537+
Example:
538+
for batch in index.query_batch(query, batch_size=10):
539+
# process batched results
540+
pass
541+
"""
542+
if not isinstance(batch_size, int):
543+
raise TypeError("batch_size must be an integer")
544+
545+
if batch_size <= 0:
546+
raise ValueError("batch_size must be greater than 0")
547+
548+
first = 0
549+
while True:
550+
query.set_paging(first, batch_size)
551+
batch_results = self._query(query)
552+
if not batch_results:
553+
break
554+
yield batch_results
555+
# increment the pagination tracker
556+
first += batch_size
557+
558+
@check_modules_present("_redis_conn")
559+
def listall(self) -> List[str]:
560+
"""List all search indices in Redis database.
561+
562+
Returns:
563+
List[str]: The list of indices in the database.
564+
"""
565+
return convert_bytes(self._redis_conn.client.execute_command("FT._LIST")) # type: ignore
500566

501567
@check_modules_present("_redis_conn")
502568
def exists(self) -> bool:
@@ -505,8 +571,7 @@ def exists(self) -> bool:
505571
Returns:
506572
bool: True if the index exists, False otherwise.
507573
"""
508-
indices = convert_bytes(self._redis_conn.client.execute_command("FT._LIST")) # type: ignore
509-
return self.name in indices
574+
return self.name in self.listall()
510575

511576
@check_modules_present("_redis_conn")
512577
@check_index_exists()
@@ -648,25 +713,84 @@ async def asearch(self, *args, **kwargs) -> Union["Result", Any]:
648713
)
649714
return results
650715

716+
async def _aquery(self, query: BaseQuery) -> List[Dict[str, Any]]:
717+
"""Asynchronously execute a query and process results."""
718+
results = await self.asearch(query.query, query_params=query.params)
719+
# post process the results
720+
return process_results(
721+
results, query=query, storage_type=self.schema.storage_type
722+
)
723+
651724
@check_async_modules_present("_redis_conn")
652725
@check_async_index_exists()
653-
async def aquery(self, query: "BaseQuery") -> List[Dict[str, Any]]:
654-
"""Run a query on this index.
726+
async def aquery(self, query: BaseQuery) -> List[Dict[str, Any]]:
727+
"""Asynchronously execute a query on the index.
655728
656-
This is similar to the search method, but takes a BaseQuery
657-
object directly (does not allow for the usage of a raw
658-
redis query string) and post-processes results of the search.
729+
This method takes a BaseQuery object directly, runs the search, and
730+
handles post-processing of the search.
659731
660732
Args:
661733
query (BaseQuery): The query to run.
662734
663735
Returns:
664736
List[Result]: A list of search results.
737+
738+
Example:
739+
results = await aindex.query(query)
665740
"""
666-
results = await self.asearch(query.query, query_params=query.params)
667-
# post process the results
668-
return process_results(
669-
results, query=query, storage_type=self.schema.storage_type
741+
return await self._aquery(query)
742+
743+
@check_async_modules_present("_redis_conn")
744+
@check_async_index_exists()
745+
async def aquery_batch(
746+
self, query: BaseQuery, batch_size: int = 30
747+
) -> AsyncGenerator:
748+
"""Execute a query on the index with batching.
749+
750+
This method takes a BaseQuery object directly, handles optional paging
751+
support, and post-processing of the search results.
752+
753+
Args:
754+
query (BaseQuery): The query to run.
755+
batch_size (int): The size of batches to return on each iteration.
756+
757+
Returns:
758+
List[Result]: A list of search results.
759+
760+
Raises:
761+
TypeError: If the batch size is not an integer
762+
ValueError: If the batch size is less than or equal to zero.
763+
764+
Example:
765+
async for batch in index.aquery_batch(query, batch_size=10):
766+
# process batched results
767+
pass
768+
"""
769+
if not isinstance(batch_size, int):
770+
raise TypeError("batch_size must be an integer")
771+
772+
if batch_size <= 0:
773+
raise ValueError("batch_size must be greater than 0")
774+
775+
first = 0
776+
while True:
777+
query.set_paging(first, batch_size)
778+
batch_results = await self._aquery(query)
779+
if not batch_results:
780+
break
781+
yield batch_results
782+
# increment the pagination tracker
783+
first += batch_size
784+
785+
@check_async_modules_present("_redis_conn")
786+
async def alistall(self) -> List[str]:
787+
"""List all search indices in Redis database.
788+
789+
Returns:
790+
List[str]: The list of indices in the database.
791+
"""
792+
return convert_bytes(
793+
await self._redis_conn.client.execute_command("FT._LIST") # type: ignore
670794
)
671795

672796
@check_async_modules_present("_redis_conn")
@@ -676,8 +800,7 @@ async def aexists(self) -> bool:
676800
Returns:
677801
bool: True if the index exists, False otherwise.
678802
"""
679-
indices = await self._redis_conn.client.execute_command("FT._LIST") # type: ignore
680-
return self.name in convert_bytes(indices)
803+
return self.name in await self.alistall()
681804

682805
@check_async_modules_present("_redis_conn")
683806
@check_async_index_exists()

0 commit comments

Comments
 (0)