001/*
002 * Configurate
003 * Copyright (C) zml and Configurate contributors
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *    http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package ninja.leaping.configurate.reactive;
018
019import org.checkerframework.checker.nullness.qual.Nullable;
020import ninja.leaping.configurate.util.CheckedFunction;
021
022import java.util.concurrent.Executor;
023import java.util.concurrent.ForkJoinPool;
024
025/**
026 * A combination of an {@link Publisher} and {@link Subscriber}.
027 * <p>
028 * Processors are expected to broadcast their submitted values to any registered observers, though
029 * filtering or other transformations may be applied.
030 * <p>
031 * Submitting a completion event to the processor will result in a completion event being passed to
032 * every subscriber, and the rejection of further events being submitted.
033 *
034 * @param <I> The type observed
035 * @param <O> The type produced
036 */
037public interface Processor<I, O> extends Publisher<O>, Subscriber<I> {
038    /**
039     * Create a {@link Processor} instance that simply broadcasts submitted values to its
040     * subscribers. Broadcasts will occur on the common {@link ForkJoinPool}.
041     *
042     * @param <V> The type
043     * @return A new processor instance
044     */
045    static <V> Processor.Iso<V> create() {
046        return create(ForkJoinPool.commonPool());
047    }
048
049    /**
050     * Create a processor instance that is aware of transactions
051     *
052     * @param <V> The value type
053     * @return a new transactional processor
054     */
055    static <V> TransactionalIso<V> createTransactional() {
056        return createTransactional(ForkJoinPool.commonPool());
057    }
058
059    /**
060     * Create a {@link Processor} instance that simply broadcasts submitted values to its
061     * subscribers
062     *
063     * @param <V> The type
064     * @param executor task executor
065     * @return A new processor instance
066     */
067    static <V> Processor.Iso<V> create(Executor executor) {
068        return new ProcessorImpl<>(executor);
069    }
070
071    static <V> TransactionalIso<V> createTransactional(Executor exec) {
072        return new TransactionalProcessorImpl<>(exec);
073    }
074
075    /**
076     * {@inheritDoc}
077     */
078    @Override
079    default <R> Processor<O, R> map(CheckedFunction<? super O, ? extends R, TransactionFailedException> mapper) {
080        return new MappedProcessor<>(mapper, this);
081    }
082
083    /**
084     * Submit an element of the observed type, bypassing any mapping this Processor may do. If the
085     * input type of this processor equals the output type, this is equivalent to {@link
086     * #submit(Object)}
087     *
088     * @param element The element to submit
089     */
090    void inject(O element);
091
092    /**
093     * Provide a {@link Subscriber} that will handle events submitted to this processor, but only if
094     * no other subscription is active.
095     *
096     * @param subscriber The fallback subscriber to add. Provide {@code null} to remove the handler
097     */
098    void setFallbackHandler(@Nullable Subscriber<O> subscriber);
099
100    /**
101     * Close this processor if there are no remaining subscriptions. Any signals that have already
102     * been submitted will be processed.
103     * <p>
104     * Any call to this method after the {@link Processor} has been closed will simply return true.
105     *
106     * @return true if there are no subscribers and this processor is closed
107     */
108    boolean closeIfUnsubscribed();
109
110    /**
111     * A Processor that has the same type for inputs and outputs
112     *
113     * @param <V> The input and output type
114     */
115    interface Iso<V> extends Processor<V, V> {
116        @Override
117        default void inject(V element) {
118            submit(element);
119        }
120    }
121    
122    interface Transactional<I, O> extends Processor<I, O>, Publisher<O>, ninja.leaping.configurate.reactive.TransactionalSubscriber<I> {
123    }
124    
125    interface TransactionalIso<V> extends Transactional<V, V>, Iso<V> {
126
127    }
128    
129}