feat: add run_async to TextEmbeddingRetriever, MultiQueryEmbeddingRetriever, and MultiQueryTextRetriever#11367
Open
sachinn854 wants to merge 3 commits into
Open
Conversation
…riever, and MultiQueryTextRetriever
|
@sachinn854 is attempting to deploy a commit to the deepset Team on Vercel. A member of the Team first needs to authorize it. |
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Adds native coroutine support (run_async) to several retriever components so they can execute efficiently inside AsyncPipeline, with a thread-based fallback for sync-only wrapped components.
Changes:
- Implemented
run_asyncinTextEmbeddingRetriever,MultiQueryTextRetriever, andMultiQueryEmbeddingRetriever. - Added async test coverage (including
AsyncPipelineintegration tests) for the new async behavior and sync fallbacks. - Added release notes describing the new coroutine execution and fallback behavior.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
haystack/components/retrievers/text_embedding_retriever.py |
Adds run_async with async-first execution and thread fallback for sync embedders/retrievers. |
haystack/components/retrievers/multi_query_text_retriever.py |
Adds concurrent run_async for multi-query text retrieval with per-query async execution and dedup/sort. |
haystack/components/retrievers/multi_query_embedding_retriever.py |
Adds concurrent run_async for multi-query embedding retrieval, including async embedder + retriever execution. |
test/components/retrievers/test_text_embedding_retriever_async.py |
New tests for TextEmbeddingRetriever.run_async + pipeline integration + sync fallback. |
test/components/retrievers/test_multi_query_text_retriever_async.py |
New tests for MultiQueryTextRetriever.run_async, dedup, ordering, fallback, and pipeline integration. |
test/components/retrievers/test_multi_query_embedding_retriever_async.py |
New tests for MultiQueryEmbeddingRetriever.run_async, dedup, ordering, fallback, and pipeline integration. |
releasenotes/notes/add-run-async-to-retrievers-a265779e909abc2c.yaml |
Documents the new run_async support and fallback behavior. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
+109
to
+112
| @component.output_types(documents=list[Document]) | ||
| async def run_async( | ||
| self, queries: list[str], retriever_kwargs: dict[str, Any] | None = None | ||
| ) -> dict[str, list[Document]]: |
Comment on lines
+125
to
+134
| retriever_kwargs = retriever_kwargs or {} | ||
|
|
||
| if not self._is_warmed_up: | ||
| self.warm_up() | ||
|
|
||
| results = await asyncio.gather(*[self._run_one_async(q, retriever_kwargs) for q in queries]) | ||
| docs: list[Document] = [doc for result in results if result for doc in result] | ||
| docs = _deduplicate_documents(docs) | ||
| docs.sort(key=lambda x: x.score or 0.0, reverse=True) | ||
| return {"documents": docs} |
Comment on lines
+129
to
+132
| @component.output_types(documents=list[Document]) | ||
| async def run_async( | ||
| self, queries: list[str], retriever_kwargs: dict[str, Any] | None = None | ||
| ) -> dict[str, list[Document]]: |
Comment on lines
+145
to
+154
| retriever_kwargs = retriever_kwargs or {} | ||
|
|
||
| if not self._is_warmed_up: | ||
| self.warm_up() | ||
|
|
||
| results = await asyncio.gather(*[self._run_one_async(q, retriever_kwargs) for q in queries]) | ||
| docs: list[Document] = [doc for result in results if result for doc in result] | ||
| docs = _deduplicate_documents(docs) | ||
| docs.sort(key=lambda x: x.score or 0.0, reverse=True) | ||
| return {"documents": docs} |
Comment on lines
+128
to
+143
| loop = asyncio.get_running_loop() | ||
|
|
||
| if hasattr(self.text_embedder, "run_async") and callable(self.text_embedder.run_async): | ||
| embedding_result = await self.text_embedder.run_async(text=query) | ||
| else: | ||
| embedding_result = await loop.run_in_executor(None, lambda: self.text_embedder.run(text=query)) | ||
|
|
||
| if hasattr(self.retriever, "run_async") and callable(self.retriever.run_async): | ||
| result = await self.retriever.run_async( | ||
| query_embedding=embedding_result["embedding"], filters=filters, top_k=top_k | ||
| ) | ||
| else: | ||
| result = await loop.run_in_executor( | ||
| None, | ||
| lambda: self.retriever.run(query_embedding=embedding_result["embedding"], filters=filters, top_k=top_k), | ||
| ) |
| @pytest.mark.asyncio | ||
| async def test_run_async_deduplication(self): | ||
| doc2 = Document(content="Wind energy is clean", id="doc2", score=0.8) | ||
| # doc3 shares the same id as doc1 — simulates the same doc retrieved by different queries |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Related Issues
run_asynctoMultiQueryEmbeddingRetriever,MultiQueryTextRetriever, andTextEmbeddingRetriever#11358Proposed Changes:
TextEmbeddingRetriever,MultiQueryEmbeddingRetriever, andMultiQueryTextRetrieverdid notimplement
run_async, soAsyncPipelinefell back to running them in a thread executor even whentheir wrapped components supported native async execution.
TextEmbeddingRetriever.run_async: chainstext_embedderandretrievercalls sequentially,using each component's
run_asyncif available, otherwise falls back toloop.run_in_executorMultiQueryEmbeddingRetriever.run_async: replacesThreadPoolExecutorwithasyncio.gathervia a new
_run_one_asynchelper; usesrun_asyncon both the embedder and retriever when availableMultiQueryTextRetriever.run_async: same pattern asMultiQueryEmbeddingRetrieverAll three follow the pattern already established in
MultiRetriever.run_async.How did you test it?
test/components/retrievers/test_text_embedding_retriever_async.pytest/components/retrievers/test_multi_query_embedding_retriever_async.pytest/components/retrievers/test_multi_query_text_retriever_async.pyrun_asyncis absentNotes for the reviewer
_run_on_thread(used by syncrun()) is unchanged — only new async methods were added.The
run_asyncsignature matchesrunexactly in all three components.Checklist
fix:,feat:,build:,chore:,ci:,docs:,style:,refactor:,perf:,test:and added!in case the PR includes breaking changes.