Tiltfile (3 changes: 2 additions & 1 deletion)
@@ -394,14 +394,15 @@ extractor_docker_build_config = {
"ref": document_extractor_full_image_name,
"context": ".",
"dockerfile": extractor_dockerfile,
"ignore": IGNORE_BASE + libs_ignore_except(["extractor-api-lib"]),
"ignore": IGNORE_BASE + libs_ignore_except(["extractor-api-lib", "rag-core-lib"]),
}

# Add build args and live_update based on dev mode
if dev_mode:
extractor_docker_build_config["live_update"] = [
sync(extractor_context, "/app/services/document-extractor"),
sync(core_library_context +"/extractor-api-lib", "/app/libs/extractor-api-lib"),
sync(core_library_context +"/rag-core-lib", "/app/libs/rag-core-lib"),
]
else:
# Use prod-local for Tilt with production Dockerfile
libs/Dockerfile (5 changes: 4 additions & 1 deletion)
@@ -23,11 +23,14 @@ RUN poetry config virtualenvs.create false
RUN if [ "$TEST" = "1" ]; then \
rm ../poetry.lock; rm ../pyproject.toml; \
# Install rag-core-lib dependencies if testing libraries that depend on it \
if [ "${DIRECTORY}" = "admin-api-lib" ] || [ "${DIRECTORY}" = "rag-core-api" ]; then \
if [ "${DIRECTORY}" = "admin-api-lib" ] || [ "${DIRECTORY}" = "rag-core-api" ] || [ "${DIRECTORY}" = "extractor-api-lib" ]; then \
cd ../rag-core-lib && poetry install --with dev,test; \
cd ../${DIRECTORY}; \
fi; \
poetry install --with dev,test; \
if [ "${DIRECTORY}" = "admin-api-lib" ] || [ "${DIRECTORY}" = "rag-core-api" ] || [ "${DIRECTORY}" = "extractor-api-lib" ]; then \
pip install -e ../rag-core-lib; \
fi; \
else \
poetry install --with dev,lint; \
fi
libs/admin-api-lib/poetry.lock (6 changes: 3 additions & 3 deletions)

Some generated files are not rendered by default.

@@ -7,7 +7,12 @@ class DocumentDeleter(ABC):
"""Abstract base class for document deletion endpoint."""

@abstractmethod
-    async def adelete_document(self, identification: str, remove_from_key_value_store: bool = True) -> None:
+    async def adelete_document(
+        self,
+        identification: str,
+        remove_from_key_value_store: bool = True,
+        remove_from_storage: bool = True,
+    ) -> None:
"""
Delete a document by its identification asynchronously.

@@ -17,6 +17,8 @@ async def adelete_document(self, identification: str, remove_from_key_value_stor
The unique identifier of the document to be deleted.
remove_from_key_value_store : bool, optional
If True, the document will also be removed from the key-value store (default is True).
+        remove_from_storage : bool, optional
+            If True, the document will also be removed from the file storage (default is True).

Returns
-------
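Note: the new `remove_from_storage` flag mirrors `remove_from_key_value_store`, so callers can clear vector-db entries while leaving the stored file intact. A minimal sketch of a concrete implementation honoring both flags; the `InMemoryDeleter` class and its dict-backed stores are hypothetical, and the import path is an assumption since the diff omits this file's name:

```python
# Hypothetical example, not part of this PR; import path assumed.
from admin_api_lib.api_endpoints.document_deleter import DocumentDeleter


class InMemoryDeleter(DocumentDeleter):
    """Toy implementation: two dicts stand in for the KV store and file storage."""

    def __init__(self) -> None:
        self._key_value_store: dict[str, str] = {}
        self._file_storage: dict[str, bytes] = {}

    async def adelete_document(
        self,
        identification: str,
        remove_from_key_value_store: bool = True,
        remove_from_storage: bool = True,
    ) -> None:
        if remove_from_key_value_store:
            self._key_value_store.pop(identification, None)
        if remove_from_storage:
            self._file_storage.pop(identification, None)
```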
libs/admin-api-lib/src/admin_api_lib/file_services/file_service.py (78 changes: 3 additions & 75 deletions)
@@ -1,77 +1,5 @@
"""Abstract class for dealing with I/O."""
"""Re-export core file service interface."""

-import abc
-from abc import ABC
-from pathlib import Path
-from typing import BinaryIO
+from rag_core_lib.file_services.file_service import FileService


-class FileService(ABC):
-    """Abstract class for dealing with I/O."""
-
-    @abc.abstractmethod
-    def download_folder(self, source: str, target: Path) -> None:
-        """Download the remote folder on "source" to the local "target" directory.
-
-        Parameters
-        ----------
-        source: str
-            Path to the remote folder.
-        target: Path
-            Download destination path.
-        """
-
-    @abc.abstractmethod
-    def download_file(self, source: str, target_file: BinaryIO) -> None:
-        """Read a single remote file "source" into the local "target_file" file-like object.
-
-        Example usage
-        =============
-        ```
-        s3_settings: S3Settings = get_s3_settings()
-        s3_service = S3Service(endpoint="endpoint", username="username", password="password", bucket_name="bucket")
-
-        with tempfile.SpooledTemporaryFile(max_size=self._iot_forecast_settings.max_model_size) as temp_file:
-            s3_service.download_file("remote_file", temp_file)
-            # do stuff with temp_file
-        ```
-
-        Parameters
-        ----------
-        source: str
-            Path to the remote folder.
-        target_file: BinaryIO
-            File-like object to save the data to.
-        """
-
-    @abc.abstractmethod
-    def upload_file(self, file_path: str, file_name: str) -> None:
-        """Upload a local file to the Fileservice.
-
-        Parameters
-        ----------
-        file_path : str
-            The path to the local file to be uploaded.
-        file_name : str
-            The target path in the file storage where the file will be stored.
-        """
-
-    @abc.abstractmethod
-    def get_all_sorted_file_names(self) -> list[str]:
-        """Retrieve all file names stored in the file storage.
-
-        Returns
-        -------
-        list[str]
-            A list of file names stored in the file storage.
-        """
-
-    @abc.abstractmethod
-    def delete_file(self, file_name: str) -> None:
-        """Delete a file from the file storage.
-
-        Parameters
-        ----------
-        file_name : str
-            The name of the file to be deleted from the file storage.
-        """
__all__ = ["FileService"]
@@ -41,7 +41,21 @@ def __init__(self, file_service: FileService, rag_api: RagApi, key_value_store:
self._rag_api = rag_api
self._key_value_store = key_value_store

-    async def adelete_document(self, identification: str, remove_from_key_value_store: bool = True) -> None:
+    @staticmethod
+    def _storage_key_from_identification(identification: str) -> str | None:
+        if identification.startswith("file:"):
+            storage_key = identification[len("file:") :]
+            return storage_key or None
+        if ":" in identification:
+            return None
+        return identification or None
+
+    async def adelete_document(
+        self,
+        identification: str,
+        remove_from_key_value_store: bool = True,
+        remove_from_storage: bool = True,
+    ) -> None:
"""
Asynchronously delete a document identified by the given identification string.

@@ -57,6 +71,8 @@ async def adelete_document(self, identification: str, remove_from_key_value_stor
The unique identifier of the document to be deleted.
remove_from_key_value_store : bool, optional
If True, the document will also be removed from the key-value store (default is True).
+        remove_from_storage : bool, optional
+            If True, the document will also be removed from the file storage (default is True).

Raises
------
@@ -67,12 +83,12 @@ async def adelete_document(self, identification: str, remove_from_key_value_stor
error_messages = ""
# Delete the document from file service and vector database
logger.debug("Deleting existing document: %s", identification)
-        try:
-            if remove_from_key_value_store:
-                self._key_value_store.remove(identification)
-            self._file_service.delete_file(identification)
-        except Exception as e:
-            error_messages += f"Error while deleting {identification} from file storage\n {str(e)}\n"
+        if remove_from_key_value_store:
+            self._key_value_store.remove(identification)
+
+        if remove_from_storage:
+            error_messages = self._delete_from_storage(identification, error_messages)

try:
self._rag_api.remove_information_piece(
DeleteRequest(metadata=[KeyValuePair(key="document", value=json.dumps(identification))])
@@ -82,3 +98,14 @@
error_messages += f"Error while deleting {identification} from vector db\n{str(e)}"
if error_messages:
raise HTTPException(404, error_messages)

+    def _delete_from_storage(self, identification: str, error_messages: str) -> str:
+        try:
+            storage_key = self._storage_key_from_identification(identification)
+            if storage_key:
+                self._file_service.delete_file(storage_key)
+            else:
+                logger.debug("Skipping file storage deletion for non-file source: %s", identification)
+        except Exception as e:
+            error_messages += f"Error while deleting {identification} from file storage\n {str(e)}\n"
+        return error_messages
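For clarity, how the new key-derivation helper behaves on representative inputs. The identifier values are illustrative, and `DefaultDocumentDeleter` is an assumed class name, since the diff does not show it:

```python
# Only plain file identifiers map to storage keys; prefixed non-file
# sources and empty keys return None, which skips storage deletion.
deleter_cls = DefaultDocumentDeleter  # assumed name, not shown in the diff
assert deleter_cls._storage_key_from_identification("file:report.pdf") == "report.pdf"
assert deleter_cls._storage_key_from_identification("report.pdf") == "report.pdf"
assert deleter_cls._storage_key_from_identification("confluence:page-1") is None
assert deleter_cls._storage_key_from_identification("file:") is None
```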
@@ -188,7 +188,11 @@ async def _handle_source_upload(
# Replace old document
# deletion is allowed to fail
with suppress(Exception):
-            await self._document_deleter.adelete_document(source_name, remove_from_key_value_store=False)
+            await self._document_deleter.adelete_document(
+                source_name,
+                remove_from_key_value_store=False,
+                remove_from_storage=False,
+            )

# Run blocking RAG API call in thread pool to avoid blocking event loop
await asyncio.to_thread(self._rag_api.upload_information_piece, rag_information_pieces)
@@ -197,7 +197,11 @@ async def _handle_source_upload(
rag_information_pieces.append(self._information_mapper.document2rag_information_piece(doc))

with suppress(Exception):
-            await self._document_deleter.adelete_document(source_name, remove_from_key_value_store=False)
+            await self._document_deleter.adelete_document(
+                source_name,
+                remove_from_key_value_store=False,
+                remove_from_storage=False,
+            )

# Run blocking RAG API call in thread pool to avoid blocking event loop
await asyncio.to_thread(self._rag_api.upload_information_piece, rag_information_pieces)
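Both uploader changes follow the same pattern: when replacing an existing document, only the vector-db entries are cleared, while the stored file and key-value entry survive the re-upload. A condensed sketch of that step, lifted out into a standalone coroutine (`deleter` stands in for `self._document_deleter`):

```python
from contextlib import suppress


async def replace_existing(deleter, source_name: str) -> None:
    # Deletion is allowed to fail (e.g. the source is being uploaded
    # for the first time), hence the suppress(). With both flags False,
    # only the vector-db entries are removed before re-upload.
    with suppress(Exception):
        await deleter.adelete_document(
            source_name,
            remove_from_key_value_store=False,
            remove_from_storage=False,
        )
```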
libs/admin-api-lib/src/admin_api_lib/impl/file_services/s3_service.py (131 changes: 3 additions & 128 deletions)
@@ -1,130 +1,5 @@
"""Class to handle I/O with S3 storage."""
"""Re-export core S3 service implementation."""

-import logging
-from pathlib import Path
-from typing import BinaryIO
+from rag_core_lib.impl.file_services.s3_service import S3Service

-import boto3

-from admin_api_lib.file_services.file_service import FileService
-from admin_api_lib.impl.settings.s3_settings import S3Settings

-logger = logging.getLogger(__name__)


-class S3Service(FileService):
-    """Class to handle I/O with S3 storage."""
-
-    def __init__(self, s3_settings: S3Settings):
-        """Class to handle I/O with S3 storage.
-
-        Parameters
-        ----------
-        s3_settings: S3Settings
-            Settings for the s3. Must contain at least the endpoint, access_key_id, secret_access_key and bucket.
-        """
-        self._s3_settings = s3_settings
-        self._s3_client = boto3.client(
-            "s3",
-            endpoint_url=s3_settings.endpoint,
-            aws_access_key_id=s3_settings.access_key_id,
-            aws_secret_access_key=s3_settings.secret_access_key,
-            aws_session_token=None,
-            config=boto3.session.Config(signature_version="s3v4"),
-            verify=False,
-        )
-
-    def download_folder(self, source: str, target: Path) -> None:
-        """Download the remote folder on "source" to the local "target" directory.
-
-        Parameters
-        ----------
-        source: str
-            Path to the remote folder.
-        target: Path
-            Download destination path.
-        """
-        target.mkdir(parents=True, exist_ok=True)
-
-        search_response = self._s3_client.list_objects_v2(
-            Bucket=self._s3_settings.bucket,
-            Prefix=source,
-        )
-        for found_content in search_response.get("Contents", []):
-            file_source = found_content["Key"]
-            target_path = target / file_source[len(source) :]
-            target_path.parent.mkdir(parents=True, exist_ok=True)
-            with open(target_path, "wb") as local_file:
-                self.download_file(file_source, local_file)
-
-    def download_file(self, source: str, target_file: BinaryIO) -> None:
-        """Read a single remote file "source" into the local "target_file" file-like object.
-
-        Example usage
-        =============
-        ```
-        s3_settings: S3Settings = get_s3_settings()
-        s3_service = S3Service(endpoint="endpoint", username="username", password="password", bucket_name="bucket")
-
-        with tempfile.SpooledTemporaryFile(max_size=self._iot_forecast_settings.max_model_size) as temp_file:
-            s3_service.download_file("remote_file", temp_file)
-            # do stuff with temp_file
-        ```
-
-        Parameters
-        ----------
-        source: str
-            Path to the remote folder.
-        target_file: BinaryIO
-            File-like object to save the data to.
-        """
-        self._s3_client.download_fileobj(self._s3_settings.bucket, source, target_file)
-
-    def upload_file(self, file_path: str, file_name: str) -> None:
-        """
-        Upload a local file to the S3 bucket.
-
-        Parameters
-        ----------
-        source : Path
-            The path to the local file to upload.
-        target : str
-            The target path in the S3 bucket where the file will be stored.
-        """
-        self._s3_client.upload_file(
-            Filename=file_path,
-            Bucket=self._s3_settings.bucket,
-            Key=file_name,
-        )
-
-    def get_all_sorted_file_names(self) -> list[str]:
-        """Retrieve all file names stored in the S3 bucket.
-
-        Returns
-        -------
-        list[str]
-            A list of file names stored in the S3 bucket.
-        """
-        file_names = []
-
-        resp = self._s3_client.list_objects_v2(Bucket=self._s3_settings.bucket)
-        if resp.get("Contents"):
-            for obj in resp["Contents"]:
-                file_names.append(obj["Key"])
-        return file_names
-
-    def delete_file(self, file_name: str) -> None:
-        """Delete a file from the S3 bucket.
-
-        Parameters
-        ----------
-        file_name : str
-            The name of the file to be deleted from the S3 bucket.
-        """
-        try:
-            file_name = f"/{file_name}" if not file_name.startswith("/") else file_name
-            self._s3_client.delete_object(Bucket=self._s3_settings.bucket, Key=file_name)
-            logger.info("File %s successfully deleted.", file_name)
-        except Exception:
-            logger.exception("Error deleting file %s", file_name)
-            raise
__all__ = ["S3Service"]