001/* 002 * Configurate 003 * Copyright (C) zml and Configurate contributors 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.spongepowered.configurate.reactive; 018 019import org.checkerframework.checker.nullness.qual.Nullable; 020import org.spongepowered.configurate.util.CheckedFunction; 021 022import java.util.concurrent.Executor; 023import java.util.concurrent.ForkJoinPool; 024 025/** 026 * A combination of an {@link Publisher} and {@link Subscriber}. 027 * 028 * <p>Processors are expected to broadcast their submitted values to any 029 * registered observers, though filtering or other transformations may 030 * be applied. 031 * 032 * <p>Submitting a completion event to the processor will result in a completion 033 * event being passed to every subscriber, and the rejection of further events 034 * being submitted. 035 * 036 * @param <I> the type observed 037 * @param <O> the type produced 038 * @since 4.0.0 039 */ 040public interface Processor<I, O> extends Publisher<O>, Subscriber<I> { 041 042 /** 043 * Create a {@link Processor} instance that simply broadcasts submitted 044 * values to its subscribers. Broadcasts will occur on the 045 * common {@link ForkJoinPool}. 046 * 047 * @param <V> the type 048 * @return a new processor instance 049 * @since 4.0.0 050 */ 051 static <V> Processor.Iso<V> create() { 052 return create(ForkJoinPool.commonPool()); 053 } 054 055 /** 056 * Create a {@link Processor} instance that simply broadcasts submitted 057 * values to its subscribers. 058 * 059 * @param <V> the type 060 * @param executor task executor 061 * @return a new processor instance 062 * @since 4.0.0 063 */ 064 static <V> Processor.Iso<V> create(Executor executor) { 065 return new ProcessorImpl<>(executor); 066 } 067 068 /** 069 * Create a processor instance that is aware of transactions. 070 * 071 * @param <V> the value type 072 * @return a new transactional processor 073 * @since 4.0.0 074 */ 075 static <V> Processor.TransactionalIso<V> createTransactional() { 076 return createTransactional(ForkJoinPool.commonPool()); 077 } 078 079 /** 080 * Create a processor instance that is aware of transactions. 081 * 082 * <p>Operations will be submitted to the provided executor.</p> 083 * 084 * @param exec executor to run operations on 085 * @param <V> the value type 086 * @return a new transactional processor 087 * @since 4.0.0 088 */ 089 static <V> Processor.TransactionalIso<V> createTransactional(final Executor exec) { 090 return new TransactionalProcessorImpl<>(exec); 091 } 092 093 /** 094 * {@inheritDoc} 095 */ 096 @Override 097 default <R> Processor<O, R> map(CheckedFunction<? super O, ? extends R, TransactionFailedException> mapper) { 098 return new MappedProcessor<>(mapper, this); 099 } 100 101 /** 102 * Submit an element of the observed type, bypassing any mapping this 103 * Processor may do. If the input type of this processor equals the output 104 * type, this is equivalent to {@link #submit(Object)} 105 * 106 * @param element the element to submit 107 * @since 4.0.0 108 */ 109 void inject(O element); 110 111 /** 112 * Provide a {@link Subscriber} that will handle events submitted to this 113 * processor, but only if no other subscription is active. 114 * 115 * @param subscriber the fallback subscriber to add. Provide {@code null} to 116 * remove the handler 117 * @since 4.0.0 118 */ 119 void fallbackHandler(@Nullable Subscriber<O> subscriber); 120 121 /** 122 * Close this processor if there are no remaining subscriptions. Any signals 123 * that have already been submitted will be processed. 124 * 125 * <p>Any call to this method after the {@link Processor} has been closed 126 * will simply return true. 127 * 128 * @return true if there are no subscribers and this processor is closed 129 * @since 4.0.0 130 */ 131 boolean closeIfUnsubscribed(); 132 133 /** 134 * A Processor that has the same type for inputs and outputs. 135 * 136 * @param <V> the input and output type 137 * @since 4.0.0 138 */ 139 interface Iso<V> extends Processor<V, V> { 140 @Override 141 default void inject(V element) { 142 submit(element); 143 } 144 } 145 146 /** 147 * A processor that supports transactions. 148 * 149 * @param <I> input type 150 * @param <O> output type 151 * @since 4.0.0 152 */ 153 interface Transactional<I, O> extends Processor<I, O>, Publisher<O>, TransactionalSubscriber<I> { 154 } 155 156 /** 157 * A processor that supports transactions using the same input and outputs. 158 * 159 * @param <V> input/output type 160 * @since 4.0.0 161 */ 162 interface TransactionalIso<V> extends Transactional<V, V>, Iso<V> { 163 164 } 165 166}