001package jmri.jmrix.xpa; 002 003import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; 004import java.io.DataInputStream; 005import java.io.OutputStream; 006import java.util.ArrayList; 007import java.util.LinkedList; 008import java.util.NoSuchElementException; 009import org.slf4j.Logger; 010import org.slf4j.LoggerFactory; 011 012/** 013 * Converts Stream-based I/O to/from Xpa messages. The "XpaInterface" side 014 * sends/receives message objects. The connection to an XpaPortController is via 015 * a pair of *Streams, which then carry sequences of characters for 016 * transmission. Note that this processing is handled in an independent thread. 017 * 018 * @author Paul Bender Copyright (C) 2004, 2016 019 */ 020public class XpaTrafficController implements XpaInterface, Runnable { 021 022 // Linked list to store the transmit queue. 023 final LinkedList<byte[]> xmtList = new LinkedList<>(); 024 025 /** 026 * (local class) object to implement the transmit thread 027 */ 028 final XmtHandler xmtHandler = new XmtHandler(); 029 Thread xmtThread = null; 030 031 /** 032 * Create a new XpaTrafficController instance. 033 */ 034 public XpaTrafficController() { 035 if (log.isDebugEnabled()) { 036 log.debug("setting instance: {}", this); 037 } 038 } 039 040 /** 041 * Start the Transmit thread. 042 */ 043 public void startTransmitThread() { 044 if (xmtThread == null) { 045 // Start the xmtHandler thread 046 xmtThread = new Thread(xmtHandler, "XPA transmit handler"); 047 xmtThread.setPriority(Thread.MAX_PRIORITY - 1); 048 xmtThread.start(); 049 } 050 } 051 052 protected final ArrayList<XpaListener> cmdListeners = new ArrayList<>(); 053 054 @Override 055 public boolean status() { 056 return (ostream != null && istream != null); 057 } 058 059 @Override 060 public synchronized void addXpaListener(XpaListener l) { 061 // add only if not already registered 062 if (l == null) { 063 throw new java.lang.NullPointerException(); 064 } 065 if (!cmdListeners.contains(l)) { 066 cmdListeners.add(l); 067 } 068 } 069 070 @Override 071 public synchronized void removeXpaListener(XpaListener l) { 072 cmdListeners.remove(l); 073 } 074 075 /** 076 * Forward a XpaMessage to all registered XpaInterface listeners. 077 * 078 * @param m the message to forward 079 * @param notMe registered listener not to forward the message to 080 */ 081 protected void notifyMessage(XpaMessage m, XpaListener notMe) { 082 // make a copy of the listener vector to synchronized not needed for transmit 083 ArrayList<XpaListener> v; 084 synchronized (this) { 085 v = new ArrayList<>(cmdListeners); 086 } 087 // forward to all listeners 088 for (XpaListener client : v) { 089 if (notMe != client) { 090 if (log.isDebugEnabled()) { 091 log.debug("notify client: {}", client); 092 } 093 try { 094 client.message(m); 095 } catch (Exception e) { 096 log.warn("notify: During dispatch to {}", client, e); 097 } 098 } 099 } 100 } 101 102 XpaListener lastSender = null; 103 104 protected void notifyReply(XpaMessage r) { 105 // make a copy of the listener vector to synchronized (not needed for transmit?) 106 ArrayList<XpaListener> v; 107 synchronized (this) { 108 v = new ArrayList<>(cmdListeners); 109 } 110 // forward to all listeners 111 for (XpaListener client : v) { 112 if (log.isDebugEnabled()) { 113 log.debug("notify client: {}", client); 114 } 115 try { 116 // Skip forwarding the message to the last sender until 117 // later. 118 if (lastSender != client) { 119 client.reply(r); 120 } 121 } catch (Exception e) { 122 log.warn("notify: During dispatch to {}", client, e); 123 } 124 } 125 126 // forward to the last listener who send a message 127 // this is done _second_ so monitoring can have already stored the reply 128 // before a response is sent 129 if (lastSender != null) { 130 lastSender.reply(r); 131 } 132 } 133 134 /** 135 * Forward a pre-formatted message to the actual interface. 136 * 137 * @param m the message to forward 138 * @param reply the listener to receive the reply 139 */ 140 @SuppressFBWarnings(value = {"NO_NOTIFY_NOT_NOTIFYALL"}, 141 justification = "Notify is used because Having more than one thread waiting on xmtHandler is an error.") 142 @Override 143 synchronized public void sendXpaMessage(XpaMessage m, XpaListener reply) { 144 if (log.isDebugEnabled()) { 145 log.debug("sendXpaMessage message: [{}]", m); 146 } 147 // remember who sent this 148 lastSender = reply; 149 150 // notify all _other_ listeners 151 notifyMessage(m, reply); 152 153 // stream to port in single write, as that's needed by serial 154 int len = m.getNumDataElements(); 155 int cr = 1; // space for carriage return linefeed 156 157 byte[] msg = new byte[len + cr]; 158 159 for (int i = 0; i < len; i++) { 160 msg[i] = (byte) m.getElement(i); 161 } 162 msg[len] = 0x0d; 163 164 //queue the request to send, and notify the xmtHandler. 165 synchronized (xmtHandler) { 166 xmtList.addLast(msg); 167 xmtHandler.notify(); 168 } 169 170 } 171 172 // methods to connect/disconnect to a source of data in a XpaPortController 173 private XpaPortController controller = null; 174 175 /** 176 * Make connection to existing PortController object. 177 * 178 * @param p controller for the port associated with this controller 179 */ 180 public void connectPort(XpaPortController p) { 181 istream = p.getInputStream(); 182 ostream = p.getOutputStream(); 183 if (controller != null) { 184 log.warn("connectPort: connect called while connected"); 185 } 186 controller = p; 187 // Send the initialization string to the port 188 this.sendXpaMessage(XpaMessage.getDefaultInitMsg(), null); 189 } 190 191 /** 192 * Break connection to existing XpaPortController object. Once broken, 193 * attempts to send via "message" member will fail. 194 * 195 * @param p controller for the port associated with this controller 196 */ 197 public void disconnectPort(XpaPortController p) { 198 istream = null; 199 ostream = null; 200 if (controller != p) { 201 log.warn("disconnectPort: disconnect called from non-connected XpaPortController"); 202 } 203 controller = null; 204 } 205 206 // data members to hold the streams 207 DataInputStream istream = null; 208 OutputStream ostream = null; 209 210 /** 211 * Handle incoming characters. This is a permanent loop, looking for input 212 * messages in character form on the stream connected to the PortController 213 * via <code>connectPort</code>. Terminates with the input stream breaking 214 * out of the try block. 215 */ 216 @Override 217 public void run() { 218 while (true) { // loop permanently, stream close will exit via exception 219 try { 220 handleOneIncomingReply(); 221 } catch (java.io.IOException e) { 222 log.warn("run: Exception: {}", e.toString()); 223 } 224 } 225 } 226 227 void handleOneIncomingReply() throws java.io.IOException { 228 // we sit in this until the message is complete, relying on 229 // threading to let other stuff happen 230 231 // Create output message 232 XpaMessage msg = new XpaMessage(); 233 // message exists, now fill it 234 int i; 235 for (i = 0; i < XpaMessage.MAX_SIZE; i++) { 236 byte char1 = istream.readByte(); 237 msg.setElement(i, char1); 238 //if (endReply(msg)) break; 239 } 240 241 // message is complete, dispatch it !! 242 if (log.isDebugEnabled()) { 243 log.debug("dispatch reply of length {}", i); 244 } 245 { 246 final XpaMessage thisMsg = msg; 247 final XpaTrafficController thisTc = this; 248 // return a notification via the queue to ensure end 249 Runnable r = new Runnable() { 250 final XpaMessage msgForLater = thisMsg; 251 final XpaTrafficController myTc = thisTc; 252 253 @Override 254 public void run() { 255 log.debug("Delayed notify starts"); 256 myTc.notifyReply(msgForLater); 257 } 258 }; 259 javax.swing.SwingUtilities.invokeLater(r); 260 } 261 } 262 263 /** 264 * Captive class to handle transmission. 265 */ 266 class XmtHandler implements Runnable { 267 268 @SuppressFBWarnings(value = {"UW_UNCOND_WAIT", "NO_NOTIFY_NOT_NOTIFYALL"}, 269 justification = "while loop controls access") 270 @Override 271 public void run() { 272 while (true) { // loop forever 273 // Check to see if there is anything to send 274 try { 275 // get content; failure is a NoSuchElementException 276 if (log.isDebugEnabled()) { 277 log.debug("check for input"); 278 } 279 byte[] msg; 280 synchronized (this) { 281 msg = xmtList.removeFirst(); 282 } 283 284 // Now send this to the port 285 try { 286 if (ostream != null) { 287 if (log.isDebugEnabled()) { 288 log.debug("write message: {}", java.util.Arrays.toString(msg)); 289 } 290 synchronized (ostream) { 291 ostream.write(msg); 292 ostream.notify(); 293 } 294 } else { 295 // no stream connected 296 log.warn("sendMessage: no connection established"); 297 } 298 } catch (java.io.IOException e) { 299 log.warn("sendMessage: Exception: {}", e.toString()); 300 } 301 } catch (NoSuchElementException e) { 302 // message queue was empty, wait for input 303 if (log.isDebugEnabled()) { 304 log.debug("start wait"); 305 } 306 try { 307 synchronized (this) { 308 wait(); 309 } 310 } catch (java.lang.InterruptedException ei) { 311 Thread.currentThread().interrupt(); // retain if needed later 312 } 313 if (log.isDebugEnabled()) { 314 log.debug("end wait"); 315 } 316 } 317 } 318 } 319 } 320 321 private final static Logger log = LoggerFactory.getLogger(XpaTrafficController.class); 322 323}