001/* 002 * Configurate 003 * Copyright (C) zml and Configurate contributors 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package ninja.leaping.configurate.reference; 018 019import ninja.leaping.configurate.ConfigurationNode; 020import org.checkerframework.checker.nullness.qual.Nullable; 021import ninja.leaping.configurate.loader.ConfigurationLoader; 022import ninja.leaping.configurate.reactive.Disposable; 023import ninja.leaping.configurate.reactive.Subscriber; 024 025import java.io.IOException; 026import java.nio.file.ClosedWatchServiceException; 027import java.nio.file.FileSystem; 028import java.nio.file.FileSystems; 029import java.nio.file.Files; 030import java.nio.file.Path; 031import java.nio.file.StandardWatchEventKinds; 032import java.nio.file.WatchEvent; 033import java.nio.file.WatchKey; 034import java.nio.file.WatchService; 035import java.util.HashSet; 036import java.util.Set; 037import java.util.concurrent.ConcurrentHashMap; 038import java.util.concurrent.Executor; 039import java.util.concurrent.ForkJoinPool; 040import java.util.concurrent.ThreadFactory; 041import java.util.function.Function; 042 043import static java.util.Objects.requireNonNull; 044 045/** 046 * A wrapper around NIO's {@link WatchService} that uses the provided event loop to poll for changes, and calls 047 * listeners once an event occurs. 048 * <p> 049 * Some deduplication is performed because Windows can be fairly spammy with its events, so one callback may receive 050 * multiple events at one time. 051 * <p> 052 * Callback functions are {@link Subscriber Subscribers} that take the {@link WatchEvent} as their parameter. 053 * <p> 054 * Listening to a directory provides updates on the directory's immediate children, but does not 055 */ 056public class WatchServiceListener implements AutoCloseable { 057 @SuppressWarnings("rawtypes") // IntelliJ says it's unnecessary, but the compiler shows warnings 058 private static final WatchEvent.Kind<?>[] DEFAULT_WATCH_EVENTS = new WatchEvent.Kind[]{StandardWatchEventKinds.OVERFLOW, 059 StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY}; 060 private static final int PARALLEL_THRESHOLD = 100; 061 private static final ThreadFactory DEFAULT_THREAD_FACTORY = new PrefixedNameThreadFactory("Configurate-WatchService", true); 062 063 private final WatchService watchService; 064 private volatile boolean open = true; 065 private final Thread executor; 066 final Executor taskExecutor; 067 private final ConcurrentHashMap<Path, DirectoryListenerRegistration> activeListeners = new ConcurrentHashMap<>(); 068 private static final ThreadLocal<IOException> exceptionHolder = new ThreadLocal<>(); 069 070 /** 071 * Create a new builder for a WatchServiceListener to create a customized listener 072 * 073 * @return A builder 074 */ 075 public static Builder builder() { 076 return new Builder(); 077 } 078 079 /** 080 * Create a new {@link WatchServiceListener} using a new cached thread pool executor and the default filesystem. 081 * 082 * @return A new instance with default values 083 * @throws IOException If a watch service cannot be created 084 * @see #builder() for customization 085 */ 086 public static WatchServiceListener create() throws IOException { 087 return new WatchServiceListener(DEFAULT_THREAD_FACTORY, FileSystems.getDefault(), ForkJoinPool.commonPool()); 088 } 089 090 private WatchServiceListener(ThreadFactory factory, FileSystem fileSystem, Executor taskExecutor) throws IOException { 091 this.watchService = fileSystem.newWatchService(); 092 this.executor = factory.newThread(() -> { 093 while (open) { 094 WatchKey key; 095 try { 096 key = watchService.take(); 097 } catch (InterruptedException | ClosedWatchServiceException e) { 098 break; 099 } 100 Path watched = (Path) key.watchable(); 101 DirectoryListenerRegistration registration = activeListeners.get(watched); 102 if (registration != null) { 103 final Set<Object> seenContexts = new HashSet<>(); 104 for (WatchEvent<?> event : key.pollEvents()) { 105 if (!key.isValid()) { 106 break; 107 } 108 109 if (!seenContexts.add(event.context())) { 110 continue; 111 } 112 113 // Process listeners 114 registration.submit(event); 115 if (registration.closeIfEmpty()) { 116 key.cancel(); 117 break; 118 } 119 } 120 121 // If the watch key is no longer valid, send all listeners a close event 122 if (!key.reset()) { 123 DirectoryListenerRegistration oldListeners = activeListeners.remove(watched); 124 oldListeners.onClose(); 125 } 126 } 127 } 128 }); 129 this.taskExecutor = taskExecutor; 130 this.executor.start(); 131 } 132 133 /** 134 * Gets or creates a registration holder for a specific directory. This handles registering with the watch service 135 * if necessary. 136 * 137 * @param directory The directory to listen to 138 * @return A registration, created new if necessary. 139 * @throws IOException If produced while registering the path with our WatchService 140 */ 141 private DirectoryListenerRegistration getRegistration(Path directory) throws IOException { 142 @Nullable DirectoryListenerRegistration reg = activeListeners.computeIfAbsent(directory, dir -> { 143 try { 144 return new DirectoryListenerRegistration(dir.register(watchService, DEFAULT_WATCH_EVENTS), this.taskExecutor); 145 } catch (IOException ex) { 146 exceptionHolder.set(ex); 147 return null; 148 } 149 }); 150 151 if (reg == null) { 152 throw exceptionHolder.get(); 153 } 154 return reg; 155 } 156 157 /** 158 * Listen for changes to a specific file or directory. 159 * 160 * @param file The path of the file or directory to listen for changes on. 161 * @param callback A callback function that will be called when changes are made. If return value is false, we will 162 * stop monitoring for changes. 163 * @return A {@link Disposable} that can be used to cancel this subscription 164 * @throws IOException if a filesystem error occurs. 165 * @throws IllegalArgumentException if the provided path is a directory. 166 */ 167 public Disposable listenToFile(Path file, Subscriber<WatchEvent<?>> callback) throws IOException, IllegalArgumentException { 168 file = file.toAbsolutePath(); 169 if (Files.isDirectory(file)) { 170 throw new IllegalArgumentException("Path " + file + " must be a file"); 171 } 172 173 Path fileName = file.getFileName(); 174 return getRegistration(file.getParent()).subscribe(fileName, callback); 175 } 176 177 /** 178 * Listen to a directory. Callbacks will receive events both for the directory and for its contents. 179 * 180 * @param directory The directory to listen to 181 * @param callback A callback function that will be called when changes are made. If return value is false, we will 182 * stop monitoring for changes. 183 * @return A {@link Disposable} that can be used to cancel this subscription 184 * @throws IOException When an error occurs registering with the underlying watch service. 185 * @throws IllegalArgumentException If the provided path is not a directory 186 */ 187 public Disposable listenToDirectory(Path directory, Subscriber<WatchEvent<?>> callback) throws IOException, IllegalArgumentException { 188 directory = directory.toAbsolutePath(); 189 if (!(Files.isDirectory(directory) || !Files.exists(directory))) { 190 throw new IllegalArgumentException("Path " + directory + " must be a directory"); 191 } 192 193 return getRegistration(directory).subscribe(callback); 194 } 195 196 public <N extends ConfigurationNode> ConfigurationReference<N> listenToConfiguration(Function<Path, ConfigurationLoader<? extends N>> loaderFunc, Path path) throws IOException { 197 return ConfigurationReference.createWatching(loaderFunc, path, this); 198 } 199 200 @Override 201 public void close() throws IOException { 202 open = false; 203 watchService.close(); 204 activeListeners.forEachValue(PARALLEL_THRESHOLD, DirectoryListenerRegistration::onClose); 205 activeListeners.clear(); 206 try { 207 this.executor.join(); 208 } catch (InterruptedException e) { 209 throw new IOException("Failed to await termination of executor thread!"); 210 } 211 } 212 213 /** 214 * Set the parameters needed to create a {@link WatchServiceListener}. All params are optional and defaults will be 215 * used if no values are specified. 216 */ 217 public static class Builder { 218 private @Nullable ThreadFactory threadFactory; 219 private @Nullable FileSystem fileSystem; 220 private @Nullable Executor taskExecutor; 221 222 private Builder() { 223 224 } 225 226 /** 227 * Set the thread factory that will be used to create the polling thread for this watch service 228 * 229 * @param factory The thread factory to create the deamon thread 230 * @return this 231 */ 232 public Builder setThreadFactory(ThreadFactory factory) { 233 this.threadFactory = requireNonNull(factory, "factory"); 234 return this; 235 } 236 237 /** 238 * Set the executor that will be used to execute tasks queued based on received events. By default, the {@link 239 * ForkJoinPool#commonPool() common pool} is used. 240 * 241 * @param executor The executor to use 242 * @return this 243 */ 244 public Builder setTaskExecutor(Executor executor) { 245 this.taskExecutor = requireNonNull(executor, "executor"); 246 return this; 247 } 248 249 /** 250 * Set the filesystem expected to be used for paths. A separate {@link WatchServiceListener} should be created 251 * to listen to events on a different file system. 252 * 253 * @param system The file system to use. 254 * @return this 255 */ 256 public Builder setFileSystem(FileSystem system) { 257 this.fileSystem = system; 258 return this; 259 } 260 261 /** 262 * Create a new listener, using default values for any unset parameters. 263 * 264 * @return A newly created executor 265 * @throws IOException if thrown by {@link WatchServiceListener}'s constructor 266 */ 267 public WatchServiceListener build() throws IOException { 268 if (threadFactory == null) { 269 threadFactory = DEFAULT_THREAD_FACTORY; 270 } 271 272 if (fileSystem == null) { 273 fileSystem = FileSystems.getDefault(); 274 } 275 276 if (taskExecutor == null) { 277 taskExecutor = ForkJoinPool.commonPool(); 278 } 279 280 return new WatchServiceListener(threadFactory, fileSystem, taskExecutor); 281 } 282 } 283}