001package jmri.jmrix.roco.z21; 002 003import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; 004import java.net.DatagramPacket; 005import java.util.ArrayList; 006import java.util.Arrays; 007import java.util.List; 008 009import jmri.jmrix.*; 010import org.slf4j.Logger; 011import org.slf4j.LoggerFactory; 012 013/** 014 * Abstract base for TrafficControllers in a Message/Reply protocol. 015 * 016 * @author Paul Bender Copyright (C) 2014 017 */ 018public class Z21TrafficController extends jmri.jmrix.AbstractMRTrafficController implements Z21Interface { 019 020 private java.net.InetAddress host; 021 private int port; 022 023 public Z21TrafficController() { 024 super(); 025 allowUnexpectedReply = true; 026 } 027 028 /** 029 * Implement this to forward a specific message type to a protocol-specific 030 * listener interface. This puts the casting into the concrete class. 031 */ 032 @Override 033 protected void forwardMessage(AbstractMRListener client, AbstractMRMessage m) { 034 ((Z21Listener) client).message((Z21Message) m); 035 } 036 037 /** 038 * Implement this to forward a specific Reply type to a protocol-specific 039 * listener interface. This puts the casting into the concrete class. 040 */ 041 @Override 042 protected void forwardReply(AbstractMRListener client, AbstractMRReply m) { 043 ((Z21Listener) client).reply((Z21Reply) m); 044 } 045 046 /** 047 * Invoked if it's appropriate to do low-priority polling of the command 048 * station, this should return the next message to send, or null if the TC 049 * should just sleep. 050 */ 051 @Override 052 protected Z21Message pollMessage() { 053 return null; 054 } 055 056 @Override 057 protected Z21Listener pollReplyHandler() { 058 return null; 059 } 060 061 /** 062 * enterProgMode() and enterNormalMode() return any message that 063 * needs to be returned to the command station to change modes. 064 * 065 * @see #enterNormalMode() 066 * @return if no message is needed, you may return null. 067 * 068 * If the programmerIdle() function returns true, enterNormalMode() is 069 * called after a timeout while in IDLESTATE during programming to 070 * return the system to normal mode. 071 */ 072 @Override 073 protected Z21Message enterProgMode() { 074 return null; 075 } 076 077 /** 078 * enterProgMode() and enterNormalMode() return any message that 079 * needs to be returned to the command station to change modes. 080 * 081 * @see #enterProgMode() 082 * @return if no message is needed, you may return null. 083 */ 084 @Override 085 protected Z21Message enterNormalMode() { 086 return null; 087 } 088 089 /** 090 * Actually transmits the next message to the port. 091 */ 092 @SuppressFBWarnings(value = {"TLW_TWO_LOCK_WAIT", "", "UW_UNCOND_WAIT"}, 093 justification = "Two locks needed for synchronization here, this is OK; String + only used for debug, so inefficient String processing not really a problem; Unconditional Wait is to give external hardware, which doesn't necessarilly respond, time to process the data.") 094 @Override 095 synchronized protected void forwardToPort(AbstractMRMessage m, AbstractMRListener reply) { 096 if (log.isDebugEnabled()) { 097 log.debug("forwardToPort message: [{}]", m); 098 } 099 // remember who sent this 100 mLastSender = reply; 101 102 // forward the message to the registered recipients, 103 // which includes the communications monitor, except the sender. 104 // Schedule notification via the Swing event queue to ensure order 105 Runnable r = new XmtNotifier(m, mLastSender, this); 106 javax.swing.SwingUtilities.invokeLater(r); 107 108 // stream to port in single write, as that's needed by serial 109 byte[] msg = new byte[lengthOfByteStream(m)]; 110 // add header 111 int offset = addHeaderToOutput(msg, m); 112 113 // add data content 114 int len = m.getNumDataElements(); 115 for (int i = 0; i < len; i++) { 116 msg[i + offset] = (byte) m.getElement(i); 117 } 118 // add trailer 119 addTrailerToOutput(msg, len + offset, m); 120 // and send the bytes 121 try { 122 if (log.isDebugEnabled()) { 123 StringBuilder f = new StringBuilder("formatted message: "); 124 for (byte b : msg) { 125 f.append(Integer.toHexString(0xFF & b)); 126 f.append(" "); 127 } 128 log.debug(new String(f)); 129 } 130 while (m.getRetries() >= 0) { 131 if (portReadyToSend(controller)) { 132 // create a datagram with the data from the 133 // message. 134 byte[] data = ((Z21Message) m).getBuffer(); 135 DatagramPacket sendPacket 136 = new DatagramPacket(data, ((Z21Message) m).getLength(), host, port); 137 // and send it. 138 ((Z21Adapter) controller).getSocket().send(sendPacket); 139 log.debug("written, msg timeout: {} mSec", m.getTimeout()); 140 break; 141 } else if (m.getRetries() >= 0) { 142 if (log.isDebugEnabled()) { 143 StringBuilder b = new StringBuilder("Retry message: "); 144 b.append(m.toString()); 145 b.append(" attempts remaining: "); 146 b.append(m.getRetries()); 147 log.debug(new String(b)); 148 } 149 m.setRetries(m.getRetries() - 1); 150 try { 151 synchronized (xmtRunnable) { 152 xmtRunnable.wait(m.getTimeout()); 153 } 154 } catch (InterruptedException e) { 155 Thread.currentThread().interrupt(); // retain if needed later 156 if(!threadStopRequest) { 157 log.error("retry wait interrupted"); 158 } else { 159 log.error("retry wait interrupted during thread stop"); 160 } 161 } 162 } else { 163 log.warn("sendMessage: port not ready for data sending: {}", java.util.Arrays.toString(msg)); 164 } 165 } 166 } catch (Exception e) { 167 // TODO Currently there's no port recovery if an exception occurs 168 // must restart JMRI to clear xmtException. 169 xmtException = true; 170 portWarn(e); 171 } 172 } 173 174 @Override() 175 public boolean status() { 176 if (controller == null) { 177 return false; 178 } else { 179 return (controller.status()); 180 } 181 } 182 183 /** 184 * Make connection to existing PortController object. 185 */ 186 @Override 187 public void connectPort(AbstractPortController p) { 188 rcvException = false; 189 xmtException = false; 190 if (controller != null) { 191 log.warn("connectPort: connect called while connected"); 192 } else { 193 log.debug("connectPort invoked"); 194 } 195 if (!(p instanceof Z21Adapter)) { 196 throw new IllegalArgumentException("attempt to connect wrong port type"); 197 } 198 controller = p; 199 try { 200 host = java.net.InetAddress.getByName(((Z21Adapter) controller).getHostName()); 201 port = ((Z21Adapter) controller).getPort(); 202 ConnectionStatus.instance().setConnectionState( 203 p.getSystemConnectionMemo().getUserName(), 204 ((Z21Adapter) p).getHostName() + ":" + ((Z21Adapter) p).getPort(), ConnectionStatus.CONNECTION_UP); 205 } catch (java.net.UnknownHostException uhe) { 206 log.error("Unknown Host: {} ", ((Z21Adapter) controller).getHostName()); 207 if (((Z21Adapter) p).getPort() != 0) { 208 ConnectionStatus.instance().setConnectionState( 209 p.getSystemConnectionMemo().getUserName(), 210 ((Z21Adapter) controller).getHostName() + ":" + ((Z21Adapter) p).getPort(), ConnectionStatus.CONNECTION_DOWN); 211 } else { 212 ConnectionStatus.instance().setConnectionState( 213 p.getSystemConnectionMemo().getUserName(), 214 ((Z21Adapter) controller).getHostName(), ConnectionStatus.CONNECTION_DOWN); 215 } 216 } 217 // and start threads 218 xmtThread = new Thread(xmtRunnable = () -> { 219 try { 220 transmitLoop(); 221 } catch (Throwable e) { 222 if(!threadStopRequest) 223 log.error("Transmit thread terminated prematurely by: {}", e.toString(), e); 224 } 225 }); 226 xmtThread.setName("z21.Z21TrafficController Transmit thread"); 227 xmtThread.start(); 228 rcvThread = new Thread(this::receiveLoop); 229 rcvThread.setName("z21.Z21TrafficController Receive thread"); 230 int xr = rcvThread.getPriority(); 231 xr++; 232 rcvThread.setPriority(xr); //bump up the priority 233 rcvThread.start(); 234 } 235 236 /** 237 * Break connection to existing PortController object. Once broken, attempts 238 * to send via "message" member will fail. 239 */ 240 @Override 241 public void disconnectPort(AbstractPortController p) { 242 if (controller != p) { 243 log.warn("disconnectPort: disconnect called from non-connected AbstractPortController"); 244 } 245 controller = null; 246 } 247 248 @Override 249 protected Z21Reply newReply() { 250 return new Z21Reply(); 251 } 252 253 @Override 254 protected boolean endOfMessage(AbstractMRReply r) { 255 // since this is a UDP protocol, and each reply in the packet is complete, 256 // we don't check for end of message manually. 257 return true; 258 } 259 260 /** 261 * Handle each reply when complete. 262 * <p> 263 * (This is public for testing purposes) Runs in the "Receive" thread. 264 */ 265 @SuppressFBWarnings(value = {"UW_UNCOND_WAIT", "WA_NOT_IN_LOOP", "NO_NOTIFY_NOT_NOTIFYALL"}, 266 justification = "Wait is for external hardware, which doesn't necessarilly respond, to process the data. Notify is used because Having more than one thread waiting on xmtRunnable is an error.") 267 @Override 268 public void handleOneIncomingReply() throws java.io.IOException { 269 // we sit in this until the message is complete, relying on 270 // threading to let other stuff happen 271 272 // create a buffer to hold the incoming data. 273 byte[] buffer = new byte[100]; // the size here just needs to be longer 274 // than the longest protocol message. 275 // Otherwise, the receive will truncate. 276 277 // create the packet. 278 DatagramPacket receivePacket = new DatagramPacket(buffer, 100, host, port); 279 280 // and wait to receive data in the packet. 281 try { 282 ((Z21Adapter) controller).getSocket().receive(receivePacket); 283 } catch (java.net.SocketException | NullPointerException se) { 284 // if we are waiting when the controller is disposed, 285 // a socket exception will be thrown. 286 log.debug("Socket exception during receive. Connection Closed?"); 287 rcvException = true; 288 return; 289 } 290 if (threadStopRequest) return; 291 292 // handle more than one reply in the same UDP packet. 293 List<Z21Reply> replies = new ArrayList<>(); 294 295 int totalLength=receivePacket.getLength(); 296 int consumed=0; 297 298 do { 299 int length = (0xff & buffer[0]) + ((0xff & buffer[1]) << 8); 300 Z21Reply msg = new Z21Reply(buffer, length); 301 302 replies.add(msg); 303 304 buffer = Arrays.copyOfRange(buffer,length,buffer.length); 305 consumed +=length; 306 log.trace("total length: {} consumed {}",totalLength,consumed); 307 } while(totalLength>consumed); 308 309 310 // and then dispatch each reply 311 replies.forEach(this::dispatchReply); 312 } 313 314 private void dispatchReply(Z21Reply msg) { 315 // message is complete, dispatch it !! 316 replyInDispatch = true; 317 if (log.isDebugEnabled()) { 318 log.debug("dispatch reply of length {} contains {} state {}", msg.getNumDataElements(), msg.toString(), mCurrentState); 319 } 320 321 // forward the message to the registered recipients, 322 // which includes the communications monitor 323 // return a notification via the Swing event queue to ensure proper thread 324 Runnable r = new RcvNotifier(msg, mLastSender, this); 325 try { 326 javax.swing.SwingUtilities.invokeAndWait(r); 327 } catch (InterruptedException ie) { 328 if(threadStopRequest) return; 329 log.error("Unexpected exception in invokeAndWait:{}", ie, ie); 330 } catch (Exception e) { 331 log.error("Unexpected exception in invokeAndWait:{}", e, e); 332 } 333 if (log.isDebugEnabled()) { 334 log.debug("dispatch thread invoked"); 335 } 336 337 if (!msg.isUnsolicited()) { 338 // effect on transmit: 339 switch (mCurrentState) { 340 case WAITMSGREPLYSTATE: { 341 // check to see if the response was an error message we want 342 // to automatically handle by re-queueing the last sent 343 // message, otherwise go on to the next message 344 if (msg.isRetransmittableErrorMsg()) { 345 if (log.isDebugEnabled()) { 346 log.debug("Automatic Recovery from Error Message: +msg.toString()"); 347 } 348 synchronized (xmtRunnable) { 349 mCurrentState = AUTORETRYSTATE; 350 replyInDispatch = false; 351 xmtRunnable.notify(); 352 } 353 } else { 354 // update state, and notify to continue 355 synchronized (xmtRunnable) { 356 mCurrentState = NOTIFIEDSTATE; 357 replyInDispatch = false; 358 xmtRunnable.notify(); 359 } 360 } 361 break; 362 } 363 case WAITREPLYINPROGMODESTATE: { 364 // entering programming mode 365 mCurrentMode = PROGRAMINGMODE; 366 replyInDispatch = false; 367 368 // check to see if we need to delay to allow decoders to become 369 // responsive 370 int warmUpDelay = enterProgModeDelayTime(); 371 if (warmUpDelay != 0) { 372 try { 373 synchronized (xmtRunnable) { 374 xmtRunnable.wait(warmUpDelay); 375 } 376 } catch (InterruptedException e) { 377 Thread.currentThread().interrupt(); // retain if needed later 378 if (threadStopRequest) return; 379 } 380 } 381 // update state, and notify to continue 382 synchronized (xmtRunnable) { 383 mCurrentState = OKSENDMSGSTATE; 384 xmtRunnable.notify(); 385 } 386 break; 387 } 388 case WAITREPLYINNORMMODESTATE: { 389 // entering normal mode 390 mCurrentMode = NORMALMODE; 391 replyInDispatch = false; 392 // update state, and notify to continue 393 synchronized (xmtRunnable) { 394 mCurrentState = OKSENDMSGSTATE; 395 xmtRunnable.notify(); 396 } 397 break; 398 } 399 default: { 400 replyInDispatch = false; 401 if (allowUnexpectedReply) { 402 if (log.isDebugEnabled()) { 403 log.debug("Allowed unexpected reply received in state: {} was {}", mCurrentState, msg.toString()); 404 } 405 synchronized (xmtRunnable) { 406 // The transmit thread sometimes gets stuck 407 // when unexpected replies are received. Notify 408 // it to clear the block without a timeout. 409 // (do not change the current state) 410 //if(mCurrentState!=IDLESTATE) 411 xmtRunnable.notify(); 412 } 413 } else { 414 unexpectedReplyStateError(mCurrentState,msg.toString()); 415 } 416 } 417 } 418 // Unsolicited message 419 } else { 420 if (log.isDebugEnabled()) { 421 log.debug("Unsolicited Message Received {}", msg.toString()); 422 } 423 424 replyInDispatch = false; 425 } 426 } 427 428 @SuppressFBWarnings(value = {"UW_UNCOND_WAIT", "WA_NOT_IN_LOOP"}, 429 justification = "Wait is for external hardware, which doesn't necessarilly respond, to process the data.") 430 @Override 431 protected void terminate() { 432 if (controller == null) { 433 log.debug("terminate called while not connected"); 434 return; 435 } else { 436 log.debug("Cleanup Starts"); 437 } 438 439 Z21Message logoffMessage = Z21Message.getLanLogoffRequestMessage(); 440 forwardToPort(logoffMessage, null); 441 // wait for reply 442 try { 443 if (xmtRunnable != null) { 444 synchronized (xmtRunnable) { 445 xmtRunnable.wait(logoffMessage.getTimeout()); 446 } 447 } 448 } catch (InterruptedException e) { 449 Thread.currentThread().interrupt(); // retain if needed later 450 log.error("transmit interrupted"); 451 } finally { 452 // set the controller to null, even if terminate fails. 453 controller = null; 454 } 455 } 456 457 /** 458 * Terminate the receive and transmit threads. 459 * <p> 460 * This is intended to be used only by testing subclasses. 461 */ 462 @Override 463 public void terminateThreads() { 464 threadStopRequest = true; 465 // ensure socket closed to end pending operations 466 if ( controller != null && ((Z21Adapter) controller).getSocket() != null) ((Z21Adapter) controller).getSocket().close(); 467 468 // usual stop process 469 super.terminateThreads(); 470 } 471 472 // The methods to implement the Z21Interface 473 @Override 474 public synchronized void addz21Listener(Z21Listener l) { 475 this.addListener(l); 476 } 477 478 @Override 479 public synchronized void removez21Listener(Z21Listener l) { 480 this.removeListener(l); 481 } 482 483 /** 484 * Forward a preformatted message to the actual interface. 485 */ 486 @Override 487 public void sendz21Message(Z21Message m, Z21Listener reply) { 488 sendMessage(m, reply); 489 } 490 491 private final static Logger log = LoggerFactory.getLogger(Z21TrafficController.class); 492}