001/* 002 * Configurate 003 * Copyright (C) zml and Configurate contributors 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package ninja.leaping.configurate.reactive; 018 019import ninja.leaping.configurate.util.CheckedFunction; 020import ninja.leaping.configurate.util.CheckedSupplier; 021import org.checkerframework.checker.nullness.qual.Nullable; 022 023import java.util.concurrent.Executor; 024import java.util.concurrent.ForkJoinPool; 025 026import static java.util.Objects.requireNonNull; 027 028/** 029 * Something that can publish events. 030 * <p> 031 * Each subscriber is responsible for removing itself from this stream, by using the Disposable returned upon 032 * subscription 033 * 034 * @param <V> The type of notification received by subscribers 035 */ 036public interface Publisher<V> { 037 /** 038 * Execute an action returning a single value on the common {@link ForkJoinPool}, and pass the result to any subscribers. 039 * 040 * Subscribers who only begin subscribing after the operation has been completed will receive the result of the operation. 041 * 042 * @param action The action to perform 043 * @param <V> returned value type 044 * @param <E> exception thrown 045 * @return a publisher 046 */ 047 static <V, E extends Exception> Publisher<V> execute(CheckedSupplier<V, E> action) { 048 return execute(action, ForkJoinPool.commonPool()); 049 } 050 051 /** 052 * Execute an action returning a single value on the provided {@link Executor}, and pass the result to any subscribers. 053 * 054 * Subscribers who only begin subscribing after the operation has been completed will receive the result of the operation. 055 * 056 * @param action The action to perform 057 * @param executor The executor to perform this operation on 058 * @param <V> returned value type 059 * @param <E> exception thrown 060 * @return a publisher 061 */ 062 static <V, E extends Exception> Publisher<V> execute(CheckedSupplier<V, E> action, Executor executor) { 063 return new ExecutePublisher<>(requireNonNull(action, "action"), requireNonNull(executor, "executor")); 064 } 065 066 /** 067 * Subscribe to updates from this Publisher. If this is already closed, the Subscriber will receive an error event 068 * with an IllegalStateException, and the returned {@link Disposable} will be a no-op. 069 * 070 * @param subscriber The listener to register 071 * @return A disposable that can be used to cancel this subscription 072 */ 073 Disposable subscribe(Subscriber<? super V> subscriber); 074 075 /** 076 * Return whether or not this Publisher has any subscribers. 077 * <p> 078 * In a concurrent environment, this value could change from the time of calling. 079 * 080 * @return if there are subscribers 081 */ 082 boolean hasSubscribers(); 083 084 default <R> Publisher<R> map(CheckedFunction<? super V, ? extends R, TransactionFailedException> mapper) { 085 return new MappedProcessor<>(mapper, this); 086 } 087 088 /** 089 * Return a publisher that will track its most recent value. The provided processor won't have a value until one is 090 * submitted to its owning publisher. 091 * 092 * @return A publisher based on this one 093 */ 094 default Cached<V> cache() { 095 return cache(null); 096 } 097 098 /** 099 * A cached publisher with an initial value 100 * 101 * @param initialValue The value to 102 * @return The 103 */ 104 default Cached<V> cache(@Nullable V initialValue) { 105 return new PublisherCached<>(this, initialValue); 106 } 107 108 /** 109 * Get the executor that will be used to handle published 110 * 111 * @return the executor 112 */ 113 Executor getExecutor(); 114 115 /** 116 * A publisher that caches the last value received 117 * 118 * @param <V> value type 119 */ 120 interface Cached<V> extends Publisher<V> { 121 V get(); 122 123 void submit(V value); 124 } 125}