001/*
002 * Configurate
003 * Copyright (C) zml and Configurate contributors
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *    http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.spongepowered.configurate.reference;
018
019import static java.util.Objects.requireNonNull;
020
021import org.checkerframework.checker.nullness.qual.Nullable;
022import org.spongepowered.configurate.ConfigurateException;
023import org.spongepowered.configurate.ScopedConfigurationNode;
024import org.spongepowered.configurate.loader.ConfigurationLoader;
025import org.spongepowered.configurate.reactive.Disposable;
026import org.spongepowered.configurate.reactive.Subscriber;
027
028import java.io.IOException;
029import java.nio.file.ClosedWatchServiceException;
030import java.nio.file.FileSystem;
031import java.nio.file.FileSystems;
032import java.nio.file.Files;
033import java.nio.file.Path;
034import java.nio.file.StandardWatchEventKinds;
035import java.nio.file.WatchEvent;
036import java.nio.file.WatchKey;
037import java.nio.file.WatchService;
038import java.util.HashSet;
039import java.util.Set;
040import java.util.concurrent.ConcurrentHashMap;
041import java.util.concurrent.Executor;
042import java.util.concurrent.ForkJoinPool;
043import java.util.concurrent.ThreadFactory;
044import java.util.function.Function;
045
046/**
047 * A wrapper around NIO's {@link WatchService} that uses the provided watch key
048 * to poll for changes, and calls listeners once an event occurs.
049 *
050 * <p>Some deduplication is performed because Windows can be fairly spammy with
051 * its events, so one callback may receive multiple events at one time.</p>
052 *
053 * <p>Callback functions are {@link Subscriber Subscribers} that take the
054 * {@link WatchEvent} as their parameter.</p>
055 *
056 * <p>Listening to a directory provides updates on the directory's immediate
057 * children, but does not listen recursively.</p>
058 *
059 * @since 4.0.0
060 */
061public final class WatchServiceListener implements AutoCloseable {
062
063    @SuppressWarnings("rawtypes") // IntelliJ says it's unnecessary, but the compiler shows warnings
064    private static final WatchEvent.Kind<?>[] DEFAULT_WATCH_EVENTS = new WatchEvent.Kind[]{StandardWatchEventKinds.OVERFLOW,
065        StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY};
066    private static final int PARALLEL_THRESHOLD = 100;
067    private static final ThreadFactory DEFAULT_THREAD_FACTORY = new PrefixedNameThreadFactory("Configurate-WatchService", true);
068
069    private final WatchService watchService;
070    private volatile boolean open = true;
071    private final Thread executor;
072    final Executor taskExecutor;
073    private final ConcurrentHashMap<Path, DirectoryListenerRegistration> activeListeners = new ConcurrentHashMap<>();
074    private static final ThreadLocal<IOException> exceptionHolder = new ThreadLocal<>();
075
076    /**
077     * Returns a new builder for a WatchServiceListener to create a
078     * customized listener.
079     *
080     * @return a new builder
081     * @since 4.0.0
082     */
083    public static Builder builder() {
084        return new Builder();
085    }
086
087    /**
088     * Create a new {@link WatchServiceListener} using a new cached thread pool
089     * executor and the default filesystem.
090     *
091     * @return a new instance with default values
092     * @throws IOException if a watch service cannot be created
093     * @see #builder() for customization
094     * @since 4.0.0
095     */
096    public static WatchServiceListener create() throws IOException {
097        return new WatchServiceListener(DEFAULT_THREAD_FACTORY, FileSystems.getDefault(), ForkJoinPool.commonPool());
098    }
099
100    private WatchServiceListener(final ThreadFactory factory, final FileSystem fileSystem, final Executor taskExecutor) throws IOException {
101        this.watchService = fileSystem.newWatchService();
102        this.executor = factory.newThread(() -> {
103            while (this.open) {
104                final WatchKey key;
105                try {
106                    key = this.watchService.take();
107                } catch (final InterruptedException e) {
108                    this.open = false;
109                    Thread.currentThread().interrupt();
110                    break;
111                } catch (final ClosedWatchServiceException e) {
112                    break;
113                }
114                final Path watched = (Path) key.watchable();
115                final DirectoryListenerRegistration registration = this.activeListeners.get(watched);
116                if (registration != null) {
117                    final Set<Object> seenContexts = new HashSet<>();
118                    for (WatchEvent<?> event : key.pollEvents()) {
119                        if (!key.isValid()) {
120                            break;
121                        }
122
123                        if (!seenContexts.add(event.context())) {
124                            continue;
125                        }
126
127                        // Process listeners
128                        registration.submit(event);
129                        if (registration.closeIfEmpty()) {
130                            key.cancel();
131                            break;
132                        }
133                    }
134
135                    // If the watch key is no longer valid, send all listeners a close event
136                    if (!key.reset()) {
137                        final DirectoryListenerRegistration oldListeners = this.activeListeners.remove(watched);
138                        oldListeners.onClose();
139                    }
140                }
141                try {
142                    Thread.sleep(20);
143                } catch (final InterruptedException e) {
144                    Thread.currentThread().interrupt();
145                    break;
146                }
147            }
148        });
149        this.taskExecutor = taskExecutor;
150        this.executor.start();
151    }
152
153    /**
154     * Gets or creates a registration holder for a specific directory. This
155     * handles registering with the watch service if necessary.
156     *
157     * @param directory the directory to listen to
158     * @return a registration, created new if necessary.
159     * @throws ConfigurateException if produced while registering the path with
160     *          our WatchService
161     */
162    private DirectoryListenerRegistration registration(final Path directory) throws ConfigurateException {
163        final @Nullable DirectoryListenerRegistration reg = this.activeListeners.computeIfAbsent(directory, dir -> {
164            try {
165                return new DirectoryListenerRegistration(dir.register(this.watchService, DEFAULT_WATCH_EVENTS), this.taskExecutor);
166            } catch (final IOException ex) {
167                exceptionHolder.set(ex);
168                return null;
169            }
170        });
171
172        if (reg == null) {
173            throw new ConfigurateException("While adding listener for " + directory, exceptionHolder.get());
174        }
175        return reg;
176    }
177
178    /**
179     * Listen for changes to a specific file or directory.
180     *
181     * @param file the path of the file or directory to listen for changes on.
182     * @param callback a subscriber that will be notified when changes occur.
183     * @return a {@link Disposable} that can be used to cancel this subscription
184     * @throws ConfigurateException if a filesystem error occurs.
185     * @throws IllegalArgumentException if the provided path is a directory.
186     * @since 4.0.0
187     */
188    public Disposable listenToFile(Path file, final Subscriber<WatchEvent<?>> callback) throws ConfigurateException, IllegalArgumentException {
189        file = file.toAbsolutePath();
190        if (Files.isDirectory(file)) {
191            throw new IllegalArgumentException("Path " + file + " must be a file");
192        }
193
194        final Path fileName = file.getFileName();
195        return registration(file.getParent()).subscribe(fileName, callback);
196    }
197
198    /**
199     * Listen to a directory. Callbacks will receive events both for the
200     * directory and for its contents.
201     *
202     * @param directory the directory to listen to
203     * @param callback a subscriber that will be notified when changes occur.
204     * @return a {@link Disposable} that can be used to cancel this subscription
205     * @throws ConfigurateException when an error occurs registering with the
206     *                              underlying watch service.
207     * @throws IllegalArgumentException if the provided path is not a directory
208     * @since 4.0.0
209     */
210    public Disposable listenToDirectory(Path directory, final Subscriber<WatchEvent<?>> callback)
211            throws ConfigurateException, IllegalArgumentException {
212        directory = directory.toAbsolutePath();
213        if (!(Files.isDirectory(directory) || !Files.exists(directory))) {
214            throw new IllegalArgumentException("Path " + directory + " must be a directory");
215        }
216
217        return registration(directory).subscribe(callback);
218    }
219
220    /**
221     * Create a new {@link ConfigurationReference} subscribed to FS updates.
222     *
223     * @param loaderFunc function that will create a new loader
224     * @param path path to to for changes
225     * @param <N> node type
226     * @return new reference
227     * @throws ConfigurateException if unable to complete an initial load of
228     *      the configuration.
229     * @since 4.0.0
230     */
231    public <N extends ScopedConfigurationNode<N>> ConfigurationReference<N>
232        listenToConfiguration(final Function<Path, ConfigurationLoader<? extends N>> loaderFunc, final Path path) throws ConfigurateException {
233        return ConfigurationReference.watching(loaderFunc, path, this);
234    }
235
236    @Override
237    public void close() throws IOException {
238        this.open = false;
239        this.watchService.close();
240        this.activeListeners.forEachValue(PARALLEL_THRESHOLD, DirectoryListenerRegistration::onClose);
241        this.activeListeners.clear();
242        try {
243            this.executor.interrupt();
244            this.executor.join();
245        } catch (final InterruptedException e) {
246            throw new IOException("Failed to await termination of executor thread!");
247        }
248    }
249
250    /**
251     * Set the parameters needed to create a {@link WatchServiceListener}. All params are optional and defaults will be
252     * used if no values are specified.
253     *
254     * @since 4.0.0
255     */
256    public static final class Builder {
257
258        private @Nullable ThreadFactory threadFactory;
259        private @Nullable FileSystem fileSystem;
260        private @Nullable Executor taskExecutor;
261
262        private Builder() { }
263
264        /**
265         * Set the thread factory that will be used to create the polling thread
266         * for the returned watch service.
267         *
268         * @param factory the thread factory to use to create the deamon thread
269         * @return this builder
270         * @since 4.0.0
271         */
272        public Builder threadFactory(final ThreadFactory factory) {
273            this.threadFactory = requireNonNull(factory, "factory");
274            return this;
275        }
276
277        /**
278         * Set the executor that will be used to execute tasks queued based on
279         * received events. By default, the
280         * {@link ForkJoinPool#commonPool() common pool} is used.
281         *
282         * @param executor the executor to use
283         * @return this builder
284         * @since 4.0.0
285         */
286        public Builder taskExecutor(final Executor executor) {
287            this.taskExecutor = requireNonNull(executor, "executor");
288            return this;
289        }
290
291        /**
292         * Set the filesystem expected to be used for paths. A separate
293         * {@link WatchServiceListener} should be created to listen to events on
294         * each different file system.
295         *
296         * @param system the file system to use.
297         * @return this builder
298         * @since 4.0.0
299         */
300        public Builder fileSystem(final FileSystem system) {
301            this.fileSystem = system;
302            return this;
303        }
304
305        /**
306         * Create a new listener, using default values for any unset parameters.
307         *
308         * @return a newly created executor
309         * @throws IOException if thrown by {@link WatchServiceListener}'s constructor
310         * @since 4.0.0
311         */
312        public WatchServiceListener build() throws IOException {
313            if (this.threadFactory == null) {
314                this.threadFactory = DEFAULT_THREAD_FACTORY;
315            }
316
317            if (this.fileSystem == null) {
318                this.fileSystem = FileSystems.getDefault();
319            }
320
321            if (this.taskExecutor == null) {
322                this.taskExecutor = ForkJoinPool.commonPool();
323            }
324
325            return new WatchServiceListener(this.threadFactory, this.fileSystem, this.taskExecutor);
326        }
327
328    }
329
330}