Skip to content

Commit 10259c5

Browse files
committed
feat(plugin): add built-in SizeFileListFilter
1 parent 5d5031c commit 10259c5

File tree

2 files changed

+196
-0
lines changed

2 files changed

+196
-0
lines changed
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Copyright 2019-2021 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package io.streamthoughts.kafka.connect.filepulse.fs.filter;
20+
21+
import io.streamthoughts.kafka.connect.filepulse.fs.PredicateFileListFilter;
22+
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
23+
import org.apache.kafka.common.config.AbstractConfig;
24+
import org.apache.kafka.common.config.ConfigDef;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
27+
28+
import java.util.Map;
29+
import java.util.function.Predicate;
30+
31+
/**
32+
* A {@link PredicateFileListFilter} that allows excluding from processing files
33+
* that have not been modified since either a given maximum or minimum time in ms.
34+
*/
35+
public class SizeFileListFilter extends PredicateFileListFilter {
36+
37+
private final static String GROUP = "SizeFileListFilter";
38+
39+
private static final Logger LOG = LoggerFactory.getLogger(SizeFileListFilter.class);
40+
41+
public static final String FILE_MINIMUM_SIZE_MS_CONFIG = "file.filter.minimum.size.bytes";
42+
private static final String FILE_MINIMUM_AGE_MS_DOC =
43+
"The minimum size in bytes of a file to be eligible for processing (default: 0).";
44+
private static final long FILE_MINIMUM_SIZE_MS_DEFAULT = 0L;
45+
46+
public static final String FILE_MAXIMUM_SIZE_MS_CONFIG = "file.filter.maximum.size.bytes";
47+
private static final String FILE_MAXIMUM_SIZE_MS_DOC =
48+
"The maximum size in bytes of a file to be eligible for processing (default: Long.MAX_VALUE).";
49+
private static final long FILE_MAXIMUM_SIZE_MS_DEFAULT = Long.MAX_VALUE;
50+
51+
private Predicate<FileObjectMeta> minimumSizePredicate;
52+
53+
private Predicate<FileObjectMeta> maximumSizePredicate;
54+
55+
/**
56+
* {@inheritDoc}
57+
*/
58+
@Override
59+
public void configure(final Map<String, ?> props) {
60+
final AbstractConfig abstractConfig = new AbstractConfig(getConfigDef(), props);
61+
final Long minimumSizeBytes = abstractConfig.getLong(FILE_MINIMUM_SIZE_MS_CONFIG);
62+
this.minimumSizePredicate = it -> it.contentLength() >= minimumSizeBytes;
63+
64+
final Long maximumSizeBytes = abstractConfig.getLong(FILE_MAXIMUM_SIZE_MS_CONFIG);
65+
this.maximumSizePredicate = it -> it.contentLength() <= maximumSizeBytes;
66+
}
67+
68+
/**
69+
* {@inheritDoc}
70+
*/
71+
@Override
72+
public boolean test(final FileObjectMeta meta) {
73+
74+
if (!minimumSizePredicate.test(meta)) {
75+
LOG.debug(
76+
"Filtered '{}'. File do not match minimum size bytes predicate.",
77+
meta
78+
);
79+
return false;
80+
}
81+
82+
if (!maximumSizePredicate.test(meta)) {
83+
LOG.debug(
84+
"Filtered '{}'. File do not match maximum size bytes predicate.",
85+
meta
86+
);
87+
return false;
88+
}
89+
90+
return true;
91+
}
92+
93+
private static ConfigDef getConfigDef() {
94+
int groupCounter = 0;
95+
return new ConfigDef()
96+
.define(
97+
FILE_MINIMUM_SIZE_MS_CONFIG,
98+
ConfigDef.Type.LONG,
99+
FILE_MINIMUM_SIZE_MS_DEFAULT,
100+
ConfigDef.Range.atLeast(0),
101+
ConfigDef.Importance.HIGH,
102+
FILE_MINIMUM_AGE_MS_DOC,
103+
GROUP,
104+
groupCounter++,
105+
ConfigDef.Width.NONE,
106+
FILE_MINIMUM_SIZE_MS_CONFIG
107+
)
108+
.define(
109+
FILE_MAXIMUM_SIZE_MS_CONFIG,
110+
ConfigDef.Type.LONG,
111+
FILE_MAXIMUM_SIZE_MS_DEFAULT,
112+
ConfigDef.Range.atLeast(0),
113+
ConfigDef.Importance.HIGH,
114+
FILE_MAXIMUM_SIZE_MS_DOC,
115+
GROUP,
116+
groupCounter++,
117+
ConfigDef.Width.NONE,
118+
FILE_MAXIMUM_SIZE_MS_CONFIG
119+
);
120+
}
121+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package io.streamthoughts.kafka.connect.filepulse.fs.filter;
2+
3+
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
4+
import org.jetbrains.annotations.NotNull;
5+
import org.junit.Assert;
6+
import org.junit.Test;
7+
8+
import java.net.URI;
9+
import java.util.Map;
10+
11+
import static org.junit.Assert.*;
12+
13+
public class SizeFileListFilterTest {
14+
15+
@Test
16+
public void should_not_filter_file_given_default_limit_size_bytes() {
17+
SizeFileListFilter filter = new SizeFileListFilter();
18+
filter.configure(Map.of());
19+
Assert.assertTrue(filter.test(newFileObjectMeta(0)));
20+
}
21+
22+
@Test
23+
public void should_filter_file_given_minimum_size_bytes_superior_to_zero() {
24+
SizeFileListFilter filter = new SizeFileListFilter();
25+
filter.configure(Map.of(
26+
SizeFileListFilter.FILE_MINIMUM_SIZE_MS_CONFIG, 1
27+
));
28+
Assert.assertFalse(filter.test(newFileObjectMeta(0)));
29+
}
30+
31+
@Test
32+
public void should_filter_file_given_maximum_size_bytes() {
33+
SizeFileListFilter filter = new SizeFileListFilter();
34+
filter.configure(Map.of(
35+
SizeFileListFilter.FILE_MAXIMUM_SIZE_MS_CONFIG, 0
36+
));
37+
Assert.assertFalse(filter.test(newFileObjectMeta(1)));
38+
}
39+
40+
@NotNull
41+
private FileObjectMeta newFileObjectMeta(final long contentLength) {
42+
return new FileObjectMeta() {
43+
@Override
44+
public URI uri() {
45+
return null;
46+
}
47+
48+
@Override
49+
public String name() {
50+
return null;
51+
}
52+
53+
@Override
54+
public Long contentLength() {
55+
return contentLength;
56+
}
57+
58+
@Override
59+
public Long lastModified() {
60+
return null;
61+
}
62+
63+
@Override
64+
public ContentDigest contentDigest() {
65+
return null;
66+
}
67+
68+
@Override
69+
public Map<String, Object> userDefinedMetadata() {
70+
return null;
71+
}
72+
};
73+
}
74+
75+
}

0 commit comments

Comments
 (0)