001/* 002 * Configurate 003 * Copyright (C) zml and Configurate contributors 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.spongepowered.configurate.reference; 018 019import static java.util.Objects.requireNonNull; 020 021import org.checkerframework.checker.nullness.qual.Nullable; 022import org.spongepowered.configurate.ConfigurateException; 023import org.spongepowered.configurate.ScopedConfigurationNode; 024import org.spongepowered.configurate.loader.ConfigurationLoader; 025import org.spongepowered.configurate.reactive.Disposable; 026import org.spongepowered.configurate.reactive.Subscriber; 027 028import java.io.IOException; 029import java.nio.file.ClosedWatchServiceException; 030import java.nio.file.FileSystem; 031import java.nio.file.FileSystems; 032import java.nio.file.Files; 033import java.nio.file.Path; 034import java.nio.file.StandardWatchEventKinds; 035import java.nio.file.WatchEvent; 036import java.nio.file.WatchKey; 037import java.nio.file.WatchService; 038import java.util.HashSet; 039import java.util.Set; 040import java.util.concurrent.ConcurrentHashMap; 041import java.util.concurrent.Executor; 042import java.util.concurrent.ForkJoinPool; 043import java.util.concurrent.ThreadFactory; 044import java.util.function.Function; 045 046/** 047 * A wrapper around NIO's {@link WatchService} that uses the provided watch key 048 * to poll for changes, and calls listeners once an event occurs. 049 * 050 * <p>Some deduplication is performed because Windows can be fairly spammy with 051 * its events, so one callback may receive multiple events at one time.</p> 052 * 053 * <p>Callback functions are {@link Subscriber Subscribers} that take the 054 * {@link WatchEvent} as their parameter.</p> 055 * 056 * <p>Listening to a directory provides updates on the directory's immediate 057 * children, but does not listen recursively.</p> 058 * 059 * @since 4.0.0 060 */ 061public final class WatchServiceListener implements AutoCloseable { 062 063 @SuppressWarnings("rawtypes") // IntelliJ says it's unnecessary, but the compiler shows warnings 064 private static final WatchEvent.Kind<?>[] DEFAULT_WATCH_EVENTS = new WatchEvent.Kind[]{StandardWatchEventKinds.OVERFLOW, 065 StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY}; 066 private static final int PARALLEL_THRESHOLD = 100; 067 private static final ThreadFactory DEFAULT_THREAD_FACTORY = new PrefixedNameThreadFactory("Configurate-WatchService", true); 068 069 private final WatchService watchService; 070 private volatile boolean open = true; 071 private final Thread executor; 072 final Executor taskExecutor; 073 private final ConcurrentHashMap<Path, DirectoryListenerRegistration> activeListeners = new ConcurrentHashMap<>(); 074 private static final ThreadLocal<IOException> exceptionHolder = new ThreadLocal<>(); 075 076 /** 077 * Returns a new builder for a WatchServiceListener to create a 078 * customized listener. 079 * 080 * @return a new builder 081 * @since 4.0.0 082 */ 083 public static Builder builder() { 084 return new Builder(); 085 } 086 087 /** 088 * Create a new {@link WatchServiceListener} using a new cached thread pool 089 * executor and the default filesystem. 090 * 091 * @return a new instance with default values 092 * @throws IOException if a watch service cannot be created 093 * @see #builder() for customization 094 * @since 4.0.0 095 */ 096 public static WatchServiceListener create() throws IOException { 097 return new WatchServiceListener(DEFAULT_THREAD_FACTORY, FileSystems.getDefault(), ForkJoinPool.commonPool()); 098 } 099 100 private WatchServiceListener(final ThreadFactory factory, final FileSystem fileSystem, final Executor taskExecutor) throws IOException { 101 this.watchService = fileSystem.newWatchService(); 102 this.executor = factory.newThread(() -> { 103 while (this.open) { 104 final WatchKey key; 105 try { 106 key = this.watchService.take(); 107 } catch (final InterruptedException e) { 108 this.open = false; 109 Thread.currentThread().interrupt(); 110 break; 111 } catch (final ClosedWatchServiceException e) { 112 break; 113 } 114 final Path watched = (Path) key.watchable(); 115 final DirectoryListenerRegistration registration = this.activeListeners.get(watched); 116 if (registration != null) { 117 final Set<Object> seenContexts = new HashSet<>(); 118 for (WatchEvent<?> event : key.pollEvents()) { 119 if (!key.isValid()) { 120 break; 121 } 122 123 if (!seenContexts.add(event.context())) { 124 continue; 125 } 126 127 // Process listeners 128 registration.submit(event); 129 if (registration.closeIfEmpty()) { 130 key.cancel(); 131 break; 132 } 133 } 134 135 // If the watch key is no longer valid, send all listeners a close event 136 if (!key.reset()) { 137 final DirectoryListenerRegistration oldListeners = this.activeListeners.remove(watched); 138 oldListeners.onClose(); 139 } 140 } 141 try { 142 Thread.sleep(20); 143 } catch (final InterruptedException e) { 144 Thread.currentThread().interrupt(); 145 break; 146 } 147 } 148 }); 149 this.taskExecutor = taskExecutor; 150 this.executor.start(); 151 } 152 153 /** 154 * Gets or creates a registration holder for a specific directory. This 155 * handles registering with the watch service if necessary. 156 * 157 * @param directory the directory to listen to 158 * @return a registration, created new if necessary. 159 * @throws ConfigurateException if produced while registering the path with 160 * our WatchService 161 */ 162 private DirectoryListenerRegistration registration(final Path directory) throws ConfigurateException { 163 final @Nullable DirectoryListenerRegistration reg = this.activeListeners.computeIfAbsent(directory, dir -> { 164 try { 165 return new DirectoryListenerRegistration(dir.register(this.watchService, DEFAULT_WATCH_EVENTS), this.taskExecutor); 166 } catch (final IOException ex) { 167 exceptionHolder.set(ex); 168 return null; 169 } 170 }); 171 172 if (reg == null) { 173 throw new ConfigurateException("While adding listener for " + directory, exceptionHolder.get()); 174 } 175 return reg; 176 } 177 178 /** 179 * Listen for changes to a specific file or directory. 180 * 181 * @param file the path of the file or directory to listen for changes on. 182 * @param callback a subscriber that will be notified when changes occur. 183 * @return a {@link Disposable} that can be used to cancel this subscription 184 * @throws ConfigurateException if a filesystem error occurs. 185 * @throws IllegalArgumentException if the provided path is a directory. 186 * @since 4.0.0 187 */ 188 public Disposable listenToFile(Path file, final Subscriber<WatchEvent<?>> callback) throws ConfigurateException, IllegalArgumentException { 189 file = file.toAbsolutePath(); 190 if (Files.isDirectory(file)) { 191 throw new IllegalArgumentException("Path " + file + " must be a file"); 192 } 193 194 final Path fileName = file.getFileName(); 195 return registration(file.getParent()).subscribe(fileName, callback); 196 } 197 198 /** 199 * Listen to a directory. Callbacks will receive events both for the 200 * directory and for its contents. 201 * 202 * @param directory the directory to listen to 203 * @param callback a subscriber that will be notified when changes occur. 204 * @return a {@link Disposable} that can be used to cancel this subscription 205 * @throws ConfigurateException when an error occurs registering with the 206 * underlying watch service. 207 * @throws IllegalArgumentException if the provided path is not a directory 208 * @since 4.0.0 209 */ 210 public Disposable listenToDirectory(Path directory, final Subscriber<WatchEvent<?>> callback) 211 throws ConfigurateException, IllegalArgumentException { 212 directory = directory.toAbsolutePath(); 213 if (!(Files.isDirectory(directory) || !Files.exists(directory))) { 214 throw new IllegalArgumentException("Path " + directory + " must be a directory"); 215 } 216 217 return registration(directory).subscribe(callback); 218 } 219 220 /** 221 * Create a new {@link ConfigurationReference} subscribed to FS updates. 222 * 223 * @param loaderFunc function that will create a new loader 224 * @param path path to to for changes 225 * @param <N> node type 226 * @return new reference 227 * @throws ConfigurateException if unable to complete an initial load of 228 * the configuration. 229 * @since 4.0.0 230 */ 231 public <N extends ScopedConfigurationNode<N>> ConfigurationReference<N> 232 listenToConfiguration(final Function<Path, ConfigurationLoader<? extends N>> loaderFunc, final Path path) throws ConfigurateException { 233 return ConfigurationReference.watching(loaderFunc, path, this); 234 } 235 236 @Override 237 public void close() throws IOException { 238 this.open = false; 239 this.watchService.close(); 240 this.activeListeners.forEachValue(PARALLEL_THRESHOLD, DirectoryListenerRegistration::onClose); 241 this.activeListeners.clear(); 242 try { 243 this.executor.interrupt(); 244 this.executor.join(); 245 } catch (final InterruptedException e) { 246 throw new IOException("Failed to await termination of executor thread!"); 247 } 248 } 249 250 /** 251 * Set the parameters needed to create a {@link WatchServiceListener}. All params are optional and defaults will be 252 * used if no values are specified. 253 * 254 * @since 4.0.0 255 */ 256 public static final class Builder { 257 258 private @Nullable ThreadFactory threadFactory; 259 private @Nullable FileSystem fileSystem; 260 private @Nullable Executor taskExecutor; 261 262 private Builder() { } 263 264 /** 265 * Set the thread factory that will be used to create the polling thread 266 * for the returned watch service. 267 * 268 * @param factory the thread factory to use to create the deamon thread 269 * @return this builder 270 * @since 4.0.0 271 */ 272 public Builder threadFactory(final ThreadFactory factory) { 273 this.threadFactory = requireNonNull(factory, "factory"); 274 return this; 275 } 276 277 /** 278 * Set the executor that will be used to execute tasks queued based on 279 * received events. By default, the 280 * {@link ForkJoinPool#commonPool() common pool} is used. 281 * 282 * @param executor the executor to use 283 * @return this builder 284 * @since 4.0.0 285 */ 286 public Builder taskExecutor(final Executor executor) { 287 this.taskExecutor = requireNonNull(executor, "executor"); 288 return this; 289 } 290 291 /** 292 * Set the filesystem expected to be used for paths. A separate 293 * {@link WatchServiceListener} should be created to listen to events on 294 * each different file system. 295 * 296 * @param system the file system to use. 297 * @return this builder 298 * @since 4.0.0 299 */ 300 public Builder fileSystem(final FileSystem system) { 301 this.fileSystem = system; 302 return this; 303 } 304 305 /** 306 * Create a new listener, using default values for any unset parameters. 307 * 308 * @return a newly created executor 309 * @throws IOException if thrown by {@link WatchServiceListener}'s constructor 310 * @since 4.0.0 311 */ 312 public WatchServiceListener build() throws IOException { 313 if (this.threadFactory == null) { 314 this.threadFactory = DEFAULT_THREAD_FACTORY; 315 } 316 317 if (this.fileSystem == null) { 318 this.fileSystem = FileSystems.getDefault(); 319 } 320 321 if (this.taskExecutor == null) { 322 this.taskExecutor = ForkJoinPool.commonPool(); 323 } 324 325 return new WatchServiceListener(this.threadFactory, this.fileSystem, this.taskExecutor); 326 } 327 328 } 329 330}