001 /*
002 * SPDX-License-Identifier: Apache-2.0
003 *
004 * Copyright 2008-2017 the original author or authors.
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package griffon.javafx.collections;
019
020 import javafx.beans.Observable;
021 import javafx.beans.binding.BooleanBinding;
022 import javafx.beans.binding.ObjectBinding;
023 import javafx.beans.value.ObservableLongValue;
024 import javafx.beans.value.ObservableValue;
025
026 import javax.annotation.Nonnull;
027 import javax.annotation.Nullable;
028 import java.util.ArrayList;
029 import java.util.Collections;
030 import java.util.Comparator;
031 import java.util.List;
032 import java.util.function.BiFunction;
033 import java.util.function.BinaryOperator;
034 import java.util.function.Function;
035 import java.util.function.Predicate;
036 import java.util.function.Supplier;
037 import java.util.stream.Stream;
038
039 import static java.util.Objects.requireNonNull;
040 import static javafx.beans.binding.Bindings.createBooleanBinding;
041 import static javafx.beans.binding.Bindings.createObjectBinding;
042
043 /**
044 * @author Andres Almiray
045 * @since 2.13.0
046 */
047 @SuppressWarnings("unchecked")
048 abstract class AbstractObservableStream<T> implements ObservableStream<T> {
049 protected static final String ERROR_PREDICATE_NULL = "Argument 'predicate' must not be null";
050 protected static final String ERROR_MAPPER_NULL = "Argument 'mapper' must not be null";
051 protected static final String ERROR_COMPARATOR_NULL = "Argument 'comparator' must not be null";
052 protected static final String ERROR_OBSERVABLE_NULL = "Argument 'observable' must not be null";
053 protected static final String ERROR_ACCUMULATOR = "Argument 'accumulator' must not be null";
054 protected static final String ERROR_SUPPLIER_NULL = "Argument 'supplier' must not be null";
055 protected static final String ERROR_COMBINER_NULL = "Argument 'combiner' must not be null";
056 protected static final String ERROR_IDENTITY_NULL = "Argument 'identity' must not be null";
057
058 protected final Observable observable;
059 protected final List<StreamOp> operations = new ArrayList<>();
060
061 AbstractObservableStream(@Nonnull Observable observable, @Nonnull List<StreamOp> operations) {
062 this.observable = requireNonNull(observable, ERROR_OBSERVABLE_NULL);
063 this.operations.addAll(operations);
064 }
065
066 @Nonnull
067 protected abstract <E> ObservableStream<E> createInstance(@Nonnull List<StreamOp> operations);
068
069 @Nonnull
070 protected abstract Stream createStream();
071
072 @Nonnull
073 @Override
074 public ObservableStream<T> limit(final long maxSize) {
075 return createInstance(push(operations, new StreamOpAdapter() {
076 @Nonnull
077 @Override
078 public Stream apply(@Nonnull Stream stream) {
079 return stream.limit(maxSize);
080 }
081 }));
082 }
083
084 @Nonnull
085 @Override
086 public ObservableStream<T> limit(@Nonnull final ObservableLongValue maxSize) {
087 requireNonNull(maxSize, ERROR_OBSERVABLE_NULL);
088 return createInstance(push(operations, new StreamOp() {
089 @Nonnull
090 @Override
091 public Stream apply(@Nonnull Stream stream) {
092 return stream.limit(maxSize.get());
093 }
094
095 @Nullable
096 @Override
097 public Observable dependency() {
098 return maxSize;
099 }
100 }));
101 }
102
103 @Nonnull
104 @Override
105 public ObservableStream<T> skip(final long n) {
106 return createInstance(push(operations, new StreamOpAdapter() {
107 @Nonnull
108 @Override
109 public Stream apply(@Nonnull Stream stream) {
110 return stream.skip(n);
111 }
112 }));
113 }
114
115 @Nonnull
116 @Override
117 public ObservableStream<T> skip(@Nonnull final ObservableLongValue n) {
118 requireNonNull(n, ERROR_OBSERVABLE_NULL);
119 return createInstance(push(operations, new StreamOp() {
120 @Nonnull
121 @Override
122 public Stream apply(@Nonnull Stream stream) {
123 return stream.skip(n.get());
124 }
125
126 @Nullable
127 @Override
128 public Observable dependency() {
129 return n;
130 }
131 }));
132 }
133
134 @Nonnull
135 @Override
136 public ObservableStream<T> distinct() {
137 return createInstance(push(operations, new StreamOpAdapter() {
138 @Nonnull
139 @Override
140 public Stream apply(@Nonnull Stream stream) {
141 return stream.distinct();
142 }
143 }));
144 }
145
146 @Nonnull
147 @Override
148 public ObservableStream<T> sorted() {
149 return createInstance(push(operations, new StreamOpAdapter() {
150 @Nonnull
151 @Override
152 public Stream apply(@Nonnull Stream stream) {
153 return stream.sorted();
154 }
155 }));
156 }
157
158 @Nonnull
159 @Override
160 public ObservableStream<T> sorted(@Nonnull final Comparator<? super T> comparator) {
161 requireNonNull(comparator, ERROR_COMPARATOR_NULL);
162 return createInstance(push(operations, new StreamOpAdapter() {
163 @Nonnull
164 @Override
165 public Stream apply(@Nonnull Stream stream) {
166 return stream.sorted(comparator);
167 }
168 }));
169 }
170
171 @Nonnull
172 @Override
173 public ObservableStream<T> sorted(@Nonnull final ObservableValue<Comparator<? super T>> comparator) {
174 requireNonNull(comparator, ERROR_COMPARATOR_NULL);
175 return createInstance(push(operations, new StreamOp() {
176 @Nonnull
177 @Override
178 public Stream apply(@Nonnull Stream stream) {
179 Comparator<? super T> c = comparator.getValue();
180 requireNonNull(c, ERROR_COMPARATOR_NULL);
181 return stream.sorted(c);
182 }
183
184 @Nullable
185 @Override
186 public Observable dependency() {
187 return comparator;
188 }
189 }));
190 }
191
192 @Nonnull
193 public ObservableStream<T> filter(@Nonnull final Predicate<? super T> predicate) {
194 requireNonNull(predicate, ERROR_PREDICATE_NULL);
195 return createInstance(push(operations, new StreamOpAdapter() {
196 @Nonnull
197 @Override
198 public Stream apply(@Nonnull Stream stream) {
199 return stream.filter(predicate);
200 }
201 }));
202 }
203
204 @Nonnull
205 @Override
206 public <R> ObservableStream<R> map(@Nonnull final Function<? super T, ? extends R> mapper) {
207 requireNonNull(mapper, ERROR_MAPPER_NULL);
208 return createInstance(push(operations, new StreamOpAdapter() {
209 @Nonnull
210 @Override
211 public Stream apply(@Nonnull Stream stream) {
212 return stream.map(mapper);
213 }
214 }));
215 }
216
217 @Nonnull
218 @Override
219 public <R> ObservableStream<R> flatMap(@Nonnull final Function<? super T, ? extends ObservableStream<? extends R>> mapper) {
220 requireNonNull(mapper, ERROR_MAPPER_NULL);
221 return createInstance(push(operations, new StreamOpAdapter() {
222 @Nonnull
223 @Override
224 public Stream apply(@Nonnull Stream stream) {
225 return stream.flatMap(mapper);
226 }
227 }));
228 }
229
230 @Nonnull
231 @Override
232 public ObservableStream<T> filter(@Nonnull final ObservableValue<Predicate<? super T>> predicate) {
233 requireNonNull(predicate, ERROR_PREDICATE_NULL);
234 return createInstance(push(operations, new StreamOp() {
235 @Nonnull
236 @Override
237 public Stream apply(@Nonnull Stream stream) {
238 Predicate<? super T> p = predicate.getValue();
239 requireNonNull(p, ERROR_PREDICATE_NULL);
240 return stream.filter(p);
241 }
242
243 @Nullable
244 @Override
245 public Observable dependency() {
246 return predicate;
247 }
248 }));
249 }
250
251 @Nonnull
252 @Override
253 public <R> ObservableStream<R> map(@Nonnull final ObservableValue<Function<? super T, ? extends R>> mapper) {
254 requireNonNull(mapper, ERROR_MAPPER_NULL);
255 return createInstance(push(operations, new StreamOp() {
256 @Nonnull
257 @Override
258 public Stream apply(@Nonnull Stream stream) {
259 Function<? super T, ? extends R> m = mapper.getValue();
260 requireNonNull(m, ERROR_MAPPER_NULL);
261 return stream.map(m);
262 }
263
264 @Nullable
265 @Override
266 public Observable dependency() {
267 return mapper;
268 }
269 }));
270 }
271
272 @Nonnull
273 @Override
274 public <R> ObservableStream<R> flatMap(@Nonnull final ObservableValue<Function<? super T, ? extends ObservableStream<? extends R>>> mapper) {
275 requireNonNull(mapper, ERROR_MAPPER_NULL);
276 return createInstance(push(operations, new StreamOp() {
277 @Nonnull
278 @Override
279 public Stream apply(@Nonnull Stream stream) {
280 Function<? super T, ? extends ObservableStream<? extends R>> m = mapper.getValue();
281 requireNonNull(m, ERROR_MAPPER_NULL);
282 return stream.flatMap(m);
283 }
284
285 @Nullable
286 @Override
287 public Observable dependency() {
288 return mapper;
289 }
290 }));
291 }
292
293 @Nonnull
294 @Override
295 public ObjectBinding<T> reduce(@Nonnull final BinaryOperator<T> accumulator) {
296 requireNonNull(accumulator, ERROR_ACCUMULATOR);
297 return createObjectBinding(() -> (T) stream().reduce(accumulator).orElse(null), dependencies());
298 }
299
300 @Nonnull
301 @Override
302 public ObjectBinding<T> reduce(@Nullable final T defaultValue, @Nonnull final BinaryOperator<T> accumulator) {
303 requireNonNull(accumulator, ERROR_ACCUMULATOR);
304 return createObjectBinding(() -> (T) stream().reduce(accumulator).orElse(defaultValue), dependencies());
305 }
306
307 @Nonnull
308 @Override
309 public ObjectBinding<T> reduce(@Nonnull final Supplier<T> supplier, @Nonnull final BinaryOperator<T> accumulator) {
310 requireNonNull(supplier, ERROR_SUPPLIER_NULL);
311 requireNonNull(accumulator, ERROR_ACCUMULATOR);
312 return createObjectBinding(() -> (T) stream().reduce(accumulator).orElseGet(supplier), dependencies());
313 }
314
315 @Nonnull
316 @Override
317 public <U> ObjectBinding<U> reduce(@Nullable final U identity, @Nonnull final BiFunction<U, ? super T, U> accumulator, @Nonnull final BinaryOperator<U> combiner) {
318 requireNonNull(combiner, ERROR_COMBINER_NULL);
319 return createObjectBinding(() -> (U) stream().reduce(identity, accumulator, combiner), dependencies());
320 }
321
322 @Nonnull
323 @Override
324 public ObjectBinding<T> reduce(@Nonnull final ObservableValue<BinaryOperator<T>> accumulator) {
325 requireNonNull(accumulator, ERROR_ACCUMULATOR);
326 return createObjectBinding(() -> {
327 BinaryOperator<T> a = accumulator.getValue();
328 requireNonNull(a, ERROR_ACCUMULATOR);
329 return (T) stream().reduce(a).orElse(null);
330 }, dependencies(accumulator));
331 }
332
333 @Nonnull
334 @Override
335 public ObjectBinding<T> reduce(@Nullable final T defaultValue, @Nonnull final ObservableValue<BinaryOperator<T>> accumulator) {
336 requireNonNull(accumulator, ERROR_ACCUMULATOR);
337 return createObjectBinding(() -> {
338 BinaryOperator<T> a = accumulator.getValue();
339 requireNonNull(a, ERROR_ACCUMULATOR);
340 return (T) stream().reduce(a).orElse(defaultValue);
341 }, dependencies(accumulator));
342 }
343
344 @Nonnull
345 @Override
346 public ObjectBinding<T> reduce(@Nonnull final Supplier<T> supplier, @Nonnull final ObservableValue<BinaryOperator<T>> accumulator) {
347 requireNonNull(supplier, ERROR_SUPPLIER_NULL);
348 requireNonNull(accumulator, ERROR_ACCUMULATOR);
349 return createObjectBinding(() -> {
350 BinaryOperator<T> a = accumulator.getValue();
351 requireNonNull(a, ERROR_ACCUMULATOR);
352 return (T) stream().reduce(a).orElseGet(supplier);
353 }, dependencies(accumulator));
354 }
355
356 @Nonnull
357 @Override
358 public <U> ObjectBinding<U> reduce(@Nonnull final ObservableValue<U> identity, @Nonnull final ObservableValue<BiFunction<U, ? super T, U>> accumulator, @Nonnull final ObservableValue<BinaryOperator<U>> combiner) {
359 requireNonNull(identity, ERROR_IDENTITY_NULL);
360 requireNonNull(accumulator, ERROR_ACCUMULATOR);
361 requireNonNull(combiner, ERROR_COMBINER_NULL);
362 return createObjectBinding(() -> {
363 U i = identity.getValue();
364 requireNonNull(i, ERROR_IDENTITY_NULL);
365 BiFunction<U, ? super T, U> a = accumulator.getValue();
366 requireNonNull(a, ERROR_ACCUMULATOR);
367 BinaryOperator<U> c = combiner.getValue();
368 requireNonNull(c, ERROR_COMBINER_NULL);
369 return (U) stream().reduce(i, a, c);
370 }, dependencies(identity, accumulator, combiner));
371 }
372
373 @Nonnull
374 @Override
375 public ObjectBinding<T> min(@Nonnull final Comparator<? super T> comparator) {
376 requireNonNull(comparator, ERROR_COMPARATOR_NULL);
377 return createObjectBinding(() -> (T) stream().min(comparator).orElse(null), dependencies());
378 }
379
380 @Nonnull
381 @Override
382 public ObjectBinding<T> max(@Nonnull final Comparator<? super T> comparator) {
383 requireNonNull(comparator, ERROR_COMPARATOR_NULL);
384 return createObjectBinding(() -> (T) stream().max(comparator).orElse(null), dependencies());
385 }
386
387 @Nonnull
388 @Override
389 public ObjectBinding<T> min(@Nullable final T defaultValue, @Nonnull final Comparator<? super T> comparator) {
390 requireNonNull(comparator, ERROR_COMPARATOR_NULL);
391 return createObjectBinding(() -> (T) stream().min(comparator).orElse(defaultValue), dependencies());
392 }
393
394 @Nonnull
395 @Override
396 public ObjectBinding<T> max(@Nullable final T defaultValue, @Nonnull final Comparator<? super T> comparator) {
397 requireNonNull(comparator, ERROR_COMPARATOR_NULL);
398 return createObjectBinding(() -> (T) stream().max(comparator).orElse(defaultValue), dependencies());
399 }
400
401 @Nonnull
402 @Override
403 public ObjectBinding<T> min(@Nonnull final Supplier<T> supplier, @Nonnull final Comparator<? super T> comparator) {
404 requireNonNull(comparator, ERROR_COMPARATOR_NULL);
405 requireNonNull(supplier, ERROR_SUPPLIER_NULL);
406 return createObjectBinding(() -> (T) stream().min(comparator).orElseGet(supplier), dependencies());
407 }
408
409 @Nonnull
410 @Override
411 public ObjectBinding<T> max(@Nonnull final Supplier<T> supplier, @Nonnull final Comparator<? super T> comparator) {
412 requireNonNull(supplier, ERROR_SUPPLIER_NULL);
413 requireNonNull(comparator, ERROR_COMPARATOR_NULL);
414 return createObjectBinding(() -> (T) stream().max(comparator).orElseGet(supplier), dependencies());
415 }
416
417 @Nonnull
418 @Override
419 public BooleanBinding anyMatch(@Nonnull final Predicate<? super T> predicate) {
420 requireNonNull(predicate, ERROR_PREDICATE_NULL);
421 return createBooleanBinding(() -> stream().anyMatch(predicate), dependencies());
422 }
423
424 @Nonnull
425 @Override
426 public BooleanBinding allMatch(@Nonnull final Predicate<? super T> predicate) {
427 requireNonNull(predicate, ERROR_PREDICATE_NULL);
428 return createBooleanBinding(() -> stream().allMatch(predicate), dependencies());
429 }
430
431 @Nonnull
432 @Override
433 public BooleanBinding noneMatch(@Nonnull final Predicate<? super T> predicate) {
434 requireNonNull(predicate, ERROR_PREDICATE_NULL);
435 return createBooleanBinding(() -> stream().noneMatch(predicate), dependencies());
436 }
437
438 @Nonnull
439 @Override
440 public ObjectBinding<T> min(@Nonnull final ObservableValue<Comparator<? super T>> comparator) {
441 requireNonNull(comparator, ERROR_COMPARATOR_NULL);
442 return createObjectBinding(() -> {
443 Comparator<? super T> c = comparator.getValue();
444 requireNonNull(c, ERROR_COMPARATOR_NULL);
445 return (T) stream().min(c).orElse(null);
446 }, dependencies(comparator));
447 }
448
449 @Nonnull
450 @Override
451 public ObjectBinding<T> max(@Nonnull final ObservableValue<Comparator<? super T>> comparator) {
452 requireNonNull(comparator, ERROR_COMPARATOR_NULL);
453 return createObjectBinding(() -> {
454 Comparator<? super T> c = comparator.getValue();
455 requireNonNull(c, ERROR_COMPARATOR_NULL);
456 return (T) stream().max(c).orElse(null);
457 }, dependencies(comparator));
458 }
459
460 @Nonnull
461 @Override
462 public ObjectBinding<T> min(@Nullable final T defaultValue, @Nonnull final ObservableValue<Comparator<? super T>> comparator) {
463 requireNonNull(comparator, ERROR_COMPARATOR_NULL);
464 return createObjectBinding(() -> {
465 Comparator<? super T> c = comparator.getValue();
466 requireNonNull(c, ERROR_COMPARATOR_NULL);
467 return (T) stream().min(c).orElse(defaultValue);
468 }, dependencies(comparator));
469 }
470
471 @Nonnull
472 @Override
473 public ObjectBinding<T> max(@Nullable final T defaultValue, @Nonnull final ObservableValue<Comparator<? super T>> comparator) {
474 requireNonNull(comparator, ERROR_COMPARATOR_NULL);
475 return createObjectBinding(() -> {
476 Comparator<? super T> c = comparator.getValue();
477 requireNonNull(c, ERROR_COMPARATOR_NULL);
478 return (T) stream().max(c).orElse(defaultValue);
479 }, dependencies(comparator));
480 }
481
482 @Nonnull
483 @Override
484 public ObjectBinding<T> min(@Nonnull final Supplier<T> supplier, @Nonnull final ObservableValue<Comparator<? super T>> comparator) {
485 requireNonNull(comparator, ERROR_COMPARATOR_NULL);
486 requireNonNull(supplier, ERROR_SUPPLIER_NULL);
487 return createObjectBinding(() -> {
488 Comparator<? super T> c = comparator.getValue();
489 requireNonNull(c, ERROR_COMPARATOR_NULL);
490 return (T) stream().min(c).orElseGet(supplier);
491 }, dependencies(comparator));
492 }
493
494 @Nonnull
495 @Override
496 public ObjectBinding<T> max(@Nonnull final Supplier<T> supplier, @Nonnull final ObservableValue<Comparator<? super T>> comparator) {
497 requireNonNull(comparator, ERROR_COMPARATOR_NULL);
498 requireNonNull(supplier, ERROR_SUPPLIER_NULL);
499 return createObjectBinding(() -> {
500 Comparator<? super T> c = comparator.getValue();
501 requireNonNull(c, ERROR_COMPARATOR_NULL);
502 return (T) stream().max(c).orElseGet(supplier);
503 }, dependencies(comparator));
504 }
505
506 @Nonnull
507 @Override
508 public BooleanBinding anyMatch(@Nonnull final ObservableValue<Predicate<? super T>> predicate) {
509 requireNonNull(predicate, ERROR_PREDICATE_NULL);
510 return createBooleanBinding(() -> {
511 Predicate<? super T> p = predicate.getValue();
512 requireNonNull(p, ERROR_PREDICATE_NULL);
513 return stream().anyMatch(p);
514 }, dependencies(predicate));
515 }
516
517 @Nonnull
518 @Override
519 public BooleanBinding allMatch(@Nonnull final ObservableValue<Predicate<? super T>> predicate) {
520 requireNonNull(predicate, ERROR_PREDICATE_NULL);
521 return createBooleanBinding(() -> {
522 Predicate<? super T> p = predicate.getValue();
523 requireNonNull(p, ERROR_PREDICATE_NULL);
524 return stream().allMatch(p);
525 }, dependencies(predicate));
526 }
527
528 @Nonnull
529 @Override
530 public BooleanBinding noneMatch(@Nonnull final ObservableValue<Predicate<? super T>> predicate) {
531 requireNonNull(predicate, ERROR_PREDICATE_NULL);
532 return createBooleanBinding(() -> {
533 Predicate<? super T> p = predicate.getValue();
534 requireNonNull(p, ERROR_PREDICATE_NULL);
535 return stream().noneMatch(p);
536 }, dependencies(predicate));
537 }
538
539 @Nonnull
540 @Override
541 public ObjectBinding<T> findFirst() {
542 return createObjectBinding(() -> (T) stream().findFirst().orElse(null), dependencies());
543 }
544
545 @Nonnull
546 @Override
547 public ObjectBinding<T> findFirst(@Nullable final T defaultValue) {
548 return createObjectBinding(() -> (T) stream().findFirst().orElse(defaultValue), dependencies());
549 }
550
551 @Nonnull
552 @Override
553 public ObjectBinding<T> findFirst(@Nonnull final Supplier<T> supplier) {
554 requireNonNull(supplier, ERROR_SUPPLIER_NULL);
555 return createObjectBinding(() -> (T) stream().findFirst().orElseGet(supplier), dependencies());
556 }
557
558 @Nonnull
559 @Override
560 public ObjectBinding<T> findAny() {
561 return createObjectBinding(() -> (T) stream().findAny().orElse(null), dependencies());
562 }
563
564 @Nonnull
565 @Override
566 public ObjectBinding<T> findAny(@Nullable final T defaultValue) {
567 return createObjectBinding(() -> (T) stream().findAny().orElse(defaultValue), dependencies());
568 }
569
570 @Nonnull
571 @Override
572 public ObjectBinding<T> findAny(@Nonnull final Supplier<T> supplier) {
573 requireNonNull(supplier, ERROR_SUPPLIER_NULL);
574 return createObjectBinding(() -> (T) stream().findAny().orElseGet(supplier), dependencies());
575 }
576
577 @Nonnull
578 private Stream stream() {
579 Stream stream = createStream();
580
581 for (StreamOp op : operations) {
582 stream = op.apply(stream);
583 }
584
585 return stream;
586 }
587
588 @Nonnull
589 private Observable[] dependencies(Observable... deps) {
590 List<Observable> dependencies = new ArrayList<>();
591 dependencies.add(observable);
592 if (deps != null) {
593 Collections.addAll(dependencies, deps);
594 }
595
596 for (StreamOp op : operations) {
597 Observable dependency = op.dependency();
598 if (dependency != null) {
599 dependencies.add(dependency);
600 }
601 }
602
603 return dependencies.toArray(new Observable[dependencies.size()]);
604 }
605
606 private static List<StreamOp> push(List<StreamOp> operations, StreamOp op) {
607 List<StreamOp> ops = new ArrayList<>(operations);
608 ops.add(op);
609 return ops;
610 }
611
612 interface StreamOp {
613 @Nonnull
614 Stream apply(@Nonnull Stream stream);
615
616 @Nullable
617 Observable dependency();
618 }
619
620 static class StreamOpAdapter implements StreamOp {
621 @Nonnull
622 @Override
623 public Stream apply(@Nonnull Stream stream) {
624 return stream;
625 }
626
627 @Nullable
628 @Override
629 public Observable dependency() {
630 return null;
631 }
632 }
633 }
|