001package jmri.jmrix.sprog; 002 003import java.io.DataInputStream; 004import java.io.OutputStream; 005import java.util.Vector; 006import java.util.concurrent.BlockingQueue; 007import java.util.concurrent.LinkedBlockingQueue; 008 009import jmri.jmrix.AbstractPortController; 010import jmri.jmrix.sprog.SprogConstants.SprogState; 011import jmri.jmrix.sprog.serialdriver.SerialDriverAdapter; 012 013/** 014 * Converts Stream-based I/O to/from Sprog messages. The "SprogInterface" side 015 * sends/receives message objects. The connection to a SprogPortController is 016 * via a pair of *Streams, which then carry sequences of characters for 017 * transmission. Note that this processing is handled in an independent thread. 018 * <p> 019 * Rewritten during 4.11.x series. Create a high priority thread for the tc to 020 * move everything off the swing thread. Use a blocking queue to handle 021 * asynchronous messages from multiple sources. 022 * 023 * @author Bob Jacobsen Copyright (C) 2001 024 * @author Andrew Crosland Copyright (C) 2018 025 */ 026public class SprogTrafficController implements SprogInterface, 027 Runnable { 028 029 private SprogReply reply = new SprogReply(); 030 SprogListener lastSender = null; 031 private SprogState sprogState = SprogState.NORMAL; 032 private int lastId; 033 034 private Thread tcThread; 035 private final Object lock = new Object(); 036 private boolean replyAvailable = false; 037 // Make this public so it can be overridden by a script for debug 038 public int timeout = SprogConstants.TC_PROG_REPLY_TIMEOUT; 039 040 /** 041 * Create a new SprogTrafficController instance. 042 * 043 * @param adaptermemo the associated SystemConnectionMemo 044 */ 045 @edu.umd.cs.findbugs.annotations.SuppressFBWarnings(value="SC_START_IN_CTOR", justification="done at end, waits for data") 046 public SprogTrafficController(SprogSystemConnectionMemo adaptermemo) { 047 memo = adaptermemo; 048 init(); 049 } 050 051 private void init() { 052 // Set the timeout for communication with hardware 053 resetTimeout(); 054 055 tcThread = jmri.util.ThreadingUtil.newThread(this); 056 tcThread.setName("SPROG TC thread"); 057 tcThread.setPriority(Thread.MAX_PRIORITY-1); 058 tcThread.setDaemon(true); 059 log.debug("starting TC thread from {} ", this, jmri.util.LoggingUtil.shortenStacktrace(new Exception("traceback"),6)); 060 tcThread.start(); 061 } 062 063 // Methods to implement the Sprog Interface 064 065 protected Vector<SprogListener> cmdListeners = new Vector<SprogListener>(); 066 067 @Override 068 public boolean status() { 069 return (ostream != null && istream != null); 070 } 071 072 /** 073 * Check if the Sprog TC Thread ( started on construction of 074 * SprogTrafficController ) is alive. 075 * For testing purposes. 076 * @return true if alive, else false. 077 */ 078 public boolean isTcThreadAlive() { 079 return tcThread.isAlive(); 080 } 081 082 @Override 083 public synchronized void addSprogListener(SprogListener l) { 084 // add only if not already registered 085 if (l == null) { 086 throw new java.lang.NullPointerException(); 087 } 088 if (!cmdListeners.contains(l)) { 089 cmdListeners.addElement(l); 090 log.debug("SprogListener added to {} tc", memo.getUserName()); 091 } 092 } 093 094 @Override 095 public synchronized void removeSprogListener(SprogListener l) { 096 if (cmdListeners.contains(l)) { 097 cmdListeners.removeElement(l); 098 } 099 } 100 101 /** 102 * Reset timeout to default depending on current mode 103 */ 104 public void resetTimeout() { 105 if (memo.getSprogMode() == SprogConstants.SprogMode.OPS) { 106 timeout = SprogConstants.TC_OPS_REPLY_TIMEOUT; 107 } else { 108 timeout = SprogConstants.TC_PROG_REPLY_TIMEOUT; 109 } 110 } 111 112 public void setTimeout(int t) { 113 timeout = t; 114 } 115 116 public SprogState getSprogState() { 117 return sprogState; 118 } 119 120 public void setSprogState(SprogState s) { 121 this.sprogState = s; 122 if (s == SprogState.V4BOOTMODE) { 123 // enable flow control - required for sprog v4 bootloader 124 var controller = getController(); 125 controller.setHandshake(jmri.jmrix.AbstractSerialPortController.FlowControl.RTSCTS); 126 127 } else { 128 // disable flow control 129 // removed Jan 2010 - this stops SPROG from sending. Could cause problems with 130 // serial Sprogs, but I have no way of testing: 131 // getController().setHandshake(false); 132 } 133 log.debug("Setting sprogState {}", s); 134 } 135 136 public boolean isNormalMode() { 137 return sprogState == SprogState.NORMAL; 138 } 139 140 public boolean isSIIBootMode() { 141 return sprogState == SprogState.SIIBOOTMODE; 142 } 143 144 public boolean isV4BootMode() { 145 return sprogState == SprogState.V4BOOTMODE; 146 } 147 148 @SuppressWarnings("unchecked") 149 private synchronized Vector<SprogListener> getCopyOfListeners() { 150 return (Vector<SprogListener>) cmdListeners.clone(); 151 152 } 153 154 protected synchronized void notifyMessage(SprogMessage m, SprogListener originator) { 155 for (SprogListener listener : this.getCopyOfListeners()) { 156 try { 157 // don't send it back to the originator! 158 if (listener != originator) { 159 // skip forwarding to the last sender for now, we'll get them later 160 if (lastSender != listener) { 161 listener.notifyMessage(m); 162 } 163 } 164 } catch (Exception e) { 165 log.warn("notify: During dispatch to {}", listener, e); 166 } 167 } 168 // forward to the last listener who sent a message 169 // this is done _second_ so monitoring can have already stored the reply 170 // before a response is sent 171 if (lastSender != null && lastSender != originator) { 172 lastSender.notifyMessage(m); 173 } 174 } 175 176 protected synchronized void notifyReply(SprogReply r) { 177 log.debug("notifyReply starts for later, last sender: {}", lastSender); 178 179 final Vector<SprogListener> listeners = this.getCopyOfListeners(); 180 final SprogReply replyForLater = r; 181 final SprogListener senderForLater = lastSender; 182 Runnable rl = () -> { 183 for (SprogListener listener : listeners) { 184 try { 185 // don't send message back to the originator! 186 // skip forwarding to the last sender for now, we'll get them later 187 if (senderForLater != listener) { 188 listener.notifyReply(replyForLater); 189 } 190 191 } catch (Exception e) { 192 log.warn("notify: During dispatch to {}", listener, e); 193 } 194 } 195 // forward to the last listener who sent a message 196 // this is done _second_ so monitoring can have already stored the reply 197 // before a response is sent 198 if (senderForLater != null) { 199 senderForLater.notifyReply(replyForLater); 200 } 201 }; 202 javax.swing.SwingUtilities.invokeLater(rl); 203 } 204 205 protected synchronized void notifyReply(SprogReply r, SprogListener lastSender) { 206 log.debug("notifyReply starts for later, last sender: {}", lastSender); 207 208 final Vector<SprogListener> listeners = this.getCopyOfListeners(); 209 final SprogReply replyForLater = r; 210 final SprogListener senderForLater = lastSender; 211 Runnable rl = () -> { 212 log.debug("notifyReply starts last sender: {}", senderForLater); 213 for (SprogListener listener : listeners) { 214 try { 215 //if is message don't send it back to the originator! 216 // skip forwarding to the last sender for now, we'll get them later 217 if (senderForLater != listener) { 218 log.debug("Notify listener: {} {}", listener, r.toString()); 219 listener.notifyReply(replyForLater); 220 } 221 222 } catch (Exception e) { 223 log.warn("notify: During dispatch to {}", listener, e); 224 } 225 } 226 227 // forward to the last listener who sent a message 228 // this is done _second_ so monitoring can have already stored the reply 229 // before a response is sent 230 if (senderForLater != null) { 231 log.debug("notify last sender: {} {}", senderForLater, replyForLater.toString()); 232 senderForLater.notifyReply(replyForLater); 233 } 234 }; 235 javax.swing.SwingUtilities.invokeLater(rl); 236 } 237 238 // A class to remember the message and who sent it 239 static private class MessageTuple { 240 private final SprogMessage message; 241 private final SprogListener listener; 242 243 public MessageTuple(SprogMessage m, SprogListener l) { 244 message = m; 245 listener = l; 246 } 247 248 // Copy constructor 249 public MessageTuple(MessageTuple mt) { 250 message = mt.message; 251 listener = mt.listener; 252 } 253 } 254 255 // The queue to hold messages being sent 256 BlockingQueue<MessageTuple> sendQueue = new LinkedBlockingQueue<MessageTuple>(); 257 258 /** 259 * Enqueue a preformatted message to be sent to the actual interface 260 * 261 * @param m The message to be forwarded 262 */ 263 public void sendSprogMessage(SprogMessage m) { 264 log.debug("Add message to queue: [{}] id: {}", m.toString(isSIIBootMode()), m.getId()); 265 try { 266 sendQueue.add(new MessageTuple(m, null)); 267 } catch (Exception e) { 268 log.error("Could not add message to queue", e); 269 } 270 } 271 272 /** 273 * Enqueue a preformatted message to be sent to the actual interface 274 * 275 * @param m Message to send 276 * @param replyTo Who is sending the message 277 */ 278 @Override 279 public synchronized void sendSprogMessage(SprogMessage m, SprogListener replyTo) { 280 log.debug("Add message to queue: [{}] id: {}", m.toString(isSIIBootMode()), m.getId()); 281 try { 282 sendQueue.add(new MessageTuple(m, replyTo)); 283 } catch (Exception e) { 284 log.error("Could not add message to queue", e); 285 } 286 } 287 288 /** 289 * Block until a message is available from the queue, send it to the interface 290 * and then block until reply is received or a timeout occurs. This will be 291 * a very long timeout to allow for page mode programming operations in SPROG 292 * programmer mode. 293 */ 294 @Override 295 public void run() { 296 MessageTuple messageToSend; 297 log.debug("Traffic controller queuing thread starts"); 298 while (true) { 299 log.debug("Traffic controller queue waiting"); 300 try { 301 messageToSend = new MessageTuple(sendQueue.take()); 302 } catch (InterruptedException e) { 303 log.debug("Thread interrupted while dequeuing message to send"); 304 return; 305 } 306 log.debug("Message dequeued {} id: {}", messageToSend.message, messageToSend.message.getId()); 307 // remember who sent this 308 lastSender = messageToSend.listener; 309 lastId = messageToSend.message.getId(); 310 // notify all _other_ listeners 311 notifyMessage(messageToSend.message, messageToSend.listener); 312 replyAvailable = false; 313 sendToInterface(messageToSend.message); 314 log.debug("Waiting {} for a reply", timeout); 315 try { 316 synchronized (lock) { 317 lock.wait(timeout); // Wait for notify 318 } 319 } catch (InterruptedException e) { 320 log.debug("waitingForReply interrupted"); 321 } 322 if (!replyAvailable) { 323 // Timed out 324 log.warn("Timeout waiting for reply from hardware in SprogState {}", sprogState); 325 } else { 326 log.debug("Notified of reply"); 327 } 328 } 329 } 330 331 /** 332 * Forward a preformatted message to the interface. 333 * 334 * @param m The message to be forwarded 335 */ 336 public void sendToInterface(SprogMessage m) { 337 // stream to port in single write, as that's needed by serial 338 try { 339 if (ostream != null) { 340 ostream.write(m.getFormattedMessage(sprogState)); 341 log.debug("sendSprogMessage written to ostream"); 342 } else { 343 // no stream connected 344 log.warn("sendMessage: no connection established"); 345 } 346 } catch (Exception e) { 347 log.warn("sendMessage: Exception: ", e); 348 } 349 } 350 351// methods to connect/disconnect to a source of data in a SprogPortController 352 private AbstractPortController controller = null; 353 354 /** 355 * Make connection to existing PortController object. 356 * 357 * @param p The port controller 358 */ 359 public void connectPort(AbstractPortController p) { 360 istream = p.getInputStream(); 361 ostream = p.getOutputStream(); 362 if (controller != null) { 363 log.warn("connectPort: connect called while connected"); 364 } 365 controller = p; 366 } 367 368 /** 369 * Get the port controller, as a SerialDriverAdapter. 370 * 371 * @return the port controller 372 */ 373 protected SerialDriverAdapter getController(){ 374 return (SerialDriverAdapter) controller; 375 } 376 377 /** 378 * Break connection to existing SprogPortController object. 379 * <p> 380 * Once broken, attempts to send via "message" member will fail. 381 * 382 * @param p the connection to break 383 */ 384 public void disconnectPort(AbstractPortController p) { 385 istream = null; 386 ostream = null; 387 if (controller != p) { 388 log.warn("disconnectPort: disconnect called from non-connected SprogPortController"); 389 } 390 controller = null; 391 } 392 393 static volatile protected SprogTrafficController self = null; 394 395 public void setAdapterMemo(SprogSystemConnectionMemo adaptermemo) { 396 memo = adaptermemo; 397 } 398 399 public SprogSystemConnectionMemo getAdapterMemo() { 400 return memo; 401 } 402 403 private SprogSystemConnectionMemo memo = null; 404 405 // data members to hold the streams 406 DataInputStream istream = null; 407 OutputStream ostream = null; 408 409 boolean endReply(SprogReply msg) { 410 return msg.endNormalReply() || msg.endBootReply(); 411 } 412 413 private boolean unsolicited; 414 415 /** 416 * Handle an incoming reply. 417 */ 418 public void handleOneIncomingReply() { 419 // we get here if data has been received and this method is explicitly invoked 420 // fill the current reply with any data received 421 int replyCurrentSize = reply.getNumDataElements(); 422 int i; 423 for (i = replyCurrentSize; i < SprogReply.maxSize - replyCurrentSize; i++) { 424 try { 425 if (istream.available() == 0) { 426 break; // nothing waiting to be read 427 } 428 byte char1 = istream.readByte(); 429 reply.setElement(i, char1); 430 431 } catch (Exception e) { 432 log.warn("Exception in DATA_AVAILABLE state", e); 433 } 434 if (endReply(reply)) { 435 sendreply(); 436 break; 437 } 438 } 439 } 440 441 /** 442 * Send the current reply - built using data from serialEvent. 443 */ 444 private void sendreply() { 445 //send the reply 446 log.debug("dispatch reply of length {} in SprogState {}", reply.getNumDataElements(), sprogState); 447 if (unsolicited) { 448 log.debug("Unsolicited Reply"); 449 reply.setUnsolicited(); 450 } 451 // Insert the id 452 reply.setId(lastId); 453 notifyReply(reply, lastSender); 454 log.debug("Notify() wait"); 455 replyAvailable = true; 456 synchronized(lock) { 457 lock.notifyAll(); 458 } 459 460 //Create a new reply, ready to be filled 461 reply = new SprogReply(); 462 } 463 464 public void dispose(){ 465 tcThread.interrupt(); 466 try { 467 tcThread.join(); 468 } catch (InterruptedException e) { 469 // Do nothing 470 } 471 } 472 473 private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(SprogTrafficController.class); 474 475}