Skip to content

Commit 0c3ac0b

Browse files
committed
Async version of Query and Traversal. Filter, Map, FlatMap, Fork, Of, SKip and TakeWhile.
1 parent c44af83 commit 0c3ac0b

13 files changed

+781
-0
lines changed
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Copyright (c) 2020, Fernando Miguel Carvalho, mcarvalho@cc.isel.ipl.pt
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.jayield;
18+
19+
import org.jayield.async.AsyncQueryFilter;
20+
import org.jayield.async.AsyncQueryFlatMapConcat;
21+
import org.jayield.async.AsyncQueryFlatMapMerge;
22+
import org.jayield.async.AsyncQueryFork;
23+
import org.jayield.async.AsyncQueryMap;
24+
import org.jayield.async.AsyncQueryOf;
25+
import org.jayield.async.AsyncQueryOfIterator;
26+
import org.jayield.async.AsyncQueryOnNext;
27+
import org.jayield.async.AsyncQuerySkip;
28+
import org.jayield.async.AsyncQueryTakeWhile;
29+
30+
import java.util.Iterator;
31+
import java.util.function.BiConsumer;
32+
import java.util.function.Function;
33+
import java.util.function.Predicate;
34+
35+
/**
36+
* An asynchronous sequence of elements supporting sequential operations.
37+
* Query operations are composed into a pipeline to perform computation.
38+
*
39+
* @author Miguel Gamboa
40+
* created on 07-07-2020
41+
*/
42+
public abstract class AsyncQuery<T> implements AsyncTraverser<T>{
43+
44+
/**
45+
* Returns an asynchronous sequential ordered query whose elements
46+
* are the specified values in data parameter.
47+
*/
48+
public static <U> AsyncQuery<U> of(U...data) {
49+
return new AsyncQueryOf<>(data);
50+
}
51+
/**
52+
* Returns an asynchronous sequential ordered query whose elements
53+
* are the specified values in the Iterator parameter.
54+
*/
55+
public static <U> AsyncQuery<U> of(Iterator<U> iter) {
56+
return new AsyncQueryOfIterator<>(iter);
57+
}
58+
/**
59+
* Returns an asynchronous sequential ordered query whose elements
60+
* are the specified values in data parameter running on thread pool.
61+
*/
62+
public static <U> AsyncQuery<U> fork(U...data) {
63+
return new AsyncQueryFork<>(data);
64+
}
65+
66+
/**
67+
* Returns a new asynchronous query emitting the same items of this query,
68+
* additionally performing the provided action on each element as elements are consumed
69+
* from the resulting query.
70+
*/
71+
public final AsyncQuery<T> onNext(BiConsumer<? super T, ? super Throwable> action) {
72+
return new AsyncQueryOnNext<>(this, action);
73+
}
74+
75+
/**
76+
* Returns a new asynchronous query consisting of the remaining elements of
77+
* this query after discarding the first {@code n} elements of the query.
78+
*/
79+
public final AsyncQuery<T> skip(int n) {
80+
return new AsyncQuerySkip<>(this, n);
81+
}
82+
83+
/**
84+
* Returns an asynchronous query consisting of the elements of this query that match
85+
* the given predicate.
86+
*/
87+
public final AsyncQuery<T> filter(Predicate<? super T> p) {
88+
return new AsyncQueryFilter<>(this, p);
89+
}
90+
91+
/**
92+
* Returns an asynchronous query consisting of the results of applying the given
93+
* function to the elements of this query.
94+
*/
95+
public final <R> AsyncQuery<R> map(Function<? super T,? extends R> mapper) {
96+
return new AsyncQueryMap<>(this, mapper);
97+
}
98+
99+
/**
100+
* Returns a query consisting of the longest prefix of elements taken from
101+
* this query that match the given predicate.
102+
*/
103+
public final AsyncQuery<T> takeWhile(Predicate<? super T> predicate){
104+
return new AsyncQueryTakeWhile<>(this, predicate);
105+
}
106+
/**
107+
* Returns an asynchronous query consisting of the results of replacing each element of
108+
* this query with the contents of a mapped query produced by applying
109+
* the provided mapping function to each element.
110+
* It waits for the inner flow to complete before starting to collect the next one.
111+
*/
112+
public final <R> AsyncQuery<R> flatMapConcat(Function<? super T,? extends AsyncQuery<? extends R>> mapper) {
113+
return new AsyncQueryFlatMapConcat<>(this, mapper);
114+
}
115+
116+
public final <R> AsyncQuery<R> flatMapMerge(Function<? super T,? extends AsyncQuery<? extends R>> mapper) {
117+
return new AsyncQueryFlatMapMerge<>(this, mapper);
118+
}
119+
120+
public final void blockingSubscribe() {
121+
this
122+
.subscribe((item, err) -> { })
123+
.join(); // In both previous cases cf will raise an exception.
124+
}
125+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright (c) 2018, Fernando Miguel Carvalho, mcarvalho@cc.isel.ipl.pt
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.jayield;
18+
19+
import java.util.concurrent.CompletableFuture;
20+
import java.util.function.BiConsumer;
21+
22+
import static java.util.concurrent.CompletableFuture.supplyAsync;
23+
24+
/**
25+
* Asynchronous traversal.
26+
* Jayield uses traverse method as its first choice to
27+
* implement AsyncQuery operations.
28+
* This is a special kind of traversal that disallows individually access.
29+
*/
30+
public interface AsyncTraverser<T> {
31+
/**
32+
* Yields elements sequentially until all elements have been
33+
* processed or an exception is thrown.
34+
* The given consumer is invoked with the result (or null if none)
35+
* and the exception (or null if none).
36+
*
37+
* @return A CompletableFuture to signal finish to enable cancellation
38+
* through its cancel() method.
39+
*/
40+
CompletableFuture<Void> subscribe(BiConsumer<? super T,? super Throwable> cons);
41+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright (c) 2020, Fernando Miguel Carvalho, mcarvalho@cc.isel.ipl.pt
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.jayield.async;
18+
19+
import org.jayield.AsyncQuery;
20+
21+
import java.util.concurrent.CompletableFuture;
22+
import java.util.function.BiConsumer;
23+
import java.util.function.Predicate;
24+
25+
public class AsyncQueryFilter<T> extends AsyncQuery<T> {
26+
private final AsyncQuery<T> upstream;
27+
private final Predicate<? super T> p;
28+
29+
public AsyncQueryFilter(AsyncQuery<T> upstream, Predicate<? super T> p) {
30+
this.upstream = upstream;
31+
this.p = p;
32+
}
33+
34+
@Override
35+
public CompletableFuture<Void> subscribe(BiConsumer<? super T, ? super Throwable> cons) {
36+
return upstream.subscribe((item, err) -> {
37+
if(err != null) {
38+
cons.accept(null, err);
39+
return;
40+
}
41+
if(p.test(item)) cons.accept(item, null);
42+
});
43+
}
44+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright (c) 2020, Fernando Miguel Carvalho, mcarvalho@cc.isel.ipl.pt
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.jayield.async;
18+
19+
import org.jayield.AsyncQuery;
20+
21+
import java.util.concurrent.CompletableFuture;
22+
import java.util.function.BiConsumer;
23+
import java.util.function.Function;
24+
25+
public class AsyncQueryFlatMapConcat<T, R> extends AsyncQuery<R> {
26+
private final AsyncQuery<T> upstream;
27+
private final Function<? super T, ? extends AsyncQuery<? extends R>> mapper;
28+
29+
30+
public AsyncQueryFlatMapConcat(AsyncQuery<T> upstream, Function<? super T, ? extends AsyncQuery<? extends R>> mapper) {
31+
this.upstream = upstream;
32+
this.mapper = mapper;
33+
}
34+
35+
@Override
36+
public CompletableFuture<Void> subscribe(BiConsumer<? super R, ? super Throwable> cons) {
37+
return upstream.subscribe((item, err) -> {
38+
if(err != null) {
39+
cons.accept(null, err);
40+
return;
41+
}
42+
mapper
43+
.apply(item)
44+
.subscribe(cons::accept)
45+
.join(); // !!!! Replace this by Continuation !!!
46+
});
47+
}
48+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright (c) 2020, Fernando Miguel Carvalho, mcarvalho@cc.isel.ipl.pt
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.jayield.async;
18+
19+
import org.jayield.AsyncQuery;
20+
21+
import java.util.ArrayList;
22+
import java.util.List;
23+
import java.util.concurrent.CompletableFuture;
24+
import java.util.function.BiConsumer;
25+
import java.util.function.Function;
26+
27+
public class AsyncQueryFlatMapMerge<T, R> extends AsyncQuery<R> {
28+
private final AsyncQuery<T> upstream;
29+
private final Function<? super T, ? extends AsyncQuery<? extends R>> mapper;
30+
31+
32+
public AsyncQueryFlatMapMerge(AsyncQuery<T> upstream, Function<? super T, ? extends AsyncQuery<? extends R>> mapper) {
33+
this.upstream = upstream;
34+
this.mapper = mapper;
35+
}
36+
37+
@Override
38+
public CompletableFuture<Void> subscribe(BiConsumer<? super R, ? super Throwable> cons) {
39+
List<CompletableFuture<Void>> cfs = new ArrayList<>();
40+
return upstream
41+
.subscribe((item, err) -> {
42+
if (err != null) {
43+
cons.accept(null, err);
44+
return;
45+
}
46+
cfs.add(mapper
47+
.apply(item)
48+
.subscribe(cons::accept));
49+
})
50+
.thenCompose(ignore -> CompletableFuture.allOf(cfs.toArray(new CompletableFuture[cfs.size()])));
51+
}
52+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright (c) 2020, Fernando Miguel Carvalho, mcarvalho@cc.isel.ipl.pt
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.jayield.async;
18+
19+
import org.jayield.AsyncQuery;
20+
import org.jayield.Query;
21+
22+
import java.util.concurrent.CompletableFuture;
23+
import java.util.function.BiConsumer;
24+
25+
import static java.util.concurrent.CompletableFuture.runAsync;
26+
27+
public class AsyncQueryFork<U> extends AsyncQuery<U> {
28+
private final U[] data;
29+
30+
public AsyncQueryFork(U[] data) {
31+
this.data = data;
32+
}
33+
34+
@Override
35+
public CompletableFuture<Void> subscribe(BiConsumer<? super U, ? super Throwable> cons) {
36+
return runAsync(() -> Query
37+
.of(data)
38+
.traverse(item -> cons.accept(item, null)));
39+
}
40+
}

0 commit comments

Comments
 (0)