001package jmri.jmrix.roco.z21;
002
003import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
004import java.net.DatagramPacket;
005import java.util.ArrayList;
006import java.util.Arrays;
007import java.util.List;
008
009import jmri.jmrix.*;
010import org.slf4j.Logger;
011import org.slf4j.LoggerFactory;
012
013/**
014 * Abstract base for TrafficControllers in a Message/Reply protocol.
015 *
016 * @author Paul Bender Copyright (C) 2014
017 */
018public class Z21TrafficController extends jmri.jmrix.AbstractMRTrafficController implements Z21Interface {
019
020    private java.net.InetAddress host;
021    private int port;
022
023    public Z21TrafficController() {
024        super();
025        allowUnexpectedReply = true;
026    }
027
028    /**
029     * Implement this to forward a specific message type to a protocol-specific
030     * listener interface. This puts the casting into the concrete class.
031     */
032    @Override
033    protected void forwardMessage(AbstractMRListener client, AbstractMRMessage m) {
034        ((Z21Listener) client).message((Z21Message) m);
035    }
036
037    /**
038     * Implement this to forward a specific Reply type to a protocol-specific
039     * listener interface. This puts the casting into the concrete class.
040     */
041    @Override
042    protected void forwardReply(AbstractMRListener client, AbstractMRReply m) {
043        ((Z21Listener) client).reply((Z21Reply) m);
044    }
045
046    /**
047     * Invoked if it's appropriate to do low-priority polling of the command
048     * station, this should return the next message to send, or null if the TC
049     * should just sleep.
050     */
051    @Override
052    protected Z21Message pollMessage() {
053        return null;
054    }
055
056    @Override
057    protected Z21Listener pollReplyHandler() {
058        return null;
059    }
060
061    /**
062     * enterProgMode() and enterNormalMode() return any message that
063     * needs to be returned to the command station to change modes.
064     *
065     * @see #enterNormalMode()
066     * @return if no message is needed, you may return null.
067     *
068     * If the programmerIdle() function returns true, enterNormalMode() is
069     * called after a timeout while in IDLESTATE during programming to
070     * return the system to normal mode.
071     */
072    @Override
073    protected Z21Message enterProgMode() {
074        return null;
075    }
076
077    /**
078     * enterProgMode() and enterNormalMode() return any message that
079     * needs to be returned to the command station to change modes.
080     *
081     * @see #enterProgMode()
082     * @return if no message is needed, you may return null.
083     */
084    @Override
085    protected Z21Message enterNormalMode() {
086        return null;
087    }
088
089    /**
090     * Actually transmits the next message to the port.
091     */
092    @SuppressFBWarnings(value = {"TLW_TWO_LOCK_WAIT", "", "UW_UNCOND_WAIT"},
093            justification = "Two locks needed for synchronization here, this is OK; String + only used for debug, so inefficient String processing not really a problem; Unconditional Wait is to give external hardware, which doesn't necessarilly respond, time to process the data.")
094    @Override
095    synchronized protected void forwardToPort(AbstractMRMessage m, AbstractMRListener reply) {
096        if (log.isDebugEnabled()) {
097            log.debug("forwardToPort message: [{}]", m);
098        }
099        // remember who sent this
100        mLastSender = reply;
101
102        // forward the message to the registered recipients,
103        // which includes the communications monitor, except the sender.
104        // Schedule notification via the Swing event queue to ensure order
105        Runnable r = new XmtNotifier(m, mLastSender, this);
106        javax.swing.SwingUtilities.invokeLater(r);
107
108        // stream to port in single write, as that's needed by serial
109        byte[] msg = new byte[lengthOfByteStream(m)];
110        // add header
111        int offset = addHeaderToOutput(msg, m);
112
113        // add data content
114        int len = m.getNumDataElements();
115        for (int i = 0; i < len; i++) {
116            msg[i + offset] = (byte) m.getElement(i);
117        }
118        // add trailer
119        addTrailerToOutput(msg, len + offset, m);
120        // and send the bytes
121        try {
122            if (log.isDebugEnabled()) {
123                StringBuilder f = new StringBuilder("formatted message: ");
124                for (byte b : msg) {
125                    f.append(Integer.toHexString(0xFF & b));
126                    f.append(" ");
127                }
128                log.debug(new String(f));
129            }
130            while (m.getRetries() >= 0) {
131                if (portReadyToSend(controller)) {
132                    // create a datagram with the data from the
133                    // message.
134                    byte[] data = ((Z21Message) m).getBuffer();
135                    DatagramPacket sendPacket
136                            = new DatagramPacket(data, ((Z21Message) m).getLength(), host, port);
137                    // and send it.
138                    ((Z21Adapter) controller).getSocket().send(sendPacket);
139                    log.debug("written, msg timeout: {} mSec", m.getTimeout());
140                    break;
141                } else if (m.getRetries() >= 0) {
142                    if (log.isDebugEnabled()) {
143                        StringBuilder b = new StringBuilder("Retry message: ");
144                        b.append(m.toString());
145                        b.append(" attempts remaining: ");
146                        b.append(m.getRetries());
147                        log.debug(new String(b));
148                    }
149                    m.setRetries(m.getRetries() - 1);
150                    try {
151                        synchronized (xmtRunnable) {
152                            xmtRunnable.wait(m.getTimeout());
153                        }
154                    } catch (InterruptedException e) {
155                        Thread.currentThread().interrupt(); // retain if needed later
156                        if(!threadStopRequest) {
157                           log.error("retry wait interrupted");
158                        } else {
159                           log.error("retry wait interrupted during thread stop");
160                        }
161                    }
162                } else {
163                    log.warn("sendMessage: port not ready for data sending: {}", java.util.Arrays.toString(msg));
164                }
165            }
166        } catch (Exception e) {
167            // TODO Currently there's no port recovery if an exception occurs
168            // must restart JMRI to clear xmtException.
169            xmtException = true;
170            portWarn(e);
171        }
172    }
173
174    @Override()
175    public boolean status() {
176        if (controller == null) {
177            return false;
178        } else {
179            return (controller.status());
180        }
181    }
182
183    /**
184     * Make connection to existing PortController object.
185     */
186    @Override
187    public void connectPort(AbstractPortController p) {
188        rcvException = false;
189        xmtException = false;
190        if (controller != null) {
191            log.warn("connectPort: connect called while connected");
192        } else {
193            log.debug("connectPort invoked");
194        }
195        if (!(p instanceof Z21Adapter)) {
196            throw new IllegalArgumentException("attempt to connect wrong port type");
197        }
198        controller = p;
199        try {
200            host = java.net.InetAddress.getByName(((Z21Adapter) controller).getHostName());
201            port = ((Z21Adapter) controller).getPort();
202            ConnectionStatus.instance().setConnectionState(
203                    p.getSystemConnectionMemo().getUserName(),
204                    ((Z21Adapter) p).getHostName() + ":" + ((Z21Adapter) p).getPort(), ConnectionStatus.CONNECTION_UP);
205        } catch (java.net.UnknownHostException uhe) {
206            log.error("Unknown Host: {} ", ((Z21Adapter) controller).getHostName());
207            if (((Z21Adapter) p).getPort() != 0) {
208                ConnectionStatus.instance().setConnectionState(
209                        p.getSystemConnectionMemo().getUserName(),
210                        ((Z21Adapter) controller).getHostName() + ":" + ((Z21Adapter) p).getPort(), ConnectionStatus.CONNECTION_DOWN);
211            } else {
212                ConnectionStatus.instance().setConnectionState(
213                        p.getSystemConnectionMemo().getUserName(),
214                        ((Z21Adapter) controller).getHostName(), ConnectionStatus.CONNECTION_DOWN);
215            }
216        }
217        // and start threads
218        xmtThread = new Thread(xmtRunnable = () -> {
219            try {
220                transmitLoop();
221            } catch (Throwable e) {
222                if(!threadStopRequest)
223                    log.error("Transmit thread terminated prematurely by: {}", e.toString(), e);
224            }
225        });
226        xmtThread.setName("z21.Z21TrafficController Transmit thread");
227        xmtThread.start();
228        rcvThread = new Thread(this::receiveLoop);
229        rcvThread.setName("z21.Z21TrafficController Receive thread");
230        int xr = rcvThread.getPriority();
231        xr++;
232        rcvThread.setPriority(xr);      //bump up the priority
233        rcvThread.start();
234    }
235
236    /**
237     * Break connection to existing PortController object. Once broken, attempts
238     * to send via "message" member will fail.
239     */
240    @Override
241    public void disconnectPort(AbstractPortController p) {
242        if (controller != p) {
243            log.warn("disconnectPort: disconnect called from non-connected AbstractPortController");
244        }
245        controller = null;
246    }
247
248    @Override
249    protected Z21Reply newReply() {
250        return new Z21Reply();
251    }
252
253    @Override
254    protected boolean endOfMessage(AbstractMRReply r) {
255        // since this is a UDP protocol, and each reply in the packet is complete,
256        // we don't check for end of message manually.
257        return true;
258    }
259
260    /**
261     * Handle each reply when complete.
262     * <p>
263     * (This is public for testing purposes) Runs in the "Receive" thread.
264     */
265    @SuppressFBWarnings(value = {"UW_UNCOND_WAIT", "WA_NOT_IN_LOOP", "NO_NOTIFY_NOT_NOTIFYALL"},
266            justification = "Wait is for external hardware, which doesn't necessarilly respond, to process the data.  Notify is used because Having more than one thread waiting on xmtRunnable is an error.")
267    @Override
268    public void handleOneIncomingReply() throws java.io.IOException {
269        // we sit in this until the message is complete, relying on
270        // threading to let other stuff happen
271
272        // create a buffer to hold the incoming data.
273        byte[] buffer = new byte[100];  // the size here just needs to be longer
274        // than the longest protocol message.
275        // Otherwise, the receive will truncate.
276
277        // create the packet.
278        DatagramPacket receivePacket = new DatagramPacket(buffer, 100, host, port);
279
280        // and wait to receive data in the packet.
281        try {
282            ((Z21Adapter) controller).getSocket().receive(receivePacket);
283        } catch (java.net.SocketException | NullPointerException se) {
284            // if we are waiting when the controller is disposed,
285            // a socket exception will be thrown.
286            log.debug("Socket exception during receive.  Connection Closed?");
287            rcvException = true;
288            return;
289        }
290        if (threadStopRequest) return;
291
292        // handle more than one reply in the same UDP packet.
293        List<Z21Reply> replies = new ArrayList<>();
294
295        int totalLength=receivePacket.getLength();
296        int consumed=0;
297
298        do {
299            int length = (0xff & buffer[0]) + ((0xff & buffer[1]) << 8);
300            Z21Reply msg = new Z21Reply(buffer, length);
301
302            replies.add(msg);
303
304            buffer = Arrays.copyOfRange(buffer,length,buffer.length);
305            consumed +=length;
306            log.trace("total length: {} consumed {}",totalLength,consumed);
307        } while(totalLength>consumed);
308
309
310        // and then dispatch each reply
311        replies.forEach(this::dispatchReply);
312    }
313
314    private void dispatchReply(Z21Reply msg) {
315        // message is complete, dispatch it !!
316        replyInDispatch = true;
317        if (log.isDebugEnabled()) {
318            log.debug("dispatch reply of length {} contains {} state {}", msg.getNumDataElements(), msg.toString(), mCurrentState);
319        }
320
321        // forward the message to the registered recipients,
322        // which includes the communications monitor
323        // return a notification via the Swing event queue to ensure proper thread
324        Runnable r = new RcvNotifier(msg, mLastSender, this);
325        try {
326            javax.swing.SwingUtilities.invokeAndWait(r);
327        } catch (InterruptedException ie) {
328            if(threadStopRequest) return;
329            log.error("Unexpected exception in invokeAndWait:{}", ie, ie);
330        } catch (Exception e) {
331            log.error("Unexpected exception in invokeAndWait:{}", e, e);
332        }
333        if (log.isDebugEnabled()) {
334            log.debug("dispatch thread invoked");
335        }
336
337        if (!msg.isUnsolicited()) {
338            // effect on transmit:
339            switch (mCurrentState) {
340                case WAITMSGREPLYSTATE: {
341                    // check to see if the response was an error message we want
342                    // to automatically handle by re-queueing the last sent
343                    // message, otherwise go on to the next message
344                    if (msg.isRetransmittableErrorMsg()) {
345                        if (log.isDebugEnabled()) {
346                            log.debug("Automatic Recovery from Error Message: +msg.toString()");
347                        }
348                        synchronized (xmtRunnable) {
349                            mCurrentState = AUTORETRYSTATE;
350                            replyInDispatch = false;
351                            xmtRunnable.notify();
352                        }
353                    } else {
354                        // update state, and notify to continue
355                        synchronized (xmtRunnable) {
356                            mCurrentState = NOTIFIEDSTATE;
357                            replyInDispatch = false;
358                            xmtRunnable.notify();
359                        }
360                    }
361                    break;
362                }
363                case WAITREPLYINPROGMODESTATE: {
364                    // entering programming mode
365                    mCurrentMode = PROGRAMINGMODE;
366                    replyInDispatch = false;
367
368                    // check to see if we need to delay to allow decoders to become
369                    // responsive
370                    int warmUpDelay = enterProgModeDelayTime();
371                    if (warmUpDelay != 0) {
372                        try {
373                            synchronized (xmtRunnable) {
374                                xmtRunnable.wait(warmUpDelay);
375                            }
376                        } catch (InterruptedException e) {
377                            Thread.currentThread().interrupt(); // retain if needed later
378                            if (threadStopRequest) return;
379                        }
380                    }
381                    // update state, and notify to continue
382                    synchronized (xmtRunnable) {
383                        mCurrentState = OKSENDMSGSTATE;
384                        xmtRunnable.notify();
385                    }
386                    break;
387                }
388                case WAITREPLYINNORMMODESTATE: {
389                    // entering normal mode
390                    mCurrentMode = NORMALMODE;
391                    replyInDispatch = false;
392                    // update state, and notify to continue
393                    synchronized (xmtRunnable) {
394                        mCurrentState = OKSENDMSGSTATE;
395                        xmtRunnable.notify();
396                    }
397                    break;
398                }
399                default: {
400                    replyInDispatch = false;
401                    if (allowUnexpectedReply) {
402                        if (log.isDebugEnabled()) {
403                            log.debug("Allowed unexpected reply received in state: {} was {}", mCurrentState, msg.toString());
404                        }
405                        synchronized (xmtRunnable) {
406                            // The transmit thread sometimes gets stuck
407                            // when unexpected replies are received.  Notify
408                            // it to clear the block without a timeout.
409                            // (do not change the current state)
410                            //if(mCurrentState!=IDLESTATE)
411                            xmtRunnable.notify();
412                        }
413                    } else {
414                        unexpectedReplyStateError(mCurrentState,msg.toString());
415                    }
416                }
417            }
418            // Unsolicited message
419        } else {
420            if (log.isDebugEnabled()) {
421                log.debug("Unsolicited Message Received {}", msg.toString());
422            }
423
424            replyInDispatch = false;
425        }
426    }
427
428    @SuppressFBWarnings(value = {"UW_UNCOND_WAIT", "WA_NOT_IN_LOOP"},
429            justification = "Wait is for external hardware, which doesn't necessarilly respond, to process the data.")
430    @Override
431    protected void terminate() {
432        if (controller == null) {
433            log.debug("terminate called while not connected");
434            return;
435        } else {
436            log.debug("Cleanup Starts");
437        }
438
439        Z21Message logoffMessage = Z21Message.getLanLogoffRequestMessage();
440        forwardToPort(logoffMessage, null);
441        // wait for reply
442        try {
443            if (xmtRunnable != null) {
444                synchronized (xmtRunnable) {
445                    xmtRunnable.wait(logoffMessage.getTimeout());
446                }
447            }
448        } catch (InterruptedException e) {
449            Thread.currentThread().interrupt(); // retain if needed later
450            log.error("transmit interrupted");
451        } finally {
452            // set the controller to null, even if terminate fails.
453            controller = null;
454        }
455    }
456
457    /**
458     * Terminate the receive and transmit threads.
459     * <p>
460     * This is intended to be used only by testing subclasses.
461     */
462    @Override
463    public void terminateThreads() {
464        threadStopRequest = true;
465        // ensure socket closed to end pending operations
466        if ( controller != null && ((Z21Adapter) controller).getSocket() != null) ((Z21Adapter) controller).getSocket().close();
467
468        // usual stop process
469        super.terminateThreads();
470    }
471
472    // The methods to implement the Z21Interface
473    @Override
474    public synchronized void addz21Listener(Z21Listener l) {
475        this.addListener(l);
476    }
477
478    @Override
479    public synchronized void removez21Listener(Z21Listener l) {
480        this.removeListener(l);
481    }
482
483    /**
484     * Forward a preformatted message to the actual interface.
485     */
486    @Override
487    public void sendz21Message(Z21Message m, Z21Listener reply) {
488        sendMessage(m, reply);
489    }
490
491    private final static Logger log = LoggerFactory.getLogger(Z21TrafficController.class);
492}