001/*
002 * Configurate
003 * Copyright (C) zml and Configurate contributors
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *    http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package ninja.leaping.configurate.reference;
018
019import ninja.leaping.configurate.ConfigurationNode;
020import org.checkerframework.checker.nullness.qual.Nullable;
021import ninja.leaping.configurate.loader.ConfigurationLoader;
022import ninja.leaping.configurate.reactive.Disposable;
023import ninja.leaping.configurate.reactive.Subscriber;
024
025import java.io.IOException;
026import java.nio.file.ClosedWatchServiceException;
027import java.nio.file.FileSystem;
028import java.nio.file.FileSystems;
029import java.nio.file.Files;
030import java.nio.file.Path;
031import java.nio.file.StandardWatchEventKinds;
032import java.nio.file.WatchEvent;
033import java.nio.file.WatchKey;
034import java.nio.file.WatchService;
035import java.util.HashSet;
036import java.util.Set;
037import java.util.concurrent.ConcurrentHashMap;
038import java.util.concurrent.Executor;
039import java.util.concurrent.ForkJoinPool;
040import java.util.concurrent.ThreadFactory;
041import java.util.function.Function;
042
043import static java.util.Objects.requireNonNull;
044
045/**
046 * A wrapper around NIO's {@link WatchService} that uses the provided event loop to poll for changes, and calls
047 * listeners once an event occurs.
048 * <p>
049 * Some deduplication is performed because Windows can be fairly spammy with its events, so one callback may receive
050 * multiple events at one time.
051 * <p>
052 * Callback functions are {@link Subscriber Subscribers} that take the {@link WatchEvent} as their parameter.
053 * <p>
054 * Listening to a directory provides updates on the directory's immediate children, but does not
055 */
056public class WatchServiceListener implements AutoCloseable {
057    @SuppressWarnings("rawtypes") // IntelliJ says it's unnecessary, but the compiler shows warnings
058    private static final WatchEvent.Kind<?>[] DEFAULT_WATCH_EVENTS = new WatchEvent.Kind[]{StandardWatchEventKinds.OVERFLOW,
059            StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY};
060    private static final int PARALLEL_THRESHOLD = 100;
061    private static final ThreadFactory DEFAULT_THREAD_FACTORY = new PrefixedNameThreadFactory("Configurate-WatchService", true);
062
063    private final WatchService watchService;
064    private volatile boolean open = true;
065    private final Thread executor;
066    final Executor taskExecutor;
067    private final ConcurrentHashMap<Path, DirectoryListenerRegistration> activeListeners = new ConcurrentHashMap<>();
068    private static final ThreadLocal<IOException> exceptionHolder = new ThreadLocal<>();
069
070    /**
071     * Create a new builder for a WatchServiceListener to create a customized listener
072     *
073     * @return A builder
074     */
075    public static Builder builder() {
076        return new Builder();
077    }
078
079    /**
080     * Create a new {@link WatchServiceListener} using a new cached thread pool executor and the default filesystem.
081     *
082     * @return A new instance with default values
083     * @throws IOException If a watch service cannot be created
084     * @see #builder() for customization
085     */
086    public static WatchServiceListener create() throws IOException {
087        return new WatchServiceListener(DEFAULT_THREAD_FACTORY, FileSystems.getDefault(), ForkJoinPool.commonPool());
088    }
089
090    private WatchServiceListener(ThreadFactory factory, FileSystem fileSystem, Executor taskExecutor) throws IOException {
091        this.watchService = fileSystem.newWatchService();
092        this.executor = factory.newThread(() -> {
093            while (open) {
094                WatchKey key;
095                try {
096                    key = watchService.take();
097                } catch (InterruptedException | ClosedWatchServiceException e) {
098                    break;
099                }
100                Path watched = (Path) key.watchable();
101                DirectoryListenerRegistration registration = activeListeners.get(watched);
102                if (registration != null) {
103                    final Set<Object> seenContexts = new HashSet<>();
104                    for (WatchEvent<?> event : key.pollEvents()) {
105                        if (!key.isValid()) {
106                            break;
107                        }
108
109                        if (!seenContexts.add(event.context())) {
110                            continue;
111                        }
112
113                        // Process listeners
114                        registration.submit(event);
115                        if (registration.closeIfEmpty()) {
116                            key.cancel();
117                            break;
118                        }
119                    }
120
121                    // If the watch key is no longer valid, send all listeners a close event
122                    if (!key.reset()) {
123                        DirectoryListenerRegistration oldListeners = activeListeners.remove(watched);
124                        oldListeners.onClose();
125                    }
126                }
127            }
128        });
129        this.taskExecutor = taskExecutor;
130        this.executor.start();
131    }
132
133    /**
134     * Gets or creates a registration holder for a specific directory. This handles registering with the watch service
135     * if necessary.
136     *
137     * @param directory The directory to listen to
138     * @return A registration, created new if necessary.
139     * @throws IOException If produced while registering the path with our WatchService
140     */
141    private DirectoryListenerRegistration getRegistration(Path directory) throws IOException {
142        @Nullable DirectoryListenerRegistration reg = activeListeners.computeIfAbsent(directory, dir -> {
143            try {
144                return new DirectoryListenerRegistration(dir.register(watchService, DEFAULT_WATCH_EVENTS), this.taskExecutor);
145            } catch (IOException ex) {
146                exceptionHolder.set(ex);
147                return null;
148            }
149        });
150
151        if (reg == null) {
152            throw exceptionHolder.get();
153        }
154        return reg;
155    }
156
157    /**
158     * Listen for changes to a specific file or directory.
159     *
160     * @param file     The path of the file or directory to listen for changes on.
161     * @param callback A callback function that will be called when changes are made. If return value is false, we will
162     *                 stop monitoring for changes.
163     * @return A {@link Disposable} that can be used to cancel this subscription
164     * @throws IOException              if a filesystem error occurs.
165     * @throws IllegalArgumentException if the provided path is a directory.
166     */
167    public Disposable listenToFile(Path file, Subscriber<WatchEvent<?>> callback) throws IOException, IllegalArgumentException {
168        file = file.toAbsolutePath();
169        if (Files.isDirectory(file)) {
170            throw new IllegalArgumentException("Path " + file + " must be a file");
171        }
172
173        Path fileName = file.getFileName();
174        return getRegistration(file.getParent()).subscribe(fileName, callback);
175    }
176
177    /**
178     * Listen to a directory. Callbacks will receive events both for the directory and for its contents.
179     *
180     * @param directory The directory to listen to
181     * @param callback  A callback function that will be called when changes are made. If return value is false, we will
182     *                  stop monitoring for changes.
183     * @return A {@link Disposable} that can be used to cancel this subscription
184     * @throws IOException              When an error occurs registering with the underlying watch service.
185     * @throws IllegalArgumentException If the provided path is not a directory
186     */
187    public Disposable listenToDirectory(Path directory, Subscriber<WatchEvent<?>> callback) throws IOException, IllegalArgumentException {
188        directory = directory.toAbsolutePath();
189        if (!(Files.isDirectory(directory) || !Files.exists(directory))) {
190            throw new IllegalArgumentException("Path " + directory + " must be a directory");
191        }
192
193        return getRegistration(directory).subscribe(callback);
194    }
195
196    public <N extends ConfigurationNode> ConfigurationReference<N> listenToConfiguration(Function<Path, ConfigurationLoader<? extends N>> loaderFunc, Path path) throws IOException {
197        return ConfigurationReference.createWatching(loaderFunc, path, this);
198    }
199
200    @Override
201    public void close() throws IOException {
202        open = false;
203        watchService.close();
204        activeListeners.forEachValue(PARALLEL_THRESHOLD, DirectoryListenerRegistration::onClose);
205        activeListeners.clear();
206        try {
207            this.executor.join();
208        } catch (InterruptedException e) {
209            throw new IOException("Failed to await termination of executor thread!");
210        }
211    }
212
213    /**
214     * Set the parameters needed to create a {@link WatchServiceListener}. All params are optional and defaults will be
215     * used if no values are specified.
216     */
217    public static class Builder {
218        private @Nullable ThreadFactory threadFactory;
219        private @Nullable FileSystem fileSystem;
220        private @Nullable Executor taskExecutor;
221
222        private Builder() {
223
224        }
225
226        /**
227         * Set the thread factory that will be used to create the polling thread for this watch service
228         *
229         * @param factory The thread factory to create the deamon thread
230         * @return this
231         */
232        public Builder setThreadFactory(ThreadFactory factory) {
233            this.threadFactory = requireNonNull(factory, "factory");
234            return this;
235        }
236
237        /**
238         * Set the executor that will be used to execute tasks queued based on received events. By default, the {@link
239         * ForkJoinPool#commonPool() common pool} is used.
240         *
241         * @param executor The executor to use
242         * @return this
243         */
244        public Builder setTaskExecutor(Executor executor) {
245            this.taskExecutor = requireNonNull(executor, "executor");
246            return this;
247        }
248
249        /**
250         * Set the filesystem expected to be used for paths. A separate {@link WatchServiceListener} should be created
251         * to listen to events on a different file system.
252         *
253         * @param system The file system to use.
254         * @return this
255         */
256        public Builder setFileSystem(FileSystem system) {
257            this.fileSystem = system;
258            return this;
259        }
260
261        /**
262         * Create a new listener, using default values for any unset parameters.
263         *
264         * @return A newly created executor
265         * @throws IOException if thrown by {@link WatchServiceListener}'s constructor
266         */
267        public WatchServiceListener build() throws IOException {
268            if (threadFactory == null) {
269                threadFactory = DEFAULT_THREAD_FACTORY;
270            }
271
272            if (fileSystem == null) {
273                fileSystem = FileSystems.getDefault();
274            }
275
276            if (taskExecutor == null) {
277                taskExecutor = ForkJoinPool.commonPool();
278            }
279
280            return new WatchServiceListener(threadFactory, fileSystem, taskExecutor);
281        }
282    }
283}