001/*
002 * Configurate
003 * Copyright (C) zml and Configurate contributors
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *    http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package ninja.leaping.configurate.reactive;
018
019import ninja.leaping.configurate.util.CheckedFunction;
020import ninja.leaping.configurate.util.CheckedSupplier;
021import org.checkerframework.checker.nullness.qual.Nullable;
022
023import java.util.concurrent.Executor;
024import java.util.concurrent.ForkJoinPool;
025
026import static java.util.Objects.requireNonNull;
027
028/**
029 * Something that can publish events.
030 * <p>
031 * Each subscriber is responsible for removing itself from this stream, by using the Disposable returned upon
032 * subscription
033 *
034 * @param <V> The type of notification received by subscribers
035 */
036public interface Publisher<V> {
037    /**
038     * Execute an action returning a single value on the common {@link ForkJoinPool}, and pass the result to any subscribers.
039     *
040     * Subscribers who only begin subscribing after the operation has been completed will receive the result of the operation.
041     *
042     * @param action The action to perform
043     * @param <V> returned value type
044     * @param <E> exception thrown
045     * @return a publisher
046     */
047    static <V, E extends Exception> Publisher<V> execute(CheckedSupplier<V, E> action) {
048        return execute(action, ForkJoinPool.commonPool());
049    }
050
051    /**
052     * Execute an action returning a single value on the provided {@link Executor}, and pass the result to any subscribers.
053     *
054     * Subscribers who only begin subscribing after the operation has been completed will receive the result of the operation.
055     *
056     * @param action The action to perform
057     * @param executor The executor to perform this operation on
058     * @param <V> returned value type
059     * @param <E> exception thrown
060     * @return a publisher
061     */
062    static <V, E extends Exception> Publisher<V> execute(CheckedSupplier<V, E> action, Executor executor) {
063        return new ExecutePublisher<>(requireNonNull(action, "action"), requireNonNull(executor, "executor"));
064    }
065
066    /**
067     * Subscribe to updates from this Publisher. If this is already closed, the Subscriber will receive an error event
068     * with an IllegalStateException, and the returned {@link Disposable} will be a no-op.
069     *
070     * @param subscriber The listener to register
071     * @return A disposable that can be used to cancel this subscription
072     */
073    Disposable subscribe(Subscriber<? super V> subscriber);
074
075    /**
076     * Return whether or not this Publisher has any subscribers.
077     * <p>
078     * In a concurrent environment, this value could change from the time of calling.
079     *
080     * @return if there are subscribers
081     */
082    boolean hasSubscribers();
083
084    default <R> Publisher<R> map(CheckedFunction<? super V, ? extends R, TransactionFailedException> mapper) {
085        return new MappedProcessor<>(mapper, this);
086    }
087
088    /**
089     * Return a publisher that will track its most recent value. The provided processor won't have a value until one is
090     * submitted to its owning publisher.
091     *
092     * @return A publisher based on this one
093     */
094    default Cached<V> cache() {
095        return cache(null);
096    }
097
098    /**
099     * A cached publisher with an initial value
100     *
101     * @param initialValue The value to
102     * @return The
103     */
104    default Cached<V> cache(@Nullable V initialValue) {
105        return new PublisherCached<>(this, initialValue);
106    }
107
108    /**
109     * Get the executor that will be used to handle published
110     *
111     * @return the executor
112     */
113    Executor getExecutor();
114
115    /**
116     * A publisher that caches the last value received
117     *
118     * @param <V> value type
119     */
120    interface Cached<V> extends Publisher<V> {
121        V get();
122
123        void submit(V value);
124    }
125}