001/*
002 * Configurate
003 * Copyright (C) zml and Configurate contributors
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *    http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.spongepowered.configurate.reactive;
018
019import org.checkerframework.checker.nullness.qual.Nullable;
020import org.spongepowered.configurate.util.CheckedFunction;
021
022import java.util.concurrent.Executor;
023import java.util.concurrent.ForkJoinPool;
024
025/**
026 * A combination of an {@link Publisher} and {@link Subscriber}.
027 *
028 * <p>Processors are expected to broadcast their submitted values to any
029 * registered observers, though filtering or other transformations may
030 * be applied.
031 *
032 * <p>Submitting a completion event to the processor will result in a completion
033 * event being passed to every subscriber, and the rejection of further events
034 * being submitted.
035 *
036 * @param <I> the type observed
037 * @param <O> the type produced
038 * @since 4.0.0
039 */
040public interface Processor<I, O> extends Publisher<O>, Subscriber<I> {
041
042    /**
043     * Create a {@link Processor} instance that simply broadcasts submitted
044     * values to its subscribers. Broadcasts will occur on the
045     * common {@link ForkJoinPool}.
046     *
047     * @param <V> the type
048     * @return a new processor instance
049     * @since 4.0.0
050     */
051    static <V> Processor.Iso<V> create() {
052        return create(ForkJoinPool.commonPool());
053    }
054
055    /**
056     * Create a {@link Processor} instance that simply broadcasts submitted
057     * values to its subscribers.
058     *
059     * @param <V> the type
060     * @param executor task executor
061     * @return a new processor instance
062     * @since 4.0.0
063     */
064    static <V> Processor.Iso<V> create(Executor executor) {
065        return new ProcessorImpl<>(executor);
066    }
067
068    /**
069     * Create a processor instance that is aware of transactions.
070     *
071     * @param <V> the value type
072     * @return a new transactional processor
073     * @since 4.0.0
074     */
075    static <V> Processor.TransactionalIso<V> createTransactional() {
076        return createTransactional(ForkJoinPool.commonPool());
077    }
078
079    /**
080     * Create a processor instance that is aware of transactions.
081     *
082     * <p>Operations will be submitted to the provided executor.</p>
083     *
084     * @param exec executor to run operations on
085     * @param <V> the value type
086     * @return a new transactional processor
087     * @since 4.0.0
088     */
089    static <V> Processor.TransactionalIso<V> createTransactional(final Executor exec) {
090        return new TransactionalProcessorImpl<>(exec);
091    }
092
093    /**
094     * {@inheritDoc}
095     */
096    @Override
097    default <R> Processor<O, R> map(CheckedFunction<? super O, ? extends R, TransactionFailedException> mapper) {
098        return new MappedProcessor<>(mapper, this);
099    }
100
101    /**
102     * Submit an element of the observed type, bypassing any mapping this
103     * Processor may do. If the input type of this processor equals the output
104     * type, this is equivalent to {@link #submit(Object)}
105     *
106     * @param element the element to submit
107     * @since 4.0.0
108     */
109    void inject(O element);
110
111    /**
112     * Provide a {@link Subscriber} that will handle events submitted to this
113     * processor, but only if no other subscription is active.
114     *
115     * @param subscriber the fallback subscriber to add. Provide {@code null} to
116     *                   remove the handler
117     * @since 4.0.0
118     */
119    void fallbackHandler(@Nullable Subscriber<O> subscriber);
120
121    /**
122     * Close this processor if there are no remaining subscriptions. Any signals
123     * that have already been submitted will be processed.
124     *
125     * <p>Any call to this method after the {@link Processor} has been closed
126     * will simply return true.
127     *
128     * @return true if there are no subscribers and this processor is closed
129     * @since 4.0.0
130     */
131    boolean closeIfUnsubscribed();
132
133    /**
134     * A Processor that has the same type for inputs and outputs.
135     *
136     * @param <V> the input and output type
137     * @since 4.0.0
138     */
139    interface Iso<V> extends Processor<V, V> {
140        @Override
141        default void inject(V element) {
142            submit(element);
143        }
144    }
145
146    /**
147     * A processor that supports transactions.
148     *
149     * @param <I> input type
150     * @param <O> output type
151     * @since 4.0.0
152     */
153    interface Transactional<I, O> extends Processor<I, O>, Publisher<O>, TransactionalSubscriber<I> {
154    }
155
156    /**
157     * A processor that supports transactions using the same input and outputs.
158     *
159     * @param <V> input/output type
160     * @since 4.0.0
161     */
162    interface TransactionalIso<V> extends Transactional<V, V>, Iso<V> {
163
164    }
165
166}