001/*
002 * Configurate
003 * Copyright (C) zml and Configurate contributors
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *    http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.spongepowered.configurate.reactive;
018
019import static java.util.Objects.requireNonNull;
020
021import org.checkerframework.checker.nullness.qual.Nullable;
022import org.spongepowered.configurate.util.CheckedFunction;
023import org.spongepowered.configurate.util.CheckedSupplier;
024
025import java.util.concurrent.Executor;
026import java.util.concurrent.ForkJoinPool;
027
028/**
029 * Something that can publish events.
030 *
031 * <p>Each subscriber is responsible for removing itself from this stream, by
032 * using the Disposable returned upon subscription.
033 *
034 * @param <V> the type of notification received by subscribers
035 * @since 4.0.0
036 */
037public interface Publisher<V> {
038
039    /**
040     * Execute an action returning a single value on the common {@link ForkJoinPool},
041     * and pass the result to any subscribers.
042     *
043     * <p>Subscribers who only begin subscribing after the operation has been
044     * completed will receive the result of the operation.
045     *
046     * @param action the action to perform
047     * @param <V> returned value type
048     * @param <E> exception thrown
049     * @return a publisher
050     * @since 4.0.0
051     */
052    static <V, E extends Exception> Publisher<V> execute(CheckedSupplier<V, E> action) {
053        return execute(action, ForkJoinPool.commonPool());
054    }
055
056    /**
057     * Execute an action returning a single value on the provided {@link Executor},
058     * and pass the result to any subscribers.
059     *
060     * <p>Subscribers who only begin subscribing after the operation has been
061     * completed will receive the result of the operation.
062     *
063     * @param action the action to perform
064     * @param executor the executor to perform this operation on
065     * @param <V> returned value type
066     * @param <E> exception thrown
067     * @return a publisher
068     * @since 4.0.0
069     */
070    static <V, E extends Exception> Publisher<V> execute(CheckedSupplier<V, E> action, Executor executor) {
071        return new ExecutePublisher<>(requireNonNull(action, "action"), requireNonNull(executor, "executor"));
072    }
073
074    /**
075     * Subscribe to updates from this Publisher. If this is already closed, the
076     * Subscriber will receive an error event with an IllegalStateException, and
077     * the returned {@link Disposable} will be a no-op.
078     *
079     * @param subscriber the listener to register
080     * @return a disposable that can be used to cancel this subscription
081     * @since 4.0.0
082     */
083    Disposable subscribe(Subscriber<? super V> subscriber);
084
085    /**
086     * Return whether or not this Publisher has any subscribers.
087     *
088     * <p>In a concurrent environment, this value could change from the time
089     * of calling.
090     *
091     * @return if there are subscribers
092     * @since 4.0.0
093     */
094    boolean hasSubscribers();
095
096    /**
097     * Create a new publisher that will transform events published.
098     *
099     * @param mapper transformer function
100     * @param <R> output value type
101     * @return a new publisher
102     * @since 4.0.0
103     */
104    default <R> Publisher<R> map(CheckedFunction<? super V, ? extends R, TransactionFailedException> mapper) {
105        return new MappedProcessor<>(mapper, this);
106    }
107
108    /**
109     * Return a publisher that will track its most recent value. The provided
110     * processor won't have a value until one is submitted to this publisher.
111     *
112     * @return a publisher based on this one
113     * @since 4.0.0
114     */
115    default Cached<V> cache() {
116        return cache(null);
117    }
118
119    /**
120     * Create a cached publisher with an initial value.
121     *
122     * @param initialValue value to initialize the returned publisher with
123     * @return publisher that will cache future responses
124     * @since 4.0.0
125     */
126    default Cached<V> cache(@Nullable V initialValue) {
127        return new CachedPublisher<>(this, initialValue);
128    }
129
130    /**
131     * Get the executor used to handle published events.
132     *
133     * @return the executor
134     * @since 4.0.0
135     */
136    Executor executor();
137
138    /**
139     * A publisher that caches the last value received.
140     *
141     * @param <V> value type
142     * @since 4.0.0
143     */
144    interface Cached<V> extends Publisher<V> {
145
146        /**
147         * Get the last cached value.
148         *
149         * @return latest cached value
150         * @since 4.0.0
151         */
152        V get();
153
154        /**
155         * Directly submit a value to be the current cached value.
156         *
157         * <p>This will be passed on to downstream subscribers.</p>
158         *
159         * @param value new value
160         * @since 4.0.0
161         */
162        void submit(V value);
163
164    }
165
166}