| /* |
| * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. |
| * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
| * |
| * This code is free software; you can redistribute it and/or modify it |
| * under the terms of the GNU General Public License version 2 only, as |
| * published by the Free Software Foundation. Oracle designates this |
| * particular file as subject to the "Classpath" exception as provided |
| * by Oracle in the LICENSE file that accompanied this code. |
| * |
| * This code is distributed in the hope that it will be useful, but WITHOUT |
| * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
| * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License |
| * version 2 for more details (a copy is included in the LICENSE file that |
| * accompanied this code). |
| * |
| * You should have received a copy of the GNU General Public License version |
| * 2 along with this work; if not, write to the Free Software Foundation, |
| * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
| * |
| * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA |
| * or visit www.oracle.com if you need additional information or have any |
| * questions. |
| */ |
| package java.util.stream; |
| |
| import java.util.Objects; |
| import java.util.Optional; |
| import java.util.OptionalDouble; |
| import java.util.OptionalInt; |
| import java.util.OptionalLong; |
| import java.util.Spliterator; |
| import java.util.concurrent.CountedCompleter; |
| import java.util.function.BiConsumer; |
| import java.util.function.BiFunction; |
| import java.util.function.BinaryOperator; |
| import java.util.function.DoubleBinaryOperator; |
| import java.util.function.IntBinaryOperator; |
| import java.util.function.LongBinaryOperator; |
| import java.util.function.ObjDoubleConsumer; |
| import java.util.function.ObjIntConsumer; |
| import java.util.function.ObjLongConsumer; |
| import java.util.function.Supplier; |
| |
| /** |
| * Factory for creating instances of {@code TerminalOp} that implement |
| * reductions. |
| * |
| * @since 1.8 |
| */ |
| final class ReduceOps { |
| |
| private ReduceOps() { } |
| |
| /** |
| * Constructs a {@code TerminalOp} that implements a functional reduce on |
| * reference values. |
| * |
| * @param <T> the type of the input elements |
| * @param <U> the type of the result |
| * @param seed the identity element for the reduction |
| * @param reducer the accumulating function that incorporates an additional |
| * input element into the result |
| * @param combiner the combining function that combines two intermediate |
| * results |
| * @return a {@code TerminalOp} implementing the reduction |
| */ |
| public static <T, U> TerminalOp<T, U> |
| makeRef(U seed, BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner) { |
| Objects.requireNonNull(reducer); |
| Objects.requireNonNull(combiner); |
| class ReducingSink extends Box<U> implements AccumulatingSink<T, U, ReducingSink> { |
| @Override |
| public void begin(long size) { |
| state = seed; |
| } |
| |
| @Override |
| public void accept(T t) { |
| state = reducer.apply(state, t); |
| } |
| |
| @Override |
| public void combine(ReducingSink other) { |
| state = combiner.apply(state, other.state); |
| } |
| } |
| return new ReduceOp<T, U, ReducingSink>(StreamShape.REFERENCE) { |
| @Override |
| public ReducingSink makeSink() { |
| return new ReducingSink(); |
| } |
| }; |
| } |
| |
| /** |
| * Constructs a {@code TerminalOp} that implements a functional reduce on |
| * reference values producing an optional reference result. |
| * |
| * @param <T> The type of the input elements, and the type of the result |
| * @param operator The reducing function |
| * @return A {@code TerminalOp} implementing the reduction |
| */ |
| public static <T> TerminalOp<T, Optional<T>> |
| makeRef(BinaryOperator<T> operator) { |
| Objects.requireNonNull(operator); |
| class ReducingSink |
| implements AccumulatingSink<T, Optional<T>, ReducingSink> { |
| private boolean empty; |
| private T state; |
| |
| public void begin(long size) { |
| empty = true; |
| state = null; |
| } |
| |
| @Override |
| public void accept(T t) { |
| if (empty) { |
| empty = false; |
| state = t; |
| } else { |
| state = operator.apply(state, t); |
| } |
| } |
| |
| @Override |
| public Optional<T> get() { |
| return empty ? Optional.empty() : Optional.of(state); |
| } |
| |
| @Override |
| public void combine(ReducingSink other) { |
| if (!other.empty) |
| accept(other.state); |
| } |
| } |
| return new ReduceOp<T, Optional<T>, ReducingSink>(StreamShape.REFERENCE) { |
| @Override |
| public ReducingSink makeSink() { |
| return new ReducingSink(); |
| } |
| }; |
| } |
| |
| /** |
| * Constructs a {@code TerminalOp} that implements a mutable reduce on |
| * reference values. |
| * |
| * @param <T> the type of the input elements |
| * @param <I> the type of the intermediate reduction result |
| * @param collector a {@code Collector} defining the reduction |
| * @return a {@code ReduceOp} implementing the reduction |
| */ |
| public static <T, I> TerminalOp<T, I> |
| makeRef(Collector<? super T, I, ?> collector) { |
| Supplier<I> supplier = Objects.requireNonNull(collector).supplier(); |
| BiConsumer<I, ? super T> accumulator = collector.accumulator(); |
| BinaryOperator<I> combiner = collector.combiner(); |
| class ReducingSink extends Box<I> |
| implements AccumulatingSink<T, I, ReducingSink> { |
| @Override |
| public void begin(long size) { |
| state = supplier.get(); |
| } |
| |
| @Override |
| public void accept(T t) { |
| accumulator.accept(state, t); |
| } |
| |
| @Override |
| public void combine(ReducingSink other) { |
| state = combiner.apply(state, other.state); |
| } |
| } |
| return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) { |
| @Override |
| public ReducingSink makeSink() { |
| return new ReducingSink(); |
| } |
| |
| @Override |
| public int getOpFlags() { |
| return collector.characteristics().contains(Collector.Characteristics.UNORDERED) |
| ? StreamOpFlag.NOT_ORDERED |
| : 0; |
| } |
| }; |
| } |
| |
| /** |
| * Constructs a {@code TerminalOp} that implements a mutable reduce on |
| * reference values. |
| * |
| * @param <T> the type of the input elements |
| * @param <R> the type of the result |
| * @param seedFactory a factory to produce a new base accumulator |
| * @param accumulator a function to incorporate an element into an |
| * accumulator |
| * @param reducer a function to combine an accumulator into another |
| * @return a {@code TerminalOp} implementing the reduction |
| */ |
| public static <T, R> TerminalOp<T, R> |
| makeRef(Supplier<R> seedFactory, |
| BiConsumer<R, ? super T> accumulator, |
| BiConsumer<R,R> reducer) { |
| Objects.requireNonNull(seedFactory); |
| Objects.requireNonNull(accumulator); |
| Objects.requireNonNull(reducer); |
| class ReducingSink extends Box<R> |
| implements AccumulatingSink<T, R, ReducingSink> { |
| @Override |
| public void begin(long size) { |
| state = seedFactory.get(); |
| } |
| |
| @Override |
| public void accept(T t) { |
| accumulator.accept(state, t); |
| } |
| |
| @Override |
| public void combine(ReducingSink other) { |
| reducer.accept(state, other.state); |
| } |
| } |
| return new ReduceOp<T, R, ReducingSink>(StreamShape.REFERENCE) { |
| @Override |
| public ReducingSink makeSink() { |
| return new ReducingSink(); |
| } |
| }; |
| } |
| |
| /** |
| * Constructs a {@code TerminalOp} that implements a functional reduce on |
| * {@code int} values. |
| * |
| * @param identity the identity for the combining function |
| * @param operator the combining function |
| * @return a {@code TerminalOp} implementing the reduction |
| */ |
| public static TerminalOp<Integer, Integer> |
| makeInt(int identity, IntBinaryOperator operator) { |
| Objects.requireNonNull(operator); |
| class ReducingSink |
| implements AccumulatingSink<Integer, Integer, ReducingSink>, Sink.OfInt { |
| private int state; |
| |
| @Override |
| public void begin(long size) { |
| state = identity; |
| } |
| |
| @Override |
| public void accept(int t) { |
| state = operator.applyAsInt(state, t); |
| } |
| |
| @Override |
| public Integer get() { |
| return state; |
| } |
| |
| @Override |
| public void combine(ReducingSink other) { |
| accept(other.state); |
| } |
| } |
| return new ReduceOp<Integer, Integer, ReducingSink>(StreamShape.INT_VALUE) { |
| @Override |
| public ReducingSink makeSink() { |
| return new ReducingSink(); |
| } |
| }; |
| } |
| |
| /** |
| * Constructs a {@code TerminalOp} that implements a functional reduce on |
| * {@code int} values, producing an optional integer result. |
| * |
| * @param operator the combining function |
| * @return a {@code TerminalOp} implementing the reduction |
| */ |
| public static TerminalOp<Integer, OptionalInt> |
| makeInt(IntBinaryOperator operator) { |
| Objects.requireNonNull(operator); |
| class ReducingSink |
| implements AccumulatingSink<Integer, OptionalInt, ReducingSink>, Sink.OfInt { |
| private boolean empty; |
| private int state; |
| |
| public void begin(long size) { |
| empty = true; |
| state = 0; |
| } |
| |
| @Override |
| public void accept(int t) { |
| if (empty) { |
| empty = false; |
| state = t; |
| } |
| else { |
| state = operator.applyAsInt(state, t); |
| } |
| } |
| |
| @Override |
| public OptionalInt get() { |
| return empty ? OptionalInt.empty() : OptionalInt.of(state); |
| } |
| |
| @Override |
| public void combine(ReducingSink other) { |
| if (!other.empty) |
| accept(other.state); |
| } |
| } |
| return new ReduceOp<Integer, OptionalInt, ReducingSink>(StreamShape.INT_VALUE) { |
| @Override |
| public ReducingSink makeSink() { |
| return new ReducingSink(); |
| } |
| }; |
| } |
| |
| /** |
| * Constructs a {@code TerminalOp} that implements a mutable reduce on |
| * {@code int} values. |
| * |
| * @param <R> The type of the result |
| * @param supplier a factory to produce a new accumulator of the result type |
| * @param accumulator a function to incorporate an int into an |
| * accumulator |
| * @param combiner a function to combine an accumulator into another |
| * @return A {@code ReduceOp} implementing the reduction |
| */ |
| public static <R> TerminalOp<Integer, R> |
| makeInt(Supplier<R> supplier, |
| ObjIntConsumer<R> accumulator, |
| BinaryOperator<R> combiner) { |
| Objects.requireNonNull(supplier); |
| Objects.requireNonNull(accumulator); |
| Objects.requireNonNull(combiner); |
| class ReducingSink extends Box<R> |
| implements AccumulatingSink<Integer, R, ReducingSink>, Sink.OfInt { |
| @Override |
| public void begin(long size) { |
| state = supplier.get(); |
| } |
| |
| @Override |
| public void accept(int t) { |
| accumulator.accept(state, t); |
| } |
| |
| @Override |
| public void combine(ReducingSink other) { |
| state = combiner.apply(state, other.state); |
| } |
| } |
| return new ReduceOp<Integer, R, ReducingSink>(StreamShape.INT_VALUE) { |
| @Override |
| public ReducingSink makeSink() { |
| return new ReducingSink(); |
| } |
| }; |
| } |
| |
| /** |
| * Constructs a {@code TerminalOp} that implements a functional reduce on |
| * {@code long} values. |
| * |
| * @param identity the identity for the combining function |
| * @param operator the combining function |
| * @return a {@code TerminalOp} implementing the reduction |
| */ |
| public static TerminalOp<Long, Long> |
| makeLong(long identity, LongBinaryOperator operator) { |
| Objects.requireNonNull(operator); |
| class ReducingSink |
| implements AccumulatingSink<Long, Long, ReducingSink>, Sink.OfLong { |
| private long state; |
| |
| @Override |
| public void begin(long size) { |
| state = identity; |
| } |
| |
| @Override |
| public void accept(long t) { |
| state = operator.applyAsLong(state, t); |
| } |
| |
| @Override |
| public Long get() { |
| return state; |
| } |
| |
| @Override |
| public void combine(ReducingSink other) { |
| accept(other.state); |
| } |
| } |
| return new ReduceOp<Long, Long, ReducingSink>(StreamShape.LONG_VALUE) { |
| @Override |
| public ReducingSink makeSink() { |
| return new ReducingSink(); |
| } |
| }; |
| } |
| |
| /** |
| * Constructs a {@code TerminalOp} that implements a functional reduce on |
| * {@code long} values, producing an optional long result. |
| * |
| * @param operator the combining function |
| * @return a {@code TerminalOp} implementing the reduction |
| */ |
| public static TerminalOp<Long, OptionalLong> |
| makeLong(LongBinaryOperator operator) { |
| Objects.requireNonNull(operator); |
| class ReducingSink |
| implements AccumulatingSink<Long, OptionalLong, ReducingSink>, Sink.OfLong { |
| private boolean empty; |
| private long state; |
| |
| public void begin(long size) { |
| empty = true; |
| state = 0; |
| } |
| |
| @Override |
| public void accept(long t) { |
| if (empty) { |
| empty = false; |
| state = t; |
| } |
| else { |
| state = operator.applyAsLong(state, t); |
| } |
| } |
| |
| @Override |
| public OptionalLong get() { |
| return empty ? OptionalLong.empty() : OptionalLong.of(state); |
| } |
| |
| @Override |
| public void combine(ReducingSink other) { |
| if (!other.empty) |
| accept(other.state); |
| } |
| } |
| return new ReduceOp<Long, OptionalLong, ReducingSink>(StreamShape.LONG_VALUE) { |
| @Override |
| public ReducingSink makeSink() { |
| return new ReducingSink(); |
| } |
| }; |
| } |
| |
| /** |
| * Constructs a {@code TerminalOp} that implements a mutable reduce on |
| * {@code long} values. |
| * |
| * @param <R> the type of the result |
| * @param supplier a factory to produce a new accumulator of the result type |
| * @param accumulator a function to incorporate an int into an |
| * accumulator |
| * @param combiner a function to combine an accumulator into another |
| * @return a {@code TerminalOp} implementing the reduction |
| */ |
| public static <R> TerminalOp<Long, R> |
| makeLong(Supplier<R> supplier, |
| ObjLongConsumer<R> accumulator, |
| BinaryOperator<R> combiner) { |
| Objects.requireNonNull(supplier); |
| Objects.requireNonNull(accumulator); |
| Objects.requireNonNull(combiner); |
| class ReducingSink extends Box<R> |
| implements AccumulatingSink<Long, R, ReducingSink>, Sink.OfLong { |
| @Override |
| public void begin(long size) { |
| state = supplier.get(); |
| } |
| |
| @Override |
| public void accept(long t) { |
| accumulator.accept(state, t); |
| } |
| |
| @Override |
| public void combine(ReducingSink other) { |
| state = combiner.apply(state, other.state); |
| } |
| } |
| return new ReduceOp<Long, R, ReducingSink>(StreamShape.LONG_VALUE) { |
| @Override |
| public ReducingSink makeSink() { |
| return new ReducingSink(); |
| } |
| }; |
| } |
| |
| /** |
| * Constructs a {@code TerminalOp} that implements a functional reduce on |
| * {@code double} values. |
| * |
| * @param identity the identity for the combining function |
| * @param operator the combining function |
| * @return a {@code TerminalOp} implementing the reduction |
| */ |
| public static TerminalOp<Double, Double> |
| makeDouble(double identity, DoubleBinaryOperator operator) { |
| Objects.requireNonNull(operator); |
| class ReducingSink |
| implements AccumulatingSink<Double, Double, ReducingSink>, Sink.OfDouble { |
| private double state; |
| |
| @Override |
| public void begin(long size) { |
| state = identity; |
| } |
| |
| @Override |
| public void accept(double t) { |
| state = operator.applyAsDouble(state, t); |
| } |
| |
| @Override |
| public Double get() { |
| return state; |
| } |
| |
| @Override |
| public void combine(ReducingSink other) { |
| accept(other.state); |
| } |
| } |
| return new ReduceOp<Double, Double, ReducingSink>(StreamShape.DOUBLE_VALUE) { |
| @Override |
| public ReducingSink makeSink() { |
| return new ReducingSink(); |
| } |
| }; |
| } |
| |
| /** |
| * Constructs a {@code TerminalOp} that implements a functional reduce on |
| * {@code double} values, producing an optional double result. |
| * |
| * @param operator the combining function |
| * @return a {@code TerminalOp} implementing the reduction |
| */ |
| public static TerminalOp<Double, OptionalDouble> |
| makeDouble(DoubleBinaryOperator operator) { |
| Objects.requireNonNull(operator); |
| class ReducingSink |
| implements AccumulatingSink<Double, OptionalDouble, ReducingSink>, Sink.OfDouble { |
| private boolean empty; |
| private double state; |
| |
| public void begin(long size) { |
| empty = true; |
| state = 0; |
| } |
| |
| @Override |
| public void accept(double t) { |
| if (empty) { |
| empty = false; |
| state = t; |
| } |
| else { |
| state = operator.applyAsDouble(state, t); |
| } |
| } |
| |
| @Override |
| public OptionalDouble get() { |
| return empty ? OptionalDouble.empty() : OptionalDouble.of(state); |
| } |
| |
| @Override |
| public void combine(ReducingSink other) { |
| if (!other.empty) |
| accept(other.state); |
| } |
| } |
| return new ReduceOp<Double, OptionalDouble, ReducingSink>(StreamShape.DOUBLE_VALUE) { |
| @Override |
| public ReducingSink makeSink() { |
| return new ReducingSink(); |
| } |
| }; |
| } |
| |
| /** |
| * Constructs a {@code TerminalOp} that implements a mutable reduce on |
| * {@code double} values. |
| * |
| * @param <R> the type of the result |
| * @param supplier a factory to produce a new accumulator of the result type |
| * @param accumulator a function to incorporate an int into an |
| * accumulator |
| * @param combiner a function to combine an accumulator into another |
| * @return a {@code TerminalOp} implementing the reduction |
| */ |
| public static <R> TerminalOp<Double, R> |
| makeDouble(Supplier<R> supplier, |
| ObjDoubleConsumer<R> accumulator, |
| BinaryOperator<R> combiner) { |
| Objects.requireNonNull(supplier); |
| Objects.requireNonNull(accumulator); |
| Objects.requireNonNull(combiner); |
| class ReducingSink extends Box<R> |
| implements AccumulatingSink<Double, R, ReducingSink>, Sink.OfDouble { |
| @Override |
| public void begin(long size) { |
| state = supplier.get(); |
| } |
| |
| @Override |
| public void accept(double t) { |
| accumulator.accept(state, t); |
| } |
| |
| @Override |
| public void combine(ReducingSink other) { |
| state = combiner.apply(state, other.state); |
| } |
| } |
| return new ReduceOp<Double, R, ReducingSink>(StreamShape.DOUBLE_VALUE) { |
| @Override |
| public ReducingSink makeSink() { |
| return new ReducingSink(); |
| } |
| }; |
| } |
| |
| /** |
| * A type of {@code TerminalSink} that implements an associative reducing |
| * operation on elements of type {@code T} and producing a result of type |
| * {@code R}. |
| * |
| * @param <T> the type of input element to the combining operation |
| * @param <R> the result type |
| * @param <K> the type of the {@code AccumulatingSink}. |
| */ |
| private interface AccumulatingSink<T, R, K extends AccumulatingSink<T, R, K>> |
| extends TerminalSink<T, R> { |
| public void combine(K other); |
| } |
| |
| /** |
| * State box for a single state element, used as a base class for |
| * {@code AccumulatingSink} instances |
| * |
| * @param <U> The type of the state element |
| */ |
| private static abstract class Box<U> { |
| U state; |
| |
| Box() {} // Avoid creation of special accessor |
| |
| public U get() { |
| return state; |
| } |
| } |
| |
| /** |
| * A {@code TerminalOp} that evaluates a stream pipeline and sends the |
| * output into an {@code AccumulatingSink}, which performs a reduce |
| * operation. The {@code AccumulatingSink} must represent an associative |
| * reducing operation. |
| * |
| * @param <T> the output type of the stream pipeline |
| * @param <R> the result type of the reducing operation |
| * @param <S> the type of the {@code AccumulatingSink} |
| */ |
| private static abstract class ReduceOp<T, R, S extends AccumulatingSink<T, R, S>> |
| implements TerminalOp<T, R> { |
| private final StreamShape inputShape; |
| |
| /** |
| * Create a {@code ReduceOp} of the specified stream shape which uses |
| * the specified {@code Supplier} to create accumulating sinks. |
| * |
| * @param shape The shape of the stream pipeline |
| */ |
| ReduceOp(StreamShape shape) { |
| inputShape = shape; |
| } |
| |
| public abstract S makeSink(); |
| |
| @Override |
| public StreamShape inputShape() { |
| return inputShape; |
| } |
| |
| @Override |
| public <P_IN> R evaluateSequential(PipelineHelper<T> helper, |
| Spliterator<P_IN> spliterator) { |
| return helper.wrapAndCopyInto(makeSink(), spliterator).get(); |
| } |
| |
| @Override |
| public <P_IN> R evaluateParallel(PipelineHelper<T> helper, |
| Spliterator<P_IN> spliterator) { |
| return new ReduceTask<>(this, helper, spliterator).invoke().get(); |
| } |
| } |
| |
| /** |
| * A {@code ForkJoinTask} for performing a parallel reduce operation. |
| */ |
| @SuppressWarnings("serial") |
| private static final class ReduceTask<P_IN, P_OUT, R, |
| S extends AccumulatingSink<P_OUT, R, S>> |
| extends AbstractTask<P_IN, P_OUT, S, ReduceTask<P_IN, P_OUT, R, S>> { |
| private final ReduceOp<P_OUT, R, S> op; |
| |
| ReduceTask(ReduceOp<P_OUT, R, S> op, |
| PipelineHelper<P_OUT> helper, |
| Spliterator<P_IN> spliterator) { |
| super(helper, spliterator); |
| this.op = op; |
| } |
| |
| ReduceTask(ReduceTask<P_IN, P_OUT, R, S> parent, |
| Spliterator<P_IN> spliterator) { |
| super(parent, spliterator); |
| this.op = parent.op; |
| } |
| |
| @Override |
| protected ReduceTask<P_IN, P_OUT, R, S> makeChild(Spliterator<P_IN> spliterator) { |
| return new ReduceTask<>(this, spliterator); |
| } |
| |
| @Override |
| protected S doLeaf() { |
| return helper.wrapAndCopyInto(op.makeSink(), spliterator); |
| } |
| |
| @Override |
| public void onCompletion(CountedCompleter<?> caller) { |
| if (!isLeaf()) { |
| S leftResult = leftChild.getLocalResult(); |
| leftResult.combine(rightChild.getLocalResult()); |
| setLocalResult(leftResult); |
| } |
| // GC spliterator, left and right child |
| super.onCompletion(caller); |
| } |
| } |
| } |