Skip to content

Commit 6f06a29

Browse files
committed
#11 - Added WarcRecordFluxFactory and a reactive module.
1 parent 8aee045 commit 6f06a29

File tree

6 files changed

+102
-8
lines changed

6 files changed

+102
-8
lines changed

build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,8 @@ subprojects {
9999
}
100100

101101
dependencies {
102+
implementation group: 'org.jetbrains', name: 'annotations', version: '22.0.0'
103+
102104
compileOnly group: 'org.projectlombok', name: 'lombok', version: '1.18.20'
103105
annotationProcessor group: 'org.projectlombok', name: 'lombok', version: '1.18.20'
104106
testCompileOnly group: 'org.projectlombok', name: 'lombok', version: '1.18.20'

java-warc-reactive/build.gradle

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
dependencies {
2+
implementation project(':java-warc')
3+
4+
implementation group: 'io.projectreactor', name: 'reactor-core', version: '3.4.9'
5+
}
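java-warc-reactive/src/main/java/com/github/bottomlessarchive/warc/service/WarcRecordFluxFactory.java

Lines changed: 85 additions & 0 deletions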
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package com.github.bottomlessarchive.warc.service;
2+
3+
import com.github.bottomlessarchive.warc.service.content.domain.WarcContentBlock;
4+
import com.github.bottomlessarchive.warc.service.record.domain.WarcRecord;
5+
import com.github.bottomlessarchive.warc.service.record.domain.WarcRecordType;
6+
import lombok.NonNull;
7+
import org.jetbrains.annotations.NotNull;
8+
import reactor.core.publisher.Flux;
9+
10+
import java.io.InputStream;
11+
import java.net.URI;
12+
import java.net.URL;
13+
import java.nio.charset.Charset;
14+
import java.util.Arrays;
15+
import java.util.List;
16+
17+
@SuppressWarnings("unused")
18+
public class WarcRecordFluxFactory {
19+
20+
private static final List<WarcRecordType> EVERY_WARC_RECORD_TYPE = Arrays.asList(WarcRecordType.values());
21+
22+
private WarcRecordFluxFactory() {
23+
}
24+
25+
public static <T extends WarcContentBlock> Flux<WarcRecord<T>> buildWarcRecordFlux(@NotNull @NonNull final URI warcLocation) {
26+
return Flux.fromStream(() -> WarcRecordStreamFactory.streamOf(warcLocation, EVERY_WARC_RECORD_TYPE));
27+
}
28+
29+
public static <T extends WarcContentBlock> Flux<WarcRecord<T>> buildWarcRecordFlux(@NotNull @NonNull final URI warcLocation,
30+
@NotNull @NonNull final WarcRecordType... requiredRecordTypes) {
31+
return Flux.fromStream(() -> WarcRecordStreamFactory.streamOf(warcLocation, requiredRecordTypes));
32+
}
33+
34+
public static <T extends WarcContentBlock> Flux<WarcRecord<T>> buildWarcRecordFlux(@NotNull @NonNull final URI warcLocation,
35+
@NotNull @NonNull final List<WarcRecordType> requiredRecordTypes) {
36+
return Flux.fromStream(() -> WarcRecordStreamFactory.streamOf(warcLocation, requiredRecordTypes));
37+
}
38+
39+
public static <T extends WarcContentBlock> Flux<WarcRecord<T>> buildWarcRecordFlux(@NotNull @NonNull final URL warcLocation) {
40+
return Flux.fromStream(() -> WarcRecordStreamFactory.streamOf(warcLocation, EVERY_WARC_RECORD_TYPE));
41+
}
42+
43+
public static <T extends WarcContentBlock> Flux<WarcRecord<T>> buildWarcRecordFlux(@NotNull @NonNull final URL warcLocation,
44+
@NotNull @NonNull final WarcRecordType... requiredRecordTypes) {
45+
return Flux.fromStream(() -> WarcRecordStreamFactory.streamOf(warcLocation, requiredRecordTypes));
46+
}
47+
48+
public static <T extends WarcContentBlock> Flux<WarcRecord<T>> buildWarcRecordFlux(@NotNull @NonNull final URL warcLocation,
49+
@NotNull @NonNull final List<WarcRecordType> requiredRecordTypes) {
50+
return Flux.fromStream(() -> WarcRecordStreamFactory.streamOf(warcLocation, requiredRecordTypes));
51+
}
52+
53+
public static <T extends WarcContentBlock> Flux<WarcRecord<T>> buildWarcRecordFlux(@NotNull @NonNull final InputStream warcFileLocation) {
54+
return Flux.fromStream(() -> WarcRecordStreamFactory.streamOf(warcFileLocation));
55+
}
56+
57+
public static <T extends WarcContentBlock> Flux<WarcRecord<T>> buildWarcRecordFlux(@NotNull @NonNull final InputStream warcFileLocation,
58+
@NotNull @NonNull final List<WarcRecordType> requiredRecordTypes) {
59+
return Flux.fromStream(() -> WarcRecordStreamFactory.streamOf(warcFileLocation, requiredRecordTypes));
60+
}
61+
62+
public static <T extends WarcContentBlock> Flux<WarcRecord<T>> buildWarcRecordFlux(@NotNull @NonNull final InputStream warcFileLocation,
63+
@NotNull @NonNull final Charset charset) {
64+
return Flux.fromStream(() -> WarcRecordStreamFactory.streamOf(warcFileLocation, charset));
65+
}
66+
67+
public static <T extends WarcContentBlock> Flux<WarcRecord<T>> buildWarcRecordFlux(@NotNull @NonNull final InputStream warcFileLocation,
68+
@NotNull @NonNull final Charset charset,
69+
@NotNull @NonNull final List<WarcRecordType> requiredRecordTypes) {
70+
return Flux.fromStream(() -> WarcRecordStreamFactory.streamOf(warcFileLocation, charset, requiredRecordTypes));
71+
}
72+
73+
public static <T extends WarcContentBlock> Flux<WarcRecord<T>> buildWarcRecordFlux(@NotNull @NonNull final InputStream warcFileLocation,
74+
@NotNull @NonNull final Charset charset,
75+
final boolean compressed) {
76+
return Flux.fromStream(() -> WarcRecordStreamFactory.streamOf(warcFileLocation, charset, compressed));
77+
}
78+
79+
public static <T extends WarcContentBlock> Flux<WarcRecord<T>> buildWarcRecordFlux(@NotNull @NonNull final InputStream warcFileLocation,
80+
@NotNull @NonNull final Charset charset,
81+
final boolean compressed,
82+
@NotNull @NonNull final List<WarcRecordType> requiredRecordTypes) {
83+
return Flux.fromStream(() -> WarcRecordStreamFactory.streamOf(warcFileLocation, charset, compressed, requiredRecordTypes));
84+
}
85+
}

java-warc/build.gradle

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
11
dependencies {
2-
implementation group: 'org.jetbrains', name: 'annotations', version: '22.0.0'
3-
42
implementation group: 'org.apache.httpcomponents', name: 'httpcore', version: '4.4.14'
53
implementation group: 'commons-io', name: 'commons-io', version: '2.11.0'
64
implementation group: 'org.slf4j', name: 'slf4j-api', version: '1.7.32'

java-warc/src/main/java/com/github/bottomlessarchive/warc/service/WarcRecordStreamFactory.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -90,20 +90,22 @@ public static <T extends WarcContentBlock> Stream<WarcRecord<T>> streamOf(@NotNu
9090
}
9191

9292
public static <T extends WarcContentBlock> Stream<WarcRecord<T>> streamOf(@NotNull @NonNull final InputStream warcFileLocation,
93-
@NotNull @NonNull final Charset charset, @NotNull @NonNull final List<WarcRecordType> requiredRecordTypes) {
93+
@NotNull @NonNull final Charset charset,
94+
@NotNull @NonNull final List<WarcRecordType> requiredRecordTypes) {
9495
return streamOf(new BufferedInputStream(warcFileLocation), charset, true, requiredRecordTypes);
9596
}
9697

9798
public static <T extends WarcContentBlock> Stream<WarcRecord<T>> streamOf(@NotNull @NonNull final InputStream inputStream,
98-
@NotNull @NonNull final Charset charset, final boolean compressed) {
99+
@NotNull @NonNull final Charset charset,
100+
final boolean compressed) {
99101
return streamOf(inputStream, charset, compressed, EVERY_WARC_RECORD_TYPE);
100102
}
101103

102104
@SuppressWarnings("unchecked")
103-
public static <T extends WarcContentBlock> Stream<WarcRecord<T>> streamOf(
104-
@NotNull @NonNull final InputStream inputStream,
105-
@NotNull @NonNull final Charset charset, final boolean compressed,
106-
@NotNull @NonNull final List<WarcRecordType> requiredRecordTypes) {
105+
public static <T extends WarcContentBlock> Stream<WarcRecord<T>> streamOf(@NotNull @NonNull final InputStream inputStream,
106+
@NotNull @NonNull final Charset charset,
107+
final boolean compressed,
108+
@NotNull @NonNull final List<WarcRecordType> requiredRecordTypes) {
107109
final WarcReader warcReader = new WarcReader(inputStream, charset, compressed);
108110
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
109111
new SafeWarcRecordIterator(warcReader), Spliterator.ORDERED | Spliterator.NONNULL), false)

settings.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
11
rootProject.name = 'java-warc'
22

33
include 'java-warc'
4+
include 'java-warc-reactive'
5+

0 commit comments

Comments
 (0)