001package jmri.jmrix.sprog;
002
003import java.io.DataInputStream;
004import java.io.OutputStream;
005import java.util.Vector;
006import java.util.concurrent.BlockingQueue;
007import java.util.concurrent.LinkedBlockingQueue;
008
009import jmri.jmrix.AbstractPortController;
010import jmri.jmrix.sprog.SprogConstants.SprogState;
011import jmri.jmrix.sprog.serialdriver.SerialDriverAdapter;
012
013/**
014 * Converts Stream-based I/O to/from Sprog messages. The "SprogInterface" side
015 * sends/receives message objects. The connection to a SprogPortController is
016 * via a pair of *Streams, which then carry sequences of characters for
017 * transmission. Note that this processing is handled in an independent thread.
018 * <p>
019 * Rewritten during 4.11.x series. Create a high priority thread for the tc to
020 * move everything off the swing thread. Use a blocking queue to handle
021 * asynchronous messages from multiple sources.
022 *
023 * @author Bob Jacobsen Copyright (C) 2001
024 * @author Andrew Crosland Copyright (C) 2018
025 */
026public class SprogTrafficController implements SprogInterface,
027        Runnable {
028
029    private SprogReply reply = new SprogReply();
030    SprogListener lastSender = null;
031    private SprogState sprogState = SprogState.NORMAL;
032    private int lastId;
033
034    private Thread tcThread;
035    private final Object lock = new Object();
036    private boolean replyAvailable = false;
037    // Make this public so it can be overridden by a script for debug
038    public int timeout = SprogConstants.TC_PROG_REPLY_TIMEOUT;
039
040    /**
041     * Create a new SprogTrafficController instance.
042     *
043     * @param adaptermemo the associated SystemConnectionMemo
044     */
045    @edu.umd.cs.findbugs.annotations.SuppressFBWarnings(value="SC_START_IN_CTOR", justification="done at end, waits for data")
046    public SprogTrafficController(SprogSystemConnectionMemo adaptermemo) {
047        memo = adaptermemo;
048        init();
049    }
050
051    private void init() {
052        // Set the timeout for communication with hardware
053        resetTimeout();
054
055        tcThread = jmri.util.ThreadingUtil.newThread(this);
056        tcThread.setName("SPROG TC thread");
057        tcThread.setPriority(Thread.MAX_PRIORITY-1);
058        tcThread.setDaemon(true);
059        log.debug("starting TC thread from {} ", this, jmri.util.LoggingUtil.shortenStacktrace(new Exception("traceback"),6));
060        tcThread.start();
061    }
062
063    // Methods to implement the Sprog Interface
064
065    protected Vector<SprogListener> cmdListeners = new Vector<SprogListener>();
066
067    @Override
068    public boolean status() {
069        return (ostream != null && istream != null);
070    }
071
072    /**
073     * Check if the Sprog TC Thread ( started on construction of
074     * SprogTrafficController ) is alive.
075     * For testing purposes.
076     * @return true if alive, else false.
077     */
078    public boolean isTcThreadAlive() {
079        return tcThread.isAlive();
080    }
081
082    @Override
083    public synchronized void addSprogListener(SprogListener l) {
084        // add only if not already registered
085        if (l == null) {
086            throw new java.lang.NullPointerException();
087        }
088        if (!cmdListeners.contains(l)) {
089            cmdListeners.addElement(l);
090            log.debug("SprogListener added to {} tc", memo.getUserName());
091        }
092    }
093
094    @Override
095    public synchronized void removeSprogListener(SprogListener l) {
096        if (cmdListeners.contains(l)) {
097            cmdListeners.removeElement(l);
098        }
099    }
100
101    /**
102     * Reset timeout to default depending on current mode
103     */
104    public void resetTimeout() {
105        if (memo.getSprogMode() == SprogConstants.SprogMode.OPS) {
106            timeout = SprogConstants.TC_OPS_REPLY_TIMEOUT;
107        } else {
108            timeout = SprogConstants.TC_PROG_REPLY_TIMEOUT;
109        }
110    }
111
112    public void setTimeout(int t) {
113        timeout = t;
114    }
115
116    public SprogState getSprogState() {
117        return sprogState;
118    }
119
120    public void setSprogState(SprogState s) {
121        this.sprogState = s;
122        if (s == SprogState.V4BOOTMODE) {
123            // enable flow control - required for sprog v4 bootloader
124            var controller = getController();
125            controller.setHandshake(jmri.jmrix.AbstractSerialPortController.FlowControl.RTSCTS);
126
127        } else {
128            // disable flow control
129            // removed Jan 2010 - this stops SPROG from sending. Could cause problems with
130            // serial Sprogs, but I have no way of testing:
131            // getController().setHandshake(false);
132        }
133        log.debug("Setting sprogState {}", s);
134    }
135
136    public boolean isNormalMode() {
137        return sprogState == SprogState.NORMAL;
138    }
139
140    public boolean isSIIBootMode() {
141        return sprogState == SprogState.SIIBOOTMODE;
142    }
143
144    public boolean isV4BootMode() {
145        return sprogState == SprogState.V4BOOTMODE;
146    }
147
148    @SuppressWarnings("unchecked")
149    private synchronized Vector<SprogListener> getCopyOfListeners() {
150        return (Vector<SprogListener>) cmdListeners.clone();
151
152    }
153
154    protected synchronized void notifyMessage(SprogMessage m, SprogListener originator) {
155        for (SprogListener listener : this.getCopyOfListeners()) {
156            try {
157                // don't send it back to the originator!
158                if (listener != originator) {
159                    // skip forwarding to the last sender for now, we'll get them later
160                    if (lastSender != listener) {
161                        listener.notifyMessage(m);
162                    }
163                }
164            } catch (Exception e) {
165                log.warn("notify: During dispatch to {}", listener, e);
166            }
167        }
168        // forward to the last listener who sent a message
169        // this is done _second_ so monitoring can have already stored the reply
170        // before a response is sent
171        if (lastSender != null && lastSender != originator) {
172            lastSender.notifyMessage(m);
173        }
174    }
175
176    protected synchronized void notifyReply(SprogReply r) {
177        log.debug("notifyReply starts for later, last sender: {}", lastSender);
178
179        final Vector<SprogListener> listeners = this.getCopyOfListeners();
180        final SprogReply replyForLater = r;
181        final SprogListener senderForLater = lastSender;
182        Runnable rl = () -> {
183            for (SprogListener listener : listeners) {
184                try {
185                    // don't send message back to the originator!
186                    // skip forwarding to the last sender for now, we'll get them later
187                    if (senderForLater != listener) {
188                        listener.notifyReply(replyForLater);
189                    }
190
191                } catch (Exception e) {
192                    log.warn("notify: During dispatch to {}", listener, e);
193                }
194            }
195            // forward to the last listener who sent a message
196            // this is done _second_ so monitoring can have already stored the reply
197            // before a response is sent
198            if (senderForLater != null) {
199                senderForLater.notifyReply(replyForLater);
200            }
201        };
202        javax.swing.SwingUtilities.invokeLater(rl);
203    }
204
205    protected synchronized void notifyReply(SprogReply r, SprogListener lastSender) {
206        log.debug("notifyReply starts for later, last sender: {}", lastSender);
207
208        final Vector<SprogListener> listeners = this.getCopyOfListeners();
209        final SprogReply replyForLater = r;
210        final SprogListener senderForLater = lastSender;
211        Runnable rl = () -> {
212            log.debug("notifyReply starts last sender: {}", senderForLater);
213            for (SprogListener listener : listeners) {
214                try {
215                //if is message don't send it back to the originator!
216                    // skip forwarding to the last sender for now, we'll get them later
217                    if (senderForLater != listener) {
218                        log.debug("Notify listener: {} {}", listener, r.toString());
219                        listener.notifyReply(replyForLater);
220                    }
221
222                } catch (Exception e) {
223                    log.warn("notify: During dispatch to {}", listener, e);
224                }
225            }
226
227            // forward to the last listener who sent a message
228            // this is done _second_ so monitoring can have already stored the reply
229            // before a response is sent
230            if (senderForLater != null) {
231                log.debug("notify last sender: {} {}", senderForLater, replyForLater.toString());
232                senderForLater.notifyReply(replyForLater);
233            }
234        };
235        javax.swing.SwingUtilities.invokeLater(rl);
236    }
237
238    // A class to remember the message and who sent it
239    static private class MessageTuple {
240        private final SprogMessage message;
241        private final SprogListener listener;
242
243        public MessageTuple(SprogMessage m, SprogListener l) {
244            message = m;
245            listener = l;
246        }
247
248        // Copy constructor
249        public MessageTuple(MessageTuple mt) {
250            message = mt.message;
251            listener = mt.listener;
252        }
253    }
254
255    // The queue to hold messages being sent
256    BlockingQueue<MessageTuple> sendQueue = new LinkedBlockingQueue<MessageTuple>();
257
258    /**
259     * Enqueue a preformatted message to be sent to the actual interface
260     *
261     * @param m The message to be forwarded
262     */
263    public void sendSprogMessage(SprogMessage m) {
264        log.debug("Add message to queue: [{}] id: {}", m.toString(isSIIBootMode()), m.getId());
265        try {
266            sendQueue.add(new MessageTuple(m, null));
267        } catch (Exception e) {
268            log.error("Could not add message to queue", e);
269        }
270    }
271
272    /**
273     * Enqueue a preformatted message to be sent to the actual interface
274     *
275     * @param m         Message to send
276     * @param replyTo   Who is sending the message
277     */
278    @Override
279    public synchronized void sendSprogMessage(SprogMessage m, SprogListener replyTo) {
280        log.debug("Add message to queue: [{}] id: {}", m.toString(isSIIBootMode()), m.getId());
281        try {
282            sendQueue.add(new MessageTuple(m, replyTo));
283        } catch (Exception e) {
284            log.error("Could not add message to queue", e);
285        }
286    }
287
288    /**
289     * Block until a message is available from the queue, send it to the interface
290     * and then block until reply is received or a timeout occurs. This will be
291     * a very long timeout to allow for page mode programming operations in SPROG
292     * programmer mode.
293     */
294    @Override
295    public void run() {
296        MessageTuple messageToSend;
297        log.debug("Traffic controller queuing thread starts");
298        while (true) {
299            log.debug("Traffic controller queue waiting");
300            try {
301                messageToSend = new MessageTuple(sendQueue.take());
302            } catch (InterruptedException e) {
303                log.debug("Thread interrupted while dequeuing message to send");
304                return;
305            }
306            log.debug("Message dequeued {} id: {}", messageToSend.message, messageToSend.message.getId());
307            // remember who sent this
308            lastSender = messageToSend.listener;
309            lastId = messageToSend.message.getId();
310            // notify all _other_ listeners
311            notifyMessage(messageToSend.message, messageToSend.listener);
312            replyAvailable = false;
313            sendToInterface(messageToSend.message);
314            log.debug("Waiting {} for a reply", timeout);
315            try {
316                synchronized (lock) {
317                    lock.wait(timeout); // Wait for notify
318                }
319            } catch (InterruptedException e) {
320                log.debug("waitingForReply interrupted");
321            }
322            if (!replyAvailable) {
323                // Timed out
324                log.warn("Timeout waiting for reply from hardware in SprogState {}", sprogState);
325            } else {
326                log.debug("Notified of reply");
327            }
328        }
329    }
330
331    /**
332     * Forward a preformatted message to the interface.
333     *
334     * @param m The message to be forwarded
335     */
336    public void sendToInterface(SprogMessage m) {
337        // stream to port in single write, as that's needed by serial
338        try {
339            if (ostream != null) {
340                ostream.write(m.getFormattedMessage(sprogState));
341                log.debug("sendSprogMessage written to ostream");
342            } else {
343                // no stream connected
344                log.warn("sendMessage: no connection established");
345            }
346        } catch (Exception e) {
347            log.warn("sendMessage: Exception: ", e);
348        }
349    }
350
351// methods to connect/disconnect to a source of data in a SprogPortController
352    private AbstractPortController controller = null;
353
354    /**
355     * Make connection to existing PortController object.
356     *
357     * @param p The port controller
358     */
359    public void connectPort(AbstractPortController p) {
360        istream = p.getInputStream();
361        ostream = p.getOutputStream();
362        if (controller != null) {
363            log.warn("connectPort: connect called while connected");
364        }
365        controller = p;
366    }
367
368    /**
369     * Get the port controller, as a SerialDriverAdapter.
370     *
371     * @return the port controller
372     */
373    protected SerialDriverAdapter getController(){
374       return (SerialDriverAdapter) controller;
375    }
376
377    /**
378     * Break connection to existing SprogPortController object.
379     * <p>
380     * Once broken, attempts to send via "message" member will fail.
381     *
382     * @param p the connection to break
383     */
384    public void disconnectPort(AbstractPortController p) {
385        istream = null;
386        ostream = null;
387        if (controller != p) {
388            log.warn("disconnectPort: disconnect called from non-connected SprogPortController");
389        }
390        controller = null;
391    }
392
393    static volatile protected SprogTrafficController self = null;
394
395    public void setAdapterMemo(SprogSystemConnectionMemo adaptermemo) {
396        memo = adaptermemo;
397    }
398
399    public SprogSystemConnectionMemo getAdapterMemo() {
400        return memo;
401    }
402
403    private SprogSystemConnectionMemo memo = null;
404
405    // data members to hold the streams
406    DataInputStream istream = null;
407    OutputStream ostream = null;
408
409    boolean endReply(SprogReply msg) {
410        return msg.endNormalReply() || msg.endBootReply();
411    }
412
413    private boolean unsolicited;
414
415    /**
416     * Handle an incoming reply.
417     */
418    public void handleOneIncomingReply() {
419        // we get here if data has been received and this method is explicitly invoked
420        // fill the current reply with any data received
421        int replyCurrentSize = reply.getNumDataElements();
422        int i;
423        for (i = replyCurrentSize; i < SprogReply.maxSize - replyCurrentSize; i++) {
424            try {
425                if (istream.available() == 0) {
426                    break; // nothing waiting to be read
427                }
428                byte char1 = istream.readByte();
429                reply.setElement(i, char1);
430
431            } catch (Exception e) {
432                log.warn("Exception in DATA_AVAILABLE state", e);
433            }
434            if (endReply(reply)) {
435                sendreply();
436                break;
437            }
438        }
439    }
440
441    /**
442     * Send the current reply - built using data from serialEvent.
443     */
444    private void sendreply() {
445        //send the reply
446        log.debug("dispatch reply of length {} in SprogState {}", reply.getNumDataElements(), sprogState);
447        if (unsolicited) {
448            log.debug("Unsolicited Reply");
449            reply.setUnsolicited();
450        }
451        // Insert the id
452        reply.setId(lastId);
453        notifyReply(reply, lastSender);
454        log.debug("Notify() wait");
455        replyAvailable = true;
456        synchronized(lock) {
457            lock.notifyAll();
458        }
459
460        //Create a new reply, ready to be filled
461        reply = new SprogReply();
462    }
463
464    public void dispose(){
465        tcThread.interrupt();
466        try {
467            tcThread.join();
468        } catch (InterruptedException e) {
469            // Do nothing
470        }
471    }
472
473    private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(SprogTrafficController.class);
474
475}