001/* 002 * Configurate 003 * Copyright (C) zml and Configurate contributors 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package ninja.leaping.configurate.reactive; 018 019import org.checkerframework.checker.nullness.qual.Nullable; 020import ninja.leaping.configurate.util.CheckedFunction; 021 022import java.util.concurrent.Executor; 023import java.util.concurrent.ForkJoinPool; 024 025/** 026 * A combination of an {@link Publisher} and {@link Subscriber}. 027 * <p> 028 * Processors are expected to broadcast their submitted values to any registered observers, though 029 * filtering or other transformations may be applied. 030 * <p> 031 * Submitting a completion event to the processor will result in a completion event being passed to 032 * every subscriber, and the rejection of further events being submitted. 033 * 034 * @param <I> The type observed 035 * @param <O> The type produced 036 */ 037public interface Processor<I, O> extends Publisher<O>, Subscriber<I> { 038 /** 039 * Create a {@link Processor} instance that simply broadcasts submitted values to its 040 * subscribers. Broadcasts will occur on the common {@link ForkJoinPool}. 041 * 042 * @param <V> The type 043 * @return A new processor instance 044 */ 045 static <V> Processor.Iso<V> create() { 046 return create(ForkJoinPool.commonPool()); 047 } 048 049 /** 050 * Create a processor instance that is aware of transactions 051 * 052 * @param <V> The value type 053 * @return a new transactional processor 054 */ 055 static <V> TransactionalIso<V> createTransactional() { 056 return createTransactional(ForkJoinPool.commonPool()); 057 } 058 059 /** 060 * Create a {@link Processor} instance that simply broadcasts submitted values to its 061 * subscribers 062 * 063 * @param <V> The type 064 * @param executor task executor 065 * @return A new processor instance 066 */ 067 static <V> Processor.Iso<V> create(Executor executor) { 068 return new ProcessorImpl<>(executor); 069 } 070 071 static <V> TransactionalIso<V> createTransactional(Executor exec) { 072 return new TransactionalProcessorImpl<>(exec); 073 } 074 075 /** 076 * {@inheritDoc} 077 */ 078 @Override 079 default <R> Processor<O, R> map(CheckedFunction<? super O, ? extends R, TransactionFailedException> mapper) { 080 return new MappedProcessor<>(mapper, this); 081 } 082 083 /** 084 * Submit an element of the observed type, bypassing any mapping this Processor may do. If the 085 * input type of this processor equals the output type, this is equivalent to {@link 086 * #submit(Object)} 087 * 088 * @param element The element to submit 089 */ 090 void inject(O element); 091 092 /** 093 * Provide a {@link Subscriber} that will handle events submitted to this processor, but only if 094 * no other subscription is active. 095 * 096 * @param subscriber The fallback subscriber to add. Provide {@code null} to remove the handler 097 */ 098 void setFallbackHandler(@Nullable Subscriber<O> subscriber); 099 100 /** 101 * Close this processor if there are no remaining subscriptions. Any signals that have already 102 * been submitted will be processed. 103 * <p> 104 * Any call to this method after the {@link Processor} has been closed will simply return true. 105 * 106 * @return true if there are no subscribers and this processor is closed 107 */ 108 boolean closeIfUnsubscribed(); 109 110 /** 111 * A Processor that has the same type for inputs and outputs 112 * 113 * @param <V> The input and output type 114 */ 115 interface Iso<V> extends Processor<V, V> { 116 @Override 117 default void inject(V element) { 118 submit(element); 119 } 120 } 121 122 interface Transactional<I, O> extends Processor<I, O>, Publisher<O>, ninja.leaping.configurate.reactive.TransactionalSubscriber<I> { 123 } 124 125 interface TransactionalIso<V> extends Transactional<V, V>, Iso<V> { 126 127 } 128 129}