package jmri.jmrit.logixng.util;

import java.awt.event.ActionEvent;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import javax.swing.Timer;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.ThreadSafe;

import jmri.util.*;

import jmri.util.ThreadingUtil.ThreadAction;

/**
 * Utilities for handling JMRI's LogixNG threading conventions.
 * <p>
 * For background, see
 * <a href="http://jmri.org/help/en/html/doc/Technical/Threads.shtml">http://jmri.org/help/en/html/doc/Technical/Threads.shtml</a>
 * <p>
 * This is the ThreadingUtil class for LogixNG. Each instance owns one daemon
 * Java thread and a bounded event queue; actions handed to the instance are
 * executed on that thread in queue order. The registry of instances (by ID
 * and by name) is guarded by the {@code LogixNG_Thread.class} monitor.
 *
 * @author Bob Jacobsen      Copyright 2015
 * @author Daniel Bergqvist  Copyright 2020
 */
@ThreadSafe
public class LogixNG_Thread {

    public static final int ERROR_HANDLING_LOGIXNG_THREAD = Integer.MIN_VALUE;
    public static final int DEFAULT_LOGIXNG_THREAD = 0;
    public static final int DEFAULT_LOGIXNG_DEBUG_THREAD = 1;

    // Registry state. All access must hold the LogixNG_Thread.class monitor.
    private static final Map<Integer, LogixNG_Thread> _threads = new HashMap<>();
    private static final Map<String, LogixNG_Thread> _threadNames = new HashMap<>();
    private static int _highestThreadID = -1;

    private final int _threadID;
    // Written under the class monitor, but read without it by the getters.
    private volatile String _name;
    private volatile boolean _stopThread = false;
    private volatile boolean _threadIsStopped = false;

    private final Thread _logixNGThread;
    private volatile boolean _threadInUse = false;
    private final BlockingQueue<ThreadEvent> _eventQueue;


    /**
     * Create and start a new LogixNG thread with an automatically assigned ID.
     *
     * @param name the unique name of the new thread
     * @return the new thread
     * @throws IllegalArgumentException if the name is already in use
     */
    public static LogixNG_Thread createNewThread(@Nonnull String name) {
        return createNewThread(-1, name);
    }

    /**
     * Create and start a new LogixNG thread.
     * <p>
     * If a thread with the same ID and the same name is already registered,
     * a warning is logged and the existing thread is returned.
     *
     * @param threadID the ID of the new thread, or -1 to use the next free ID
     * @param name     the unique name of the new thread
     * @return the new (or already existing, identical) thread
     * @throws IllegalArgumentException if the ID is registered under another
     *                                  name, or the name is already in use
     */
    public static LogixNG_Thread createNewThread(int threadID, @Nonnull String name) {
        synchronized (LogixNG_Thread.class) {
            // Work on a candidate ID and only commit it to _highestThreadID
            // once validation has passed, so a rejected request does not
            // consume an ID.
            int id = (threadID == -1) ? _highestThreadID + 1 : threadID;

            LogixNG_Thread existing = _threads.get(id);
            if (existing != null) {
                if (existing._name.equals(name)) {
                    log.warn("Thread ID {} with name {} already exists", id, name);
                    return existing;
                }
                throw new IllegalArgumentException(String.format("Thread ID %d already exists", id));
            }

            if (_threadNames.containsKey(name)) {
                throw new IllegalArgumentException(String.format("Thread name %s already exists", name));
            }

            if (id > _highestThreadID) {
                _highestThreadID = id;
            }

            LogixNG_Thread thread = new LogixNG_Thread(id, name);
            _threads.put(id, thread);
            _threadNames.put(name, thread);
            thread._logixNGThread.start();

            return thread;
        }
    }

    /**
     * Check whether a name is free to be used for a new thread.
     *
     * @param name the name to check
     * @return true if no registered thread has this name
     */
    public static boolean validateNewThreadName(@Nonnull String name) {
        synchronized (LogixNG_Thread.class) {
            return !_threadNames.containsKey(name);
        }
    }

    /**
     * Get the thread with the given ID.
     * <p>
     * The three predefined threads (error handling, default and default debug)
     * are created on first request.
     *
     * @param threadID the ID of the thread
     * @return the thread
     * @throws IllegalArgumentException if the ID is unknown and is not one of
     *                                  the predefined IDs
     */
    public static LogixNG_Thread getThread(int threadID) {
        synchronized (LogixNG_Thread.class) {
            LogixNG_Thread thread = _threads.get(threadID);
            if (thread != null) {
                return thread;
            }
            switch (threadID) {
                case ERROR_HANDLING_LOGIXNG_THREAD:
                    return createNewThread(ERROR_HANDLING_LOGIXNG_THREAD, "Error handling thread");
                case DEFAULT_LOGIXNG_THREAD:
                    return createNewThread(DEFAULT_LOGIXNG_THREAD, Bundle.getMessage("LogixNG_Thread"));
                case DEFAULT_LOGIXNG_DEBUG_THREAD:
                    return createNewThread(DEFAULT_LOGIXNG_DEBUG_THREAD, Bundle.getMessage("LogixNG_DebugThread"));
                default:
                    throw new IllegalArgumentException(String.format("Thread ID %d does not exists", threadID));
            }
        }
    }

    /**
     * Get the ID of the thread with the given name.
     *
     * @param name the name of the thread
     * @return the ID of the thread
     * @throws IllegalArgumentException if no registered thread has this name
     */
    public static int getThreadID(String name) {
        synchronized (LogixNG_Thread.class) {
            for (LogixNG_Thread t : _threads.values()) {
                if (name.equals(t._name)) {
                    return t._threadID;
                }
            }
            throw new IllegalArgumentException(String.format("Thread name \"%s\" does not exists", name));
        }
    }

    /**
     * Remove a thread from the registry.
     * <p>
     * NOTE(review): this only unregisters the thread; the underlying Java
     * thread is not stopped here.
     *
     * @param thread the thread to remove
     * @throws IllegalArgumentException if the thread is not registered, if a
     *                                  different thread is registered under
     *                                  its ID, or if the thread is in use
     */
    public static void deleteThread(LogixNG_Thread thread) {
        synchronized (LogixNG_Thread.class) {
            LogixNG_Thread aThread = _threads.get(thread._threadID);

            if (aThread == null) throw new IllegalArgumentException("Thread does not exists");
            if (aThread != thread) throw new IllegalArgumentException("Thread ID does not match");
            if (aThread._threadInUse) throw new IllegalArgumentException("Thread is in use");

            _threads.remove(thread._threadID);
            _threadNames.remove(thread._name);
        }
    }

    /**
     * Get a snapshot of all registered threads except the error handling
     * thread.
     *
     * @return an unmodifiable collection of the threads
     */
    public static Collection<LogixNG_Thread> getThreads() {
        // The copy must be taken under the registry lock; copying a HashMap
        // while another thread modifies it is not safe.
        synchronized (LogixNG_Thread.class) {
            Map<Integer, LogixNG_Thread> threadsCopy = new HashMap<>(_threads);
            threadsCopy.remove(ERROR_HANDLING_LOGIXNG_THREAD);
            return Collections.unmodifiableCollection(threadsCopy.values());
        }
    }

    /**
     * Create the instance and its (not yet started) worker thread.
     * Only called from createNewThread(), which holds the registry lock and
     * starts the worker after registering the instance.
     */
    private LogixNG_Thread(int threadID, String name) {
        _threadID = threadID;
        _name = name;
        _eventQueue = new ArrayBlockingQueue<>(1024);
        _logixNGThread = new Thread(this::processEvents, "JMRI LogixNGThread");
        _logixNGThread.setDaemon(true);
    }

    /**
     * Main loop of the worker thread: take events from the queue and run
     * them until the thread is asked to stop.
     */
    private void processEvents() {
        try {
            while (!_stopThread) {
                try {
                    ThreadEvent event = _eventQueue.take();
                    if (event._lock != null) {
                        synchronized (event._lock) {
                            try {
                                if (!_stopThread) event._threadAction.run();
                            } finally {
                                // Always release the waiting caller, even if
                                // the action threw or was skipped because the
                                // thread is stopping.
                                event._done = true;
                                event._lock.notify();
                            }
                        }
                    } else {
                        event._threadAction.run();
                    }
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }
        } finally {
            // Also reached if an action throws and terminates the worker.
            _threadIsStopped = true;
        }
    }

    /**
     * Get the Java thread that executes the actions of this LogixNG thread.
     *
     * @return the Java thread
     */
    public Thread getThread() {
        return _logixNGThread;
    }

    /**
     * Get the ID of this LogixNG thread.
     *
     * @return the ID
     */
    public int getThreadId() {
        return _threadID;
    }

    /**
     * Get the name of this LogixNG thread.
     *
     * @return the name
     */
    public String getThreadName() {
        return _name;
    }

    /**
     * Rename this LogixNG thread.
     *
     * @param name the new name
     * @throws IllegalArgumentException if another thread already has this name
     */
    public void setThreadName(@Nonnull String name) {
        synchronized (LogixNG_Thread.class) {
            if (_name.equals(name)) return;

            if (_threadNames.containsKey(name)) {
                throw new IllegalArgumentException(String.format("Thread name %s already exists", name));
            }
            _threadNames.remove(_name);
            _threadNames.put(name, this);
            _name = name;
        }
    }

    /**
     * Check whether this thread has been marked as "in use".
     *
     * @return true if the thread is in use
     */
    public boolean getThreadInUse() {
        return _threadInUse;
    }

    /**
     * Set the thread to "in use".
     * If a thread is in use, it cannot be unset as not in use.
     */
    public void setThreadInUse() {
        _threadInUse = true;
    }

    /**
     * Run some LogixNG-specific code before returning.
     * <p>
     * If called on the LogixNG thread itself, the action is run directly,
     * since queueing it and waiting would block the thread on itself.
     * <p>
     * Typical uses:
     * <p> {@code
     * thread.runOnLogixNG(() -> {
     *     logixNG.doSomething(value);
     * });
     * }
     *
     * @param ta What to run, usually as a lambda expression
     */
    public void runOnLogixNG(@Nonnull ThreadAction ta) {
        if (_logixNGThread != null) {
            if (_logixNGThread == Thread.currentThread()) {
                ta.run();
                return;
            }
            Object lock = new Object();
            ThreadEvent event = new ThreadEvent(ta, lock);
            synchronized(lock) {
                // The event is queued while holding the lock, so the worker
                // cannot signal completion before this thread is waiting.
                _eventQueue.add(event);
                try {
                    // Loop on the completion flag to guard against spurious
                    // wakeups.
                    while (!event._done) {
                        lock.wait();
                    }
                } catch (InterruptedException e) {
                    log.debug("Interrupted while running on LogixNG thread");
                    Thread.currentThread().interrupt();
                }
            }
        } else {
            throw new RuntimeException("LogixNG thread not started. ThreadID: "+Integer.toString(_threadID));
        }
    }

    /**
     * Run some LogixNG-specific code at some later point.
     * <p>
     * Please note the operation may have happened before this returns. Or
     * later. No long-term guarantees.
     * <p>
     * Typical uses:
     * <p> {@code
     * thread.runOnLogixNGEventually(() -> {
     *     sensor.setState(value);
     * });
     * }
     *
     * @param ta What to run, usually as a lambda expression
     */
    public void runOnLogixNGEventually(@Nonnull ThreadAction ta) {
        if (_logixNGThread != null) {
            _eventQueue.add(new ThreadEvent(ta));
        } else {
            throw new RuntimeException("LogixNG thread not started");
        }
    }

    /**
     * Run some LogixNG-specific code at some later point, at least a known time
     * in the future.
     * <p>
     * There is no long-term guarantee about the accuracy of the interval.
     * <p>
     * Typical uses:
     * <p> {@code
     * thread.runOnLogixNGDelayed(() -> {
     *     sensor.setState(value);
     * }, 1000);
     * }
     *
     * @param ta    What to run, usually as a lambda expression
     * @param delay interval in milliseconds
     * @return reference to timer object handling delay so you can cancel if desired; note that operation may have already taken place.
     */
    @Nonnull
    public Timer runOnLogixNGDelayed(@Nonnull ThreadAction ta, int delay) {
        if (_logixNGThread != null) {
            // dispatch to logixng thread via timer. We are forced to use a
            // Swing Timer since the method returns a Timer object and we don't
            // want to change the method interface.
            Timer timer = new Timer(delay, (ActionEvent e) -> {
                // Dispatch the event to the LogixNG event handler once the time
                // has passed.
                _eventQueue.add(new ThreadEvent(ta));
            });
            timer.setRepeats(false);
            timer.start();
            return timer;
        } else {
            throw new RuntimeException("LogixNG thread not started");
        }
    }

    /**
     * Check whether the event queue of this thread is empty.
     *
     * @return true if no events are waiting in the queue
     */
    public boolean isQueueEmpty() {
        return _eventQueue.isEmpty();
    }

    /**
     * Check if on the LogixNG-operation thread.
     *
     * @return true if on the LogixNG-operation thread
     */
    public boolean isLogixNGThread() {
        if (_logixNGThread != null) {
            return _logixNGThread == Thread.currentThread();
        } else {
            throw new RuntimeException("LogixNG thread not started");
        }
    }

    /**
     * Checks if the current thread is the LogixNG thread.
     * The check is only done if debug is enabled.
     */
    public void checkIsLogixNGThread() {
        if (log.isDebugEnabled()) {
            if (!isLogixNGThread()) {
                LoggingUtil.warnOnce(log, "checkIsLogixNGThread() called on wrong thread", new Exception());
            }
        }
    }

    /**
     * An action queued for execution on the LogixNG thread, optionally with
     * a lock that a waiting caller is notified on when the event is handled.
     */
    static private class ThreadEvent {
        private final ThreadAction _threadAction;
        private final Object _lock;
        // Set once the event has been handled. Guarded by _lock; only used
        // when _lock is not null.
        private boolean _done = false;

        public ThreadEvent(ThreadAction threadAction) {
            _threadAction = threadAction;
            _lock = null;
        }

        public ThreadEvent(ThreadAction threadAction,
                Object lock) {
            _threadAction = threadAction;
            _lock = lock;
        }
    }

    /**
     * Stop the worker thread, wait for it to terminate and remove this
     * instance from the registry.
     *
     * @throws RuntimeException if the wait is interrupted or the thread has
     *                          not terminated after the wait
     */
    private void stopLogixNGThread() {
        synchronized(LogixNG_Thread.class) {
            if (_logixNGThread != null) {
                _stopThread = true;
                _logixNGThread.interrupt();
                try {
                    _logixNGThread.join(0);
                } catch (InterruptedException e) {
                    // Restore the interrupt status and keep the cause.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("stopLogixNGThread() was interrupted", e);
                }
                if (_logixNGThread.getState() != Thread.State.TERMINATED) {
                    throw new RuntimeException("Could not stop logixNGThread. Current state: "+_logixNGThread.getState().name());
                }
                _threads.remove(_threadID);
                _threadNames.remove(_name);
                _stopThread = false;
            }
        }
    }

    /**
     * Stop all registered LogixNG threads and remove them from the registry.
     */
    public static void stopAllLogixNGThreads() {
        synchronized(LogixNG_Thread.class) {
            // Iterate over a snapshot since stopLogixNGThread() removes the
            // thread from _threads.
            List<LogixNG_Thread> list = new ArrayList<>(_threads.values());
            for (LogixNG_Thread thread : list) {
                thread.stopLogixNGThread();
            }
        }
    }

    /**
     * Verify that no registered LogixNG thread is running. Any thread found
     * running is stopped before the exception is thrown.
     *
     * @throws RuntimeException if at least one thread was running
     */
    public static void assertLogixNGThreadNotRunning() {
        synchronized(LogixNG_Thread.class) {
            boolean aThreadIsRunning = false;
            // Iterate over a snapshot since stopLogixNGThread() removes the
            // thread from _threads; iterating the live map would risk a
            // ConcurrentModificationException.
            List<LogixNG_Thread> list = new ArrayList<>(_threads.values());
            for (LogixNG_Thread thread : list) {
                if (!thread._threadIsStopped) {
                    aThreadIsRunning = true;
                    thread.stopLogixNGThread();
                }
            }
            if (aThreadIsRunning) {
                throw new RuntimeException("logixNGThread is running");
            }
        }
    }

    private final static org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(LogixNG_Thread.class);

}