/*
 * Configurate
 * Copyright (C) zml and Configurate contributors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.spongepowered.configurate.reactive;

import static java.util.Objects.requireNonNull;

import org.checkerframework.checker.nullness.qual.Nullable;
import org.spongepowered.configurate.util.CheckedFunction;
import org.spongepowered.configurate.util.CheckedSupplier;

import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;

/**
 * Something that can publish events.
 *
 * <p>Each subscriber is responsible for removing itself from this stream, by
 * using the {@link Disposable} returned upon subscription.
 *
 * @param <V> the type of notification received by subscribers
 * @since 4.0.0
 */
public interface Publisher<V> {

    /**
     * Execute an action returning a single value on the common {@link ForkJoinPool},
     * and pass the result to any subscribers.
     *
     * <p>Subscribers who only begin subscribing after the operation has been
     * completed will receive the result of the operation.
     *
     * @param action the action to perform
     * @param <V> returned value type
     * @param <E> exception thrown
     * @return a publisher
     * @throws NullPointerException if {@code action} is null
     * @since 4.0.0
     */
    static <V, E extends Exception> Publisher<V> execute(CheckedSupplier<V, E> action) {
        // Delegates to the two-argument overload, which performs the null checks.
        return execute(action, ForkJoinPool.commonPool());
    }

    /**
     * Execute an action returning a single value on the provided {@link Executor},
     * and pass the result to any subscribers.
     *
     * <p>Subscribers who only begin subscribing after the operation has been
     * completed will receive the result of the operation.
     *
     * @param action the action to perform
     * @param executor the executor to perform this operation on
     * @param <V> returned value type
     * @param <E> exception thrown
     * @return a publisher
     * @throws NullPointerException if {@code action} or {@code executor} is null
     * @since 4.0.0
     */
    static <V, E extends Exception> Publisher<V> execute(CheckedSupplier<V, E> action, Executor executor) {
        return new ExecutePublisher<>(requireNonNull(action, "action"), requireNonNull(executor, "executor"));
    }

    /**
     * Subscribe to updates from this Publisher. If this is already closed, the
     * Subscriber will receive an error event with an IllegalStateException, and
     * the returned {@link Disposable} will be a no-op.
     *
     * @param subscriber the listener to register
     * @return a disposable that can be used to cancel this subscription
     * @since 4.0.0
     */
    Disposable subscribe(Subscriber<? super V> subscriber);

    /**
     * Return whether or not this Publisher has any subscribers.
     *
     * <p>In a concurrent environment, this value could change from the time
     * of calling.
     *
     * @return if there are subscribers
     * @since 4.0.0
     */
    boolean hasSubscribers();

    /**
     * Create a new publisher that will transform events published.
     *
     * @param mapper transformer function
     * @param <R> output value type
     * @return a new publisher
     * @throws NullPointerException if {@code mapper} is null
     * @since 4.0.0
     */
    default <R> Publisher<R> map(CheckedFunction<? super V, ? extends R, TransactionFailedException> mapper) {
        // Fail fast at the call site, consistent with the argument checks in execute(...).
        return new MappedProcessor<>(requireNonNull(mapper, "mapper"), this);
    }

    /**
     * Return a publisher that will track its most recent value. The returned
     * publisher is created without an initial value, and won't have one until
     * a value is submitted to this publisher.
     *
     * @return a publisher based on this one
     * @since 4.0.0
     */
    default Cached<V> cache() {
        return cache(null);
    }

    /**
     * Create a cached publisher with an initial value.
     *
     * @param initialValue value to initialize the returned publisher with,
     *     or {@code null} to start without a value
     * @return publisher that will cache future responses
     * @since 4.0.0
     */
    default Cached<V> cache(@Nullable V initialValue) {
        return new CachedPublisher<>(this, initialValue);
    }

    /**
     * Get the executor used to handle published events.
     *
     * @return the executor
     * @since 4.0.0
     */
    Executor executor();

    /**
     * A publisher that caches the last value received.
     *
     * @param <V> value type
     * @since 4.0.0
     */
    interface Cached<V> extends Publisher<V> {

        /**
         * Get the last cached value.
         *
         * @return latest cached value
         * @since 4.0.0
         */
        V get();

        /**
         * Directly submit a value to be the current cached value.
         *
         * <p>This will be passed on to downstream subscribers.</p>
         *
         * @param value new value
         * @since 4.0.0
         */
        void submit(V value);

    }

}