AbstractObservableStream.java
001 /*
002  * SPDX-License-Identifier: Apache-2.0
003  *
004  * Copyright 2008-2017 the original author or authors.
005  *
006  * Licensed under the Apache License, Version 2.0 (the "License");
007  * you may not use this file except in compliance with the License.
008  * You may obtain a copy of the License at
009  *
010  *     http://www.apache.org/licenses/LICENSE-2.0
011  *
012  * Unless required by applicable law or agreed to in writing, software
013  * distributed under the License is distributed on an "AS IS" BASIS,
014  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015  * See the License for the specific language governing permissions and
016  * limitations under the License.
017  */
018 package griffon.javafx.collections;
019 
020 import javafx.beans.Observable;
021 import javafx.beans.binding.BooleanBinding;
022 import javafx.beans.binding.ObjectBinding;
023 import javafx.beans.value.ObservableLongValue;
024 import javafx.beans.value.ObservableValue;
025 
026 import javax.annotation.Nonnull;
027 import javax.annotation.Nullable;
028 import java.util.ArrayList;
029 import java.util.Collections;
030 import java.util.Comparator;
031 import java.util.List;
032 import java.util.function.BiFunction;
033 import java.util.function.BinaryOperator;
034 import java.util.function.Function;
035 import java.util.function.Predicate;
036 import java.util.function.Supplier;
037 import java.util.stream.Stream;
038 
039 import static java.util.Objects.requireNonNull;
040 import static javafx.beans.binding.Bindings.createBooleanBinding;
041 import static javafx.beans.binding.Bindings.createObjectBinding;
042 
043 /**
044  @author Andres Almiray
045  @since 2.13.0
046  */
047 @SuppressWarnings("unchecked")
048 abstract class AbstractObservableStream<T> implements ObservableStream<T> {
049     protected static final String ERROR_PREDICATE_NULL = "Argument 'predicate' must not be null";
050     protected static final String ERROR_MAPPER_NULL = "Argument 'mapper' must not be null";
051     protected static final String ERROR_COMPARATOR_NULL = "Argument 'comparator' must not be null";
052     protected static final String ERROR_OBSERVABLE_NULL = "Argument 'observable' must not be null";
053     protected static final String ERROR_ACCUMULATOR = "Argument 'accumulator' must not be null";
054     protected static final String ERROR_SUPPLIER_NULL = "Argument 'supplier' must not be null";
055     protected static final String ERROR_COMBINER_NULL = "Argument 'combiner' must not be null";
056     protected static final String ERROR_IDENTITY_NULL = "Argument 'identity' must not be null";
057 
058     protected final Observable observable;
059     protected final List<StreamOp> operations = new ArrayList<>();
060 
061     AbstractObservableStream(@Nonnull Observable observable, @Nonnull List<StreamOp> operations) {
062         this.observable = requireNonNull(observable, ERROR_OBSERVABLE_NULL);
063         this.operations.addAll(operations);
064     }
065 
066     @Nonnull
067     protected abstract <E> ObservableStream<E> createInstance(@Nonnull List<StreamOp> operations);
068 
069     @Nonnull
070     protected abstract Stream createStream();
071 
072     @Nonnull
073     @Override
074     public ObservableStream<T> limit(final long maxSize) {
075         return createInstance(push(operations, new StreamOpAdapter() {
076             @Nonnull
077             @Override
078             public Stream apply(@Nonnull Stream stream) {
079                 return stream.limit(maxSize);
080             }
081         }));
082     }
083 
084     @Nonnull
085     @Override
086     public ObservableStream<T> limit(@Nonnull final ObservableLongValue maxSize) {
087         requireNonNull(maxSize, ERROR_OBSERVABLE_NULL);
088         return createInstance(push(operations, new StreamOp() {
089             @Nonnull
090             @Override
091             public Stream apply(@Nonnull Stream stream) {
092                 return stream.limit(maxSize.get());
093             }
094 
095             @Nullable
096             @Override
097             public Observable dependency() {
098                 return maxSize;
099             }
100         }));
101     }
102 
103     @Nonnull
104     @Override
105     public ObservableStream<T> skip(final long n) {
106         return createInstance(push(operations, new StreamOpAdapter() {
107             @Nonnull
108             @Override
109             public Stream apply(@Nonnull Stream stream) {
110                 return stream.skip(n);
111             }
112         }));
113     }
114 
115     @Nonnull
116     @Override
117     public ObservableStream<T> skip(@Nonnull final ObservableLongValue n) {
118         requireNonNull(n, ERROR_OBSERVABLE_NULL);
119         return createInstance(push(operations, new StreamOp() {
120             @Nonnull
121             @Override
122             public Stream apply(@Nonnull Stream stream) {
123                 return stream.skip(n.get());
124             }
125 
126             @Nullable
127             @Override
128             public Observable dependency() {
129                 return n;
130             }
131         }));
132     }
133 
134     @Nonnull
135     @Override
136     public ObservableStream<T> distinct() {
137         return createInstance(push(operations, new StreamOpAdapter() {
138             @Nonnull
139             @Override
140             public Stream apply(@Nonnull Stream stream) {
141                 return stream.distinct();
142             }
143         }));
144     }
145 
146     @Nonnull
147     @Override
148     public ObservableStream<T> sorted() {
149         return createInstance(push(operations, new StreamOpAdapter() {
150             @Nonnull
151             @Override
152             public Stream apply(@Nonnull Stream stream) {
153                 return stream.sorted();
154             }
155         }));
156     }
157 
158     @Nonnull
159     @Override
160     public ObservableStream<T> sorted(@Nonnull final Comparator<? super T> comparator) {
161         requireNonNull(comparator, ERROR_COMPARATOR_NULL);
162         return createInstance(push(operations, new StreamOpAdapter() {
163             @Nonnull
164             @Override
165             public Stream apply(@Nonnull Stream stream) {
166                 return stream.sorted(comparator);
167             }
168         }));
169     }
170 
171     @Nonnull
172     @Override
173     public ObservableStream<T> sorted(@Nonnull final ObservableValue<Comparator<? super T>> comparator) {
174         requireNonNull(comparator, ERROR_COMPARATOR_NULL);
175         return createInstance(push(operations, new StreamOp() {
176             @Nonnull
177             @Override
178             public Stream apply(@Nonnull Stream stream) {
179                 Comparator<? super T> c = comparator.getValue();
180                 requireNonNull(c, ERROR_COMPARATOR_NULL);
181                 return stream.sorted(c);
182             }
183 
184             @Nullable
185             @Override
186             public Observable dependency() {
187                 return comparator;
188             }
189         }));
190     }
191 
192     @Nonnull
193     public ObservableStream<T> filter(@Nonnull final Predicate<? super T> predicate) {
194         requireNonNull(predicate, ERROR_PREDICATE_NULL);
195         return createInstance(push(operations, new StreamOpAdapter() {
196             @Nonnull
197             @Override
198             public Stream apply(@Nonnull Stream stream) {
199                 return stream.filter(predicate);
200             }
201         }));
202     }
203 
204     @Nonnull
205     @Override
206     public <R> ObservableStream<R> map(@Nonnull final Function<? super T, ? extends R> mapper) {
207         requireNonNull(mapper, ERROR_MAPPER_NULL);
208         return createInstance(push(operations, new StreamOpAdapter() {
209             @Nonnull
210             @Override
211             public Stream apply(@Nonnull Stream stream) {
212                 return stream.map(mapper);
213             }
214         }));
215     }
216 
217     @Nonnull
218     @Override
219     public <R> ObservableStream<R> flatMap(@Nonnull final Function<? super T, ? extends ObservableStream<? extends R>> mapper) {
220         requireNonNull(mapper, ERROR_MAPPER_NULL);
221         return createInstance(push(operations, new StreamOpAdapter() {
222             @Nonnull
223             @Override
224             public Stream apply(@Nonnull Stream stream) {
225                 return stream.flatMap(mapper);
226             }
227         }));
228     }
229 
230     @Nonnull
231     @Override
232     public ObservableStream<T> filter(@Nonnull final ObservableValue<Predicate<? super T>> predicate) {
233         requireNonNull(predicate, ERROR_PREDICATE_NULL);
234         return createInstance(push(operations, new StreamOp() {
235             @Nonnull
236             @Override
237             public Stream apply(@Nonnull Stream stream) {
238                 Predicate<? super T> p = predicate.getValue();
239                 requireNonNull(p, ERROR_PREDICATE_NULL);
240                 return stream.filter(p);
241             }
242 
243             @Nullable
244             @Override
245             public Observable dependency() {
246                 return predicate;
247             }
248         }));
249     }
250 
251     @Nonnull
252     @Override
253     public <R> ObservableStream<R> map(@Nonnull final ObservableValue<Function<? super T, ? extends R>> mapper) {
254         requireNonNull(mapper, ERROR_MAPPER_NULL);
255         return createInstance(push(operations, new StreamOp() {
256             @Nonnull
257             @Override
258             public Stream apply(@Nonnull Stream stream) {
259                 Function<? super T, ? extends R> m = mapper.getValue();
260                 requireNonNull(m, ERROR_MAPPER_NULL);
261                 return stream.map(m);
262             }
263 
264             @Nullable
265             @Override
266             public Observable dependency() {
267                 return mapper;
268             }
269         }));
270     }
271 
272     @Nonnull
273     @Override
274     public <R> ObservableStream<R> flatMap(@Nonnull final ObservableValue<Function<? super T, ? extends ObservableStream<? extends R>>> mapper) {
275         requireNonNull(mapper, ERROR_MAPPER_NULL);
276         return createInstance(push(operations, new StreamOp() {
277             @Nonnull
278             @Override
279             public Stream apply(@Nonnull Stream stream) {
280                 Function<? super T, ? extends ObservableStream<? extends R>> m = mapper.getValue();
281                 requireNonNull(m, ERROR_MAPPER_NULL);
282                 return stream.flatMap(m);
283             }
284 
285             @Nullable
286             @Override
287             public Observable dependency() {
288                 return mapper;
289             }
290         }));
291     }
292 
293     @Nonnull
294     @Override
295     public ObjectBinding<T> reduce(@Nonnull final BinaryOperator<T> accumulator) {
296         requireNonNull(accumulator, ERROR_ACCUMULATOR);
297         return createObjectBinding(() -> (Tstream().reduce(accumulator).orElse(null), dependencies());
298     }
299 
300     @Nonnull
301     @Override
302     public ObjectBinding<T> reduce(@Nullable final T defaultValue, @Nonnull final BinaryOperator<T> accumulator) {
303         requireNonNull(accumulator, ERROR_ACCUMULATOR);
304         return createObjectBinding(() -> (Tstream().reduce(accumulator).orElse(defaultValue), dependencies());
305     }
306 
307     @Nonnull
308     @Override
309     public ObjectBinding<T> reduce(@Nonnull final Supplier<T> supplier, @Nonnull final BinaryOperator<T> accumulator) {
310         requireNonNull(supplier, ERROR_SUPPLIER_NULL);
311         requireNonNull(accumulator, ERROR_ACCUMULATOR);
312         return createObjectBinding(() -> (Tstream().reduce(accumulator).orElseGet(supplier), dependencies());
313     }
314 
315     @Nonnull
316     @Override
317     public <U> ObjectBinding<U> reduce(@Nullable final U identity, @Nonnull final BiFunction<U, ? super T, U> accumulator, @Nonnull final BinaryOperator<U> combiner) {
318         requireNonNull(combiner, ERROR_COMBINER_NULL);
319         return createObjectBinding(() -> (Ustream().reduce(identity, accumulator, combiner), dependencies());
320     }
321 
322     @Nonnull
323     @Override
324     public ObjectBinding<T> reduce(@Nonnull final ObservableValue<BinaryOperator<T>> accumulator) {
325         requireNonNull(accumulator, ERROR_ACCUMULATOR);
326         return createObjectBinding(() -> {
327             BinaryOperator<T> a = accumulator.getValue();
328             requireNonNull(a, ERROR_ACCUMULATOR);
329             return (Tstream().reduce(a).orElse(null);
330         }, dependencies(accumulator));
331     }
332 
333     @Nonnull
334     @Override
335     public ObjectBinding<T> reduce(@Nullable final T defaultValue, @Nonnull final ObservableValue<BinaryOperator<T>> accumulator) {
336         requireNonNull(accumulator, ERROR_ACCUMULATOR);
337         return createObjectBinding(() -> {
338             BinaryOperator<T> a = accumulator.getValue();
339             requireNonNull(a, ERROR_ACCUMULATOR);
340             return (Tstream().reduce(a).orElse(defaultValue);
341         }, dependencies(accumulator));
342     }
343 
344     @Nonnull
345     @Override
346     public ObjectBinding<T> reduce(@Nonnull final Supplier<T> supplier, @Nonnull final ObservableValue<BinaryOperator<T>> accumulator) {
347         requireNonNull(supplier, ERROR_SUPPLIER_NULL);
348         requireNonNull(accumulator, ERROR_ACCUMULATOR);
349         return createObjectBinding(() -> {
350             BinaryOperator<T> a = accumulator.getValue();
351             requireNonNull(a, ERROR_ACCUMULATOR);
352             return (Tstream().reduce(a).orElseGet(supplier);
353         }, dependencies(accumulator));
354     }
355 
356     @Nonnull
357     @Override
358     public <U> ObjectBinding<U> reduce(@Nonnull final ObservableValue<U> identity, @Nonnull final ObservableValue<BiFunction<U, ? super T, U>> accumulator, @Nonnull final ObservableValue<BinaryOperator<U>> combiner) {
359         requireNonNull(identity, ERROR_IDENTITY_NULL);
360         requireNonNull(accumulator, ERROR_ACCUMULATOR);
361         requireNonNull(combiner, ERROR_COMBINER_NULL);
362         return createObjectBinding(() -> {
363             U i = identity.getValue();
364             requireNonNull(i, ERROR_IDENTITY_NULL);
365             BiFunction<U, ? super T, U> a = accumulator.getValue();
366             requireNonNull(a, ERROR_ACCUMULATOR);
367             BinaryOperator<U> c = combiner.getValue();
368             requireNonNull(c, ERROR_COMBINER_NULL);
369             return (Ustream().reduce(i, a, c);
370         }, dependencies(identity, accumulator, combiner));
371     }
372 
373     @Nonnull
374     @Override
375     public ObjectBinding<T> min(@Nonnull final Comparator<? super T> comparator) {
376         requireNonNull(comparator, ERROR_COMPARATOR_NULL);
377         return createObjectBinding(() -> (Tstream().min(comparator).orElse(null), dependencies());
378     }
379 
380     @Nonnull
381     @Override
382     public ObjectBinding<T> max(@Nonnull final Comparator<? super T> comparator) {
383         requireNonNull(comparator, ERROR_COMPARATOR_NULL);
384         return createObjectBinding(() -> (Tstream().max(comparator).orElse(null), dependencies());
385     }
386 
387     @Nonnull
388     @Override
389     public ObjectBinding<T> min(@Nullable final T defaultValue, @Nonnull final Comparator<? super T> comparator) {
390         requireNonNull(comparator, ERROR_COMPARATOR_NULL);
391         return createObjectBinding(() -> (Tstream().min(comparator).orElse(defaultValue), dependencies());
392     }
393 
394     @Nonnull
395     @Override
396     public ObjectBinding<T> max(@Nullable final T defaultValue, @Nonnull final Comparator<? super T> comparator) {
397         requireNonNull(comparator, ERROR_COMPARATOR_NULL);
398         return createObjectBinding(() -> (Tstream().max(comparator).orElse(defaultValue), dependencies());
399     }
400 
401     @Nonnull
402     @Override
403     public ObjectBinding<T> min(@Nonnull final Supplier<T> supplier, @Nonnull final Comparator<? super T> comparator) {
404         requireNonNull(comparator, ERROR_COMPARATOR_NULL);
405         requireNonNull(supplier, ERROR_SUPPLIER_NULL);
406         return createObjectBinding(() -> (Tstream().min(comparator).orElseGet(supplier), dependencies());
407     }
408 
409     @Nonnull
410     @Override
411     public ObjectBinding<T> max(@Nonnull final Supplier<T> supplier, @Nonnull final Comparator<? super T> comparator) {
412         requireNonNull(supplier, ERROR_SUPPLIER_NULL);
413         requireNonNull(comparator, ERROR_COMPARATOR_NULL);
414         return createObjectBinding(() -> (Tstream().max(comparator).orElseGet(supplier), dependencies());
415     }
416 
417     @Nonnull
418     @Override
419     public BooleanBinding anyMatch(@Nonnull final Predicate<? super T> predicate) {
420         requireNonNull(predicate, ERROR_PREDICATE_NULL);
421         return createBooleanBinding(() -> stream().anyMatch(predicate), dependencies());
422     }
423 
424     @Nonnull
425     @Override
426     public BooleanBinding allMatch(@Nonnull final Predicate<? super T> predicate) {
427         requireNonNull(predicate, ERROR_PREDICATE_NULL);
428         return createBooleanBinding(() -> stream().allMatch(predicate), dependencies());
429     }
430 
431     @Nonnull
432     @Override
433     public BooleanBinding noneMatch(@Nonnull final Predicate<? super T> predicate) {
434         requireNonNull(predicate, ERROR_PREDICATE_NULL);
435         return createBooleanBinding(() -> stream().noneMatch(predicate), dependencies());
436     }
437 
438     @Nonnull
439     @Override
440     public ObjectBinding<T> min(@Nonnull final ObservableValue<Comparator<? super T>> comparator) {
441         requireNonNull(comparator, ERROR_COMPARATOR_NULL);
442         return createObjectBinding(() -> {
443             Comparator<? super T> c = comparator.getValue();
444             requireNonNull(c, ERROR_COMPARATOR_NULL);
445             return (Tstream().min(c).orElse(null);
446         }, dependencies(comparator));
447     }
448 
449     @Nonnull
450     @Override
451     public ObjectBinding<T> max(@Nonnull final ObservableValue<Comparator<? super T>> comparator) {
452         requireNonNull(comparator, ERROR_COMPARATOR_NULL);
453         return createObjectBinding(() -> {
454             Comparator<? super T> c = comparator.getValue();
455             requireNonNull(c, ERROR_COMPARATOR_NULL);
456             return (Tstream().max(c).orElse(null);
457         }, dependencies(comparator));
458     }
459 
460     @Nonnull
461     @Override
462     public ObjectBinding<T> min(@Nullable final T defaultValue, @Nonnull final ObservableValue<Comparator<? super T>> comparator) {
463         requireNonNull(comparator, ERROR_COMPARATOR_NULL);
464         return createObjectBinding(() -> {
465             Comparator<? super T> c = comparator.getValue();
466             requireNonNull(c, ERROR_COMPARATOR_NULL);
467             return (Tstream().min(c).orElse(defaultValue);
468         }, dependencies(comparator));
469     }
470 
471     @Nonnull
472     @Override
473     public ObjectBinding<T> max(@Nullable final T defaultValue, @Nonnull final ObservableValue<Comparator<? super T>> comparator) {
474         requireNonNull(comparator, ERROR_COMPARATOR_NULL);
475         return createObjectBinding(() -> {
476             Comparator<? super T> c = comparator.getValue();
477             requireNonNull(c, ERROR_COMPARATOR_NULL);
478             return (Tstream().max(c).orElse(defaultValue);
479         }, dependencies(comparator));
480     }
481 
482     @Nonnull
483     @Override
484     public ObjectBinding<T> min(@Nonnull final Supplier<T> supplier, @Nonnull final ObservableValue<Comparator<? super T>> comparator) {
485         requireNonNull(comparator, ERROR_COMPARATOR_NULL);
486         requireNonNull(supplier, ERROR_SUPPLIER_NULL);
487         return createObjectBinding(() -> {
488             Comparator<? super T> c = comparator.getValue();
489             requireNonNull(c, ERROR_COMPARATOR_NULL);
490             return (Tstream().min(c).orElseGet(supplier);
491         }, dependencies(comparator));
492     }
493 
494     @Nonnull
495     @Override
496     public ObjectBinding<T> max(@Nonnull final Supplier<T> supplier, @Nonnull final ObservableValue<Comparator<? super T>> comparator) {
497         requireNonNull(comparator, ERROR_COMPARATOR_NULL);
498         requireNonNull(supplier, ERROR_SUPPLIER_NULL);
499         return createObjectBinding(() -> {
500             Comparator<? super T> c = comparator.getValue();
501             requireNonNull(c, ERROR_COMPARATOR_NULL);
502             return (Tstream().max(c).orElseGet(supplier);
503         }, dependencies(comparator));
504     }
505 
506     @Nonnull
507     @Override
508     public BooleanBinding anyMatch(@Nonnull final ObservableValue<Predicate<? super T>> predicate) {
509         requireNonNull(predicate, ERROR_PREDICATE_NULL);
510         return createBooleanBinding(() -> {
511             Predicate<? super T> p = predicate.getValue();
512             requireNonNull(p, ERROR_PREDICATE_NULL);
513             return stream().anyMatch(p);
514         }, dependencies(predicate));
515     }
516 
517     @Nonnull
518     @Override
519     public BooleanBinding allMatch(@Nonnull final ObservableValue<Predicate<? super T>> predicate) {
520         requireNonNull(predicate, ERROR_PREDICATE_NULL);
521         return createBooleanBinding(() -> {
522             Predicate<? super T> p = predicate.getValue();
523             requireNonNull(p, ERROR_PREDICATE_NULL);
524             return stream().allMatch(p);
525         }, dependencies(predicate));
526     }
527 
528     @Nonnull
529     @Override
530     public BooleanBinding noneMatch(@Nonnull final ObservableValue<Predicate<? super T>> predicate) {
531         requireNonNull(predicate, ERROR_PREDICATE_NULL);
532         return createBooleanBinding(() -> {
533             Predicate<? super T> p = predicate.getValue();
534             requireNonNull(p, ERROR_PREDICATE_NULL);
535             return stream().noneMatch(p);
536         }, dependencies(predicate));
537     }
538 
539     @Nonnull
540     @Override
541     public ObjectBinding<T> findFirst() {
542         return createObjectBinding(() -> (Tstream().findFirst().orElse(null), dependencies());
543     }
544 
545     @Nonnull
546     @Override
547     public ObjectBinding<T> findFirst(@Nullable final T defaultValue) {
548         return createObjectBinding(() -> (Tstream().findFirst().orElse(defaultValue), dependencies());
549     }
550 
551     @Nonnull
552     @Override
553     public ObjectBinding<T> findFirst(@Nonnull final Supplier<T> supplier) {
554         requireNonNull(supplier, ERROR_SUPPLIER_NULL);
555         return createObjectBinding(() -> (Tstream().findFirst().orElseGet(supplier), dependencies());
556     }
557 
558     @Nonnull
559     @Override
560     public ObjectBinding<T> findAny() {
561         return createObjectBinding(() -> (Tstream().findAny().orElse(null), dependencies());
562     }
563 
564     @Nonnull
565     @Override
566     public ObjectBinding<T> findAny(@Nullable final T defaultValue) {
567         return createObjectBinding(() -> (Tstream().findAny().orElse(defaultValue), dependencies());
568     }
569 
570     @Nonnull
571     @Override
572     public ObjectBinding<T> findAny(@Nonnull final Supplier<T> supplier) {
573         requireNonNull(supplier, ERROR_SUPPLIER_NULL);
574         return createObjectBinding(() -> (Tstream().findAny().orElseGet(supplier), dependencies());
575     }
576 
577     @Nonnull
578     private Stream stream() {
579         Stream stream = createStream();
580 
581         for (StreamOp op : operations) {
582             stream = op.apply(stream);
583         }
584 
585         return stream;
586     }
587 
588     @Nonnull
589     private Observable[] dependencies(Observable... deps) {
590         List<Observable> dependencies = new ArrayList<>();
591         dependencies.add(observable);
592         if (deps != null) {
593             Collections.addAll(dependencies, deps);
594         }
595 
596         for (StreamOp op : operations) {
597             Observable dependency = op.dependency();
598             if (dependency != null) {
599                 dependencies.add(dependency);
600             }
601         }
602 
603         return dependencies.toArray(new Observable[dependencies.size()]);
604     }
605 
606     private static List<StreamOp> push(List<StreamOp> operations, StreamOp op) {
607         List<StreamOp> ops = new ArrayList<>(operations);
608         ops.add(op);
609         return ops;
610     }
611 
612     interface StreamOp {
613         @Nonnull
614         Stream apply(@Nonnull Stream stream);
615 
616         @Nullable
617         Observable dependency();
618     }
619 
620     static class StreamOpAdapter implements StreamOp {
621         @Nonnull
622         @Override
623         public Stream apply(@Nonnull Stream stream) {
624             return stream;
625         }
626 
627         @Nullable
628         @Override
629         public Observable dependency() {
630             return null;
631         }
632     }
633 }