Skip to content

Commit 2bbec15

Browse files
[DEVX-452]: Added Schema for Data Ingestion Pipeline Transformations
1 parent a91f24a commit 2bbec15

File tree

2 files changed

+19
-6
lines changed

2 files changed

+19
-6
lines changed

clarifai_datautils/multimodal/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from clarifai_datautils.multimodal.pipeline.base import Pipeline
22
from clarifai_datautils.multimodal.pipeline.Docx import DocxPartition
33
from clarifai_datautils.multimodal.pipeline.Markdown import MarkdownPartition
4-
from clarifai_datautils.multimodal.pipeline.PDF import PDFPartition
4+
from clarifai_datautils.multimodal.pipeline.PDF import PDFPartition, PDFPartitionMultimodal
55
from clarifai_datautils.multimodal.pipeline.Text import TextPartition
66

77
__all__ = [

clarifai_datautils/multimodal/pipeline/base.py

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,29 @@
11
import os
22
from concurrent.futures import ThreadPoolExecutor
33
from typing import List, Type
4+
from schema import And, Schema
45

56
from tqdm import tqdm
67

78
from .basetransform import BaseTransform
89
from .loaders import MultiModalLoader, TextDataLoader
910

1011

12+
def get_schema() -> Schema:
13+
"""Initialize the schema for Data Ingestion Pipeline transformations.
14+
15+
This schema validates:
16+
17+
- transformations must be a list
18+
- First item in the list must be one of the following: PDFPartition, TextPartition, PDFPartitionMultimodal, DocxPartition, MarkdownPartition
19+
- Each item in the list must be of BaseTransform instance
20+
21+
Returns:
22+
Schema: The schema for transformations.
23+
"""
24+
return Schema(And(list, lambda x: x[0].__class__.__name__ in ['PDFPartition', 'TextPartition', 'PDFPartitionMultimodal', 'DocxPartition', 'MarkdownPartition'], lambda x: all(isinstance(item, BaseTransform) for item in x)), error="Invalid transformations data.")
25+
26+
1127
class Pipeline:
1228
"""Text processing pipeline object from files"""
1329

@@ -25,11 +41,8 @@ def __init__(
2541
"""
2642
self.name = name
2743
self.transformations = transformations
28-
for transform in self.transformations:
29-
if not isinstance(transform, BaseTransform):
30-
raise ValueError('All transformations should be of type BaseTransform.')
31-
32-
#TODO: Schema for transformations
44+
self.transformation_schema = get_schema()
45+
self.transformation_schema.validate(self.transformations)
3346

3447
def run(self,
3548
files: str = None,

0 commit comments

Comments
 (0)