Eclipse, and the +Eclipse Logo are registered trademarks of the Eclipse Foundation. + +Paho is a trademark of the Eclipse Foundation. Eclipse, and the Eclipse Logo are +registered trademarks of the Eclipse Foundation. + +## Copyright + +All content is the property of the respective authors or their employers. +For more information regarding authorship of content, please consult the +listed source code repository logs. + +## Declared Project Licenses + +This program and the accompanying materials are made available under the terms of the +Eclipse Public License v2.0 and Eclipse Distribution License v1.0 which accompany this +distribution. + +The Eclipse Public License is available at +https://www.eclipse.org/legal/epl-2.0/ +and the Eclipse Distribution License is available at +http://www.eclipse.org/org/documents/edl-v10.php. + +For an explanation of what dual-licensing means to you, see: +https://www.eclipse.org/legal/eplfaq.php#DUALLIC + +SPDX-License-Identifier: EPL-2.0 or BSD-3-Clause + +## Source Code + +The project maintains the following source code repositories: + + * https://github.com/eclipse/paho.mqtt.golang + +## Third-party Content + +This project makes use of the follow third party projects. + +Go Programming Language and Standard Library + +* License: BSD-style license (https://golang.org/LICENSE) +* Project: https://golang.org/ + +Go Networking + +* License: BSD 3-Clause style license and patent grant. +* Project: https://cs.opensource.google/go/x/net + +Go Sync + +* License: BSD 3-Clause style license and patent grant. +* Project: https://cs.opensource.google/go/x/sync/ + +Gorilla Websockets v1.4.2 + +* License: BSD 2-Clause "Simplified" License +* Project: https://github.com/gorilla/websocket + +## Cryptography + +Content may contain encryption software. The country in which you are currently +may have restrictions on the import, possession, and use, and/or re-export to +another country, of encryption software. BEFORE using any encryption software, +please check the country's laws, regulations and policies concerning the import, +possession, or use, and re-export of encryption software, to see if this is +permitted. \ No newline at end of file diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/README.md b/vendor/github.com/eclipse/paho.mqtt.golang/README.md index c1acf4e..63a7f44 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/README.md +++ b/vendor/github.com/eclipse/paho.mqtt.golang/README.md @@ -111,36 +111,54 @@ identifier; this is as per the [spec](https://docs.oasis-open.org/mqtt/mqtt/v3.1 `ClientOptions.SetOrderMatters(false)` set). If you wish to perform a long-running task, or publish a message, then please use a go routine (blocking in the handler is a common cause of unexpected `pingresp not received, disconnecting` errors). -* When QOS1+ subscriptions have been created previously and you connect with `CleanSession` set to false it is possible that the broker will deliver retained -messages before `Subscribe` can be called. To process these messages either configure a handler with `AddRoute` or -set a `DefaultPublishHandler`. +* When QOS1+ subscriptions have been created previously and you connect with `CleanSession` set to false it is possible +that the broker will deliver retained messages before `Subscribe` can be called. To process these messages either +configure a handler with `AddRoute` or set a `DefaultPublishHandler`. * Loss of network connectivity may not be detected immediately. If this is an issue then consider setting -`ClientOptions.KeepAlive` (sends regular messages to check the link is active). -* Brokers offer many configuration options; some settings may lead to unexpected results. If using Mosquitto check -`max_inflight_messages`, `max_queued_messages`, `persistence` (the defaults may not be what you expect). +`ClientOptions.KeepAlive` (sends regular messages to check the link is active). +* Reusing a `Client` is not completely safe. After calling `Disconnect` please create a new Client (`NewClient()`) rather +than attempting to reuse the existing one (note that features such as `SetAutoReconnect` mean this is rarely necessary). +* Brokers offer many configuration options; some settings may lead to unexpected results. +* Publish tokens will complete if the connection is lost and re-established using the default +options.SetAutoReconnect(true) functionality (token.Error() will return nil). Attempts will be made to re-deliver the +message but there is currently no easy way know when such messages are delivered. + +If using Mosquitto then there are a range of fairly common issues: +* `listener` - By default [Mosquitto v2+](https://mosquitto.org/documentation/migrating-to-2-0/) listens on loopback +interfaces only (meaning it will only accept connections made from the computer its running on). +* `max_inflight_messages` - Unless this is set to 1 mosquitto does not guarantee ordered delivery of messages. +* `max_queued_messages` / `max_queued_bytes` - These impose limits on the number/size of queued messages. The defaults +may lead to messages being silently dropped. +* `persistence` - Defaults to false (messages will not survive a broker restart) +* `max_keepalive` - defaults to 65535 and, from version 2.0.12, `SetKeepAlive(0)` will result in a rejected connection +by default. Reporting bugs -------------- Please report bugs by raising issues for this project in github https://github.com/eclipse/paho.mqtt.golang/issues -*A limited number of contributors monitor the issues section so if you have a general question please consider the -resources in the [more information](#more-information) section (your question will be seen by more people, and you are -likely to receive an answer more quickly).* +A limited number of contributors monitor the issues section so if you have a general question please see the +resources in the [more information](#more-information) section for help. We welcome bug reports, but it is important they are actionable. A significant percentage of issues reported are not resolved due to a lack of information. If we cannot replicate the problem then it is unlikely we will be able to fix it. -The information required will vary from issue to issue but consider including: +The information required will vary from issue to issue but almost all bug reports would be expected to include: -* Which version of the package you are using (tag or commit - this should be in your go.mod file) -* A [Minimal, Reproducible Example](https://stackoverflow.com/help/minimal-reproducible-example). Providing an example -is the best way to demonstrate the issue you are facing; it is important this includes all relevant information -(including broker configuration). Docker (see `cmd/docker`) makes it relatively simple to provide a working end-to-end -example. +* Which version of the package you are using (tag or commit - this should be in your `go.mod` file) * A full, clear, description of the problem (detail what you are expecting vs what actually happens). +* Configuration information (code showing how you connect, please include all references to `ClientOption`) +* Broker details (name and version). + +If at all possible please also include: * Details of your attempts to resolve the issue (what have you tried, what worked, what did not). -* [Application Logs](#logging) covering the period the issue occurred. Unless you have isolated the root cause of the issue please include a link to a full log (including data from well before the problem arose). -* Broker Logs covering the period the issue occurred. +* A [Minimal, Reproducible Example](https://stackoverflow.com/help/minimal-reproducible-example). Providing an example +is the best way to demonstrate the issue you are facing; it is important this includes all relevant information +(including broker configuration). Docker (see `cmd/docker`) makes it relatively simple to provide a working end-to-end +example. +* Broker logs covering the period the issue occurred. +* [Application Logs](#logging) covering the period the issue occurred. Unless you have isolated the root cause of the +issue please include a link to a full log (including data from well before the problem arose). It is important to remember that this library does not stand alone; it communicates with a broker and any issues you are seeing may be due to: @@ -158,7 +176,7 @@ Contributing ------------ We welcome pull requests but before your contribution can be accepted by the project, you need to create and -electronically sign the Eclipse Contributor Agreement (ECA) and sign off on the Eclipse Foundation Certificate of Origin. +electronically sign the Eclipse Contributor Agreement (ECA) and sign off on the Eclipse Foundation Certificate of Origin. More information is available in the [Eclipse Development Resources](http://wiki.eclipse.org/Development_Resources/Contributing_via_Git); please take special @@ -167,11 +185,12 @@ note of the requirement that the commit record contain a "Signed-off-by" entry. More information ---------------- +[Stack Overflow](https://stackoverflow.com/questions/tagged/mqtt+go) has a range questions/answers covering a range of +common issues (both relating to use of this library and MQTT in general). This is the best place to ask general questions +(including those relating to the use of this library). + Discussion of the Paho clients takes place on the [Eclipse paho-dev mailing list](https://dev.eclipse.org/mailman/listinfo/paho-dev). This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * https://www.eclipse.org/legal/epl-2.0/ + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. * * Contributors: * Seth Hoenig * Allan Stockdill-Mander * Mike Robertson + * Matt Brittan */ // Portions copyright © 2018 TIBCO Software Inc. @@ -19,6 +24,7 @@ package mqtt import ( "bytes" + "context" "errors" "fmt" "net" @@ -27,14 +33,9 @@ import ( "sync/atomic" "time" - "github.com/eclipse/paho.mqtt.golang/packets" -) + "golang.org/x/sync/semaphore" -const ( - disconnected uint32 = iota - connecting - reconnecting - connected + "github.com/eclipse/paho.mqtt.golang/packets" ) // Client is the interface definition for a Client as used by this @@ -44,9 +45,12 @@ const ( // with an MQTT server using non-blocking methods that allow work // to be done in the background. // An application may connect to an MQTT server using: -// A plain TCP socket -// A secure SSL/TLS socket -// A websocket +// +// A plain TCP socket (e.g. mqtt://test.mosquitto.org:1833) +// A secure SSL/TLS socket (e.g. tls://test.mosquitto.org:8883) +// A websocket (e.g ws://test.mosquitto.org:8080 or wss://test.mosquitto.org:8081) +// Something else (using `options.CustomOpenConnectionFn`) +// // To enable ensured message delivery at Quality of Service (QoS) levels // described in the MQTT spec, a message persistence mechanism must be // used. This is done by providing a type which implements the Store @@ -120,8 +124,7 @@ type client struct { lastReceived atomic.Value // time.Time - the last time a packet was successfully received from network pingOutstanding int32 // set to 1 if a ping has been sent but response not ret received - status uint32 // see const definitions at top of file for possible values - sync.RWMutex // Protects the above two variables (note: atomic writes are also used somewhat inconsistently) + status connectionStatus // see constants in status.go for values messageIds // effectively a map from message id to token completor @@ -161,7 +164,6 @@ func NewClient(o *ClientOptions) Client { c.options.protocolVersionExplicit = false } c.persist = c.options.Store - c.status = disconnected c.messageIds = messageIds{index: make(map[uint16]tokenCompletor)} c.msgRouter = newRouter() c.msgRouter.setDefaultHandler(c.options.DefaultPublishHandler) @@ -188,47 +190,27 @@ func (c *client) AddRoute(topic string, callback MessageHandler) { // the client is connected or not. // connected means that the connection is up now OR it will // be established/reestablished automatically when possible +// Warning: The connection status may change at any time so use this with care! func (c *client) IsConnected() bool { - c.RLock() - defer c.RUnlock() - status := atomic.LoadUint32(&c.status) + // This will need to change if additional statuses are added + s, r := c.status.ConnectionStatusRetry() switch { - case status == connected: - return true - case c.options.AutoReconnect && status > connecting: + case s == connected: return true - case c.options.ConnectRetry && status == connecting: + case c.options.ConnectRetry && s == connecting: return true + case c.options.AutoReconnect: + return s == reconnecting || (s == disconnecting && r) // r indicates we will reconnect default: return false } } // IsConnectionOpen return a bool signifying whether the client has an active -// connection to mqtt broker, i.e not in disconnected or reconnect mode +// connection to mqtt broker, i.e. not in disconnected or reconnect mode +// Warning: The connection status may change at any time so use this with care! func (c *client) IsConnectionOpen() bool { - c.RLock() - defer c.RUnlock() - status := atomic.LoadUint32(&c.status) - switch { - case status == connected: - return true - default: - return false - } -} - -func (c *client) connectionStatus() uint32 { - c.RLock() - defer c.RUnlock() - status := atomic.LoadUint32(&c.status) - return status -} - -func (c *client) setConnected(status uint32) { - c.Lock() - defer c.Unlock() - atomic.StoreUint32(&c.status, status) + return c.status.ConnectionStatus() == connected } // ErrNotConnected is the error returned from function calls that are @@ -245,25 +227,31 @@ func (c *client) Connect() Token { t := newToken(packets.Connect).(*ConnectToken) DEBUG.Println(CLI, "Connect()") - if c.options.ConnectRetry && atomic.LoadUint32(&c.status) != disconnected { - // if in any state other than disconnected and ConnectRetry is - // enabled then the connection will come up automatically - // client can assume connection is up - WARN.Println(CLI, "Connect() called but not disconnected") - t.returnCode = packets.Accepted - t.flowComplete() + connectionUp, err := c.status.Connecting() + if err != nil { + if err == errAlreadyConnectedOrReconnecting && c.options.AutoReconnect { + // When reconnection is active we don't consider calls tro Connect to ba an error (mainly for compatability) + WARN.Println(CLI, "Connect() called but not disconnected") + t.returnCode = packets.Accepted + t.flowComplete() + return t + } + ERROR.Println(CLI, err) // CONNECT should never be called unless we are disconnected + t.setError(err) return t } c.persist.Open() if c.options.ConnectRetry { - c.reserveStoredPublishIDs() // Reserve IDs to allow publish before connect complete + c.reserveStoredPublishIDs() // Reserve IDs to allow publishing before connect complete } - c.setConnected(connecting) go func() { if len(c.options.Servers) == 0 { t.setError(fmt.Errorf("no servers defined to connect to")) + if err := connectionUp(false); err != nil { + ERROR.Println(CLI, err.Error()) + } return } @@ -274,29 +262,31 @@ func (c *client) Connect() Token { conn, rc, t.sessionPresent, err = c.attemptConnection() if err != nil { if c.options.ConnectRetry { - DEBUG.Println(CLI, "Connect failed, sleeping for", int(c.options.ConnectRetryInterval.Seconds()), "seconds and will then retry") + DEBUG.Println(CLI, "Connect failed, sleeping for", int(c.options.ConnectRetryInterval.Seconds()), "seconds and will then retry, error:", err.Error()) time.Sleep(c.options.ConnectRetryInterval) - if atomic.LoadUint32(&c.status) == connecting { + if c.status.ConnectionStatus() == connecting { // Possible connection aborted elsewhere goto RETRYCONN } } ERROR.Println(CLI, "Failed to connect to a broker") - c.setConnected(disconnected) c.persist.Close() t.returnCode = rc t.setError(err) + if err := connectionUp(false); err != nil { + ERROR.Println(CLI, err.Error()) + } return } - inboundFromStore := make(chan packets.ControlPacket) // there may be some inbound comms packets in the store that are awaiting processing - if c.startCommsWorkers(conn, inboundFromStore) { + inboundFromStore := make(chan packets.ControlPacket) // there may be some inbound comms packets in the store that are awaiting processing + if c.startCommsWorkers(conn, connectionUp, inboundFromStore) { // note that this takes care of updating the status (to connected or disconnected) // Take care of any messages in the store if !c.options.CleanSession { c.resume(c.options.ResumeSubs, inboundFromStore) } else { c.persist.Reset() } - } else { + } else { // Note: With the new status subsystem this should only happen if Disconnect called simultaneously with the above WARN.Println(CLI, "Connect() called but connection established in another goroutine") } @@ -308,7 +298,8 @@ func (c *client) Connect() Token { } // internal function used to reconnect the client when it loses its connection -func (c *client) reconnect() { +// The connection status MUST be reconnecting prior to calling this function (via call to status.connectionLost) +func (c *client) reconnect(connectionUp connCompletedFn) { DEBUG.Println(CLI, "enter reconnect") var ( sleep = 1 * time.Second @@ -333,23 +324,18 @@ func (c *client) reconnect() { if sleep > c.options.MaxReconnectInterval { sleep = c.options.MaxReconnectInterval } - // Disconnect may have been called - if atomic.LoadUint32(&c.status) == disconnected { - break - } - } - // Disconnect() must have been called while we were trying to reconnect. - if c.connectionStatus() == disconnected { - if conn != nil { - conn.Close() + if c.status.ConnectionStatus() != reconnecting { // Disconnect may have been called + if err := connectionUp(false); err != nil { // Should always return an error + ERROR.Println(CLI, err.Error()) + } + DEBUG.Println(CLI, "Client moved to disconnected state while reconnecting, abandoning reconnect") + return } - DEBUG.Println(CLI, "Client moved to disconnected state while reconnecting, abandoning reconnect") - return } - inboundFromStore := make(chan packets.ControlPacket) // there may be some inbound comms packets in the store that are awaiting processing - if c.startCommsWorkers(conn, inboundFromStore) { + inboundFromStore := make(chan packets.ControlPacket) // there may be some inbound comms packets in the store that are awaiting processing + if c.startCommsWorkers(conn, connectionUp, inboundFromStore) { // note that this takes care of updating the status (to connected or disconnected) c.resume(c.options.ResumeSubs, inboundFromStore) } close(inboundFromStore) @@ -379,8 +365,23 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) { cm := newConnectMsgFromOptions(&c.options, broker) DEBUG.Println(CLI, "about to write new connect msg") CONN: + tlsCfg := c.options.TLSConfig + if c.options.OnConnectAttempt != nil { + DEBUG.Println(CLI, "using custom onConnectAttempt handler...") + tlsCfg = c.options.OnConnectAttempt(broker, c.options.TLSConfig) + } + connDeadline := time.Now().Add(c.options.ConnectTimeout) // Time by which connection must be established + dialer := c.options.Dialer + if dialer == nil { // + WARN.Println(CLI, "dialer was nil, using default") + dialer = &net.Dialer{Timeout: 30 * time.Second} + } // Start by opening the network connection (tcp, tls, ws) etc - conn, err = openConnection(broker, c.options.TLSConfig, c.options.ConnectTimeout, c.options.HTTPHeaders, c.options.WebsocketOptions) + if c.options.CustomOpenConnectionFn != nil { + conn, err = c.options.CustomOpenConnectionFn(broker, c.options) + } else { + conn, err = openConnection(broker, tlsCfg, c.options.ConnectTimeout, c.options.HTTPHeaders, c.options.WebsocketOptions, dialer) + } if err != nil { ERROR.Println(CLI, err.Error()) WARN.Println(CLI, "failed to connect to broker, trying next") @@ -389,16 +390,23 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) { } DEBUG.Println(CLI, "socket connected to broker") - // Now we send the perform the MQTT connection handshake + // Now we perform the MQTT connection handshake ensuring that it does not exceed the timeout + if err := conn.SetDeadline(connDeadline); err != nil { + ERROR.Println(CLI, "set deadline for handshake ", err) + } + + // Now we perform the MQTT connection handshake rc, sessionPresent, err = connectMQTT(conn, cm, protocolVersion) if rc == packets.Accepted { + if err := conn.SetDeadline(time.Time{}); err != nil { + ERROR.Println(CLI, "reset deadline following handshake ", err) + } break // successfully connected } - // We may be have to attempt the connection with MQTT 3.1 - if conn != nil { - conn.Close() - } + // We may have to attempt the connection with MQTT 3.1 + _ = conn.Close() + if !c.options.protocolVersionExplicit && protocolVersion == 4 { // try falling back to 3.1? DEBUG.Println(CLI, "Trying reconnect using MQTT 3.1 protocol") protocolVersion = 3 @@ -426,37 +434,63 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) { // Disconnect will end the connection with the server, but not before waiting // the specified number of milliseconds to wait for existing work to be // completed. +// WARNING: `Disconnect` may return before all activities (goroutines) have completed. This means that +// reusing the `client` may lead to panics. If you want to reconnect when the connection drops then use +// `SetAutoReconnect` and/or `SetConnectRetry`options instead of implementing this yourself. func (c *client) Disconnect(quiesce uint) { - status := atomic.LoadUint32(&c.status) - if status == connected { + done := make(chan struct{}) // Simplest way to ensure quiesce is always honoured + go func() { + defer close(done) + disDone, err := c.status.Disconnecting() + if err != nil { + // Status has been set to disconnecting, but we had to wait for something else to complete + WARN.Println(CLI, err.Error()) + return + } + defer func() { + c.disconnect() // Force disconnection + disDone() // Update status + }() DEBUG.Println(CLI, "disconnecting") - c.setConnected(disconnected) - dm := packets.NewControlPacket(packets.Disconnect).(*packets.DisconnectPacket) dt := newToken(packets.Disconnect) - c.oboundP <- &PacketAndToken{p: dm, t: dt} + select { + case c.oboundP <- &PacketAndToken{p: dm, t: dt}: + // wait for work to finish, or quiesce time consumed + DEBUG.Println(CLI, "calling WaitTimeout") + dt.WaitTimeout(time.Duration(quiesce) * time.Millisecond) + DEBUG.Println(CLI, "WaitTimeout done") + // Below code causes a potential data race. Following status refactor it should no longer be required + // but leaving in as need to check code further. + // case <-c.commsStopped: + // WARN.Println("Disconnect packet could not be sent because comms stopped") + case <-time.After(time.Duration(quiesce) * time.Millisecond): + WARN.Println("Disconnect packet not sent due to timeout") + } + }() - // wait for work to finish, or quiesce time consumed - DEBUG.Println(CLI, "calling WaitTimeout") - dt.WaitTimeout(time.Duration(quiesce) * time.Millisecond) - DEBUG.Println(CLI, "WaitTimeout done") - } else { - WARN.Println(CLI, "Disconnect() called but not connected (disconnected/reconnecting)") - c.setConnected(disconnected) + // Return when done or after timeout expires (would like to change but this maintains compatibility) + delay := time.NewTimer(time.Duration(quiesce) * time.Millisecond) + select { + case <-done: + if !delay.Stop() { + <-delay.C + } + case <-delay.C: } - - c.disconnect() } // forceDisconnect will end the connection with the mqtt broker immediately (used for tests only) func (c *client) forceDisconnect() { - if !c.IsConnected() { - WARN.Println(CLI, "already disconnected") + disDone, err := c.status.Disconnecting() + if err != nil { + // Possible that we are not actually connected + WARN.Println(CLI, err.Error()) return } - c.setConnected(disconnected) DEBUG.Println(CLI, "forcefully disconnecting") c.disconnect() + disDone() } // disconnect cleans up after a final disconnection (user requested so no auto reconnection) @@ -473,47 +507,79 @@ func (c *client) disconnect() { // internalConnLost cleanup when connection is lost or an error occurs // Note: This function will not block -func (c *client) internalConnLost(err error) { +func (c *client) internalConnLost(whyConnLost error) { // It is possible that internalConnLost will be called multiple times simultaneously // (including after sending a DisconnectPacket) as such we only do cleanup etc if the // routines were actually running and are not being disconnected at users request DEBUG.Println(CLI, "internalConnLost called") + disDone, err := c.status.ConnectionLost(c.options.AutoReconnect && c.status.ConnectionStatus() > connecting) + if err != nil { + if err == errConnLossWhileDisconnecting || err == errAlreadyHandlingConnectionLoss { + return // Loss of connection is expected or already being handled + } + ERROR.Println(CLI, fmt.Sprintf("internalConnLost unexpected status: %s", err.Error())) + return + } + + // c.stopCommsWorker returns a channel that is closed when the operation completes. This was required prior + // to the implementation of proper status management but has been left in place, for now, to minimise change stopDone := c.stopCommsWorkers() - if stopDone != nil { // stopDone will be nil if workers already in the process of stopping or stopped - go func() { - DEBUG.Println(CLI, "internalConnLost waiting on workers") - <-stopDone - DEBUG.Println(CLI, "internalConnLost workers stopped") - // It is possible that Disconnect was called which led to this error so reconnection depends upon status - reconnect := c.options.AutoReconnect && c.connectionStatus() > connecting - - if c.options.CleanSession && !reconnect { - c.messageIds.cleanUp() - } - if reconnect { - c.setConnected(reconnecting) - go c.reconnect() - } else { - c.setConnected(disconnected) - } - if c.options.OnConnectionLost != nil { - go c.options.OnConnectionLost(c, err) - } - DEBUG.Println(CLI, "internalConnLost complete") - }() + // stopDone was required in previous versions because there was no connectionLost status (and there were + // issues with status handling). This code has been left in place for the time being just in case the new + // status handling contains bugs (refactoring required at some point). + if stopDone == nil { // stopDone will be nil if workers already in the process of stopping or stopped + ERROR.Println(CLI, "internalConnLost stopDone unexpectedly nil - BUG BUG") + // Cannot really do anything other than leave things disconnected + if _, err = disDone(false); err != nil { // Safest option - cannot leave status as connectionLost + ERROR.Println(CLI, fmt.Sprintf("internalConnLost failed to set status to disconnected (stopDone): %s", err.Error())) + } + return } + + // It may take a while for the disconnection to complete whatever called us needs to exit cleanly so finnish in goRoutine + go func() { + DEBUG.Println(CLI, "internalConnLost waiting on workers") + <-stopDone + DEBUG.Println(CLI, "internalConnLost workers stopped") + + reConnDone, err := disDone(true) + if err != nil { + ERROR.Println(CLI, "failure whilst reporting completion of disconnect", err) + } else if reConnDone == nil { // Should never happen + ERROR.Println(CLI, "BUG BUG BUG reconnection function is nil", err) + } + + reconnect := err == nil && reConnDone != nil + + if c.options.CleanSession && !reconnect { + c.messageIds.cleanUp() // completes PUB/SUB/UNSUB tokens + } else if !c.options.ResumeSubs { + c.messageIds.cleanUpSubscribe() // completes SUB/UNSUB tokens + } + if reconnect { + go c.reconnect(reConnDone) // Will set connection status to reconnecting + } + if c.options.OnConnectionLost != nil { + go c.options.OnConnectionLost(c, whyConnLost) + } + DEBUG.Println(CLI, "internalConnLost complete") + }() } -// startCommsWorkers is called when the connection is up. It starts off all of the routines needed to process incoming and -// outgoing messages. -// Returns true if the comms workers were started (i.e. they were not already running) -func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packets.ControlPacket) bool { +// startCommsWorkers is called when the connection is up. +// It starts off the routines needed to process incoming and outgoing messages. +// Returns true if the comms workers were started (i.e. successful connection) +// connectionUp(true) will be called once everything is up; connectionUp(false) will be called on failure +func (c *client) startCommsWorkers(conn net.Conn, connectionUp connCompletedFn, inboundFromStore <-chan packets.ControlPacket) bool { DEBUG.Println(CLI, "startCommsWorkers called") c.connMu.Lock() defer c.connMu.Unlock() - if c.conn != nil { - WARN.Println(CLI, "startCommsWorkers called when commsworkers already running") - conn.Close() // No use for the new network connection + if c.conn != nil { // Should never happen due to new status handling; leaving in for safety for the time being + WARN.Println(CLI, "startCommsWorkers called when commsworkers already running BUG BUG") + _ = conn.Close() // No use for the new network connection + if err := connectionUp(false); err != nil { + ERROR.Println(CLI, err.Error()) + } return false } c.conn = conn // Store the connection @@ -533,7 +599,17 @@ func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packet c.workers.Add(1) // Done will be called when ackOut is closed ackOut := c.msgRouter.matchAndDispatch(incomingPubChan, c.options.Order, c) - c.setConnected(connected) + // The connection is now ready for use (we spin up a few go routines below). It is possible that + // Disconnect has been called in the interim... + if err := connectionUp(true); err != nil { + DEBUG.Println(CLI, err) + close(c.stop) // Tidy up anything we have already started + close(incomingPubChan) + c.workers.Wait() + c.conn.Close() + c.conn = nil + return false + } DEBUG.Println(CLI, "client is connected/reconnected") if c.options.OnConnect != nil { go c.options.OnConnect(c) @@ -592,7 +668,22 @@ func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packet commsIncomingPub = nil continue } - incomingPubChan <- pub + // Care is needed here because an error elsewhere could trigger a deadlock + sendPubLoop: + for { + select { + case incomingPubChan <- pub: + break sendPubLoop + case err, ok := <-commsErrors: + if !ok { // commsErrors has been closed so we can ignore it + commsErrors = nil + continue + } + ERROR.Println(CLI, "Connect comms goroutine - error triggered during send Pub", err) + c.internalConnLost(err) // no harm in calling this if the connection is already down (or shutdown is in progress) + continue + } + } case err, ok := <-commsErrors: if !ok { commsErrors = nil @@ -611,8 +702,9 @@ func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packet } // stopWorkersAndComms - Cleanly shuts down worker go routines (including the comms routines) and waits until everything has stopped -// Returns nil it workers did not need to be stopped; otherwise returns a channel which will be closed when the stop is complete +// Returns nil if workers did not need to be stopped; otherwise returns a channel which will be closed when the stop is complete // Note: This may block so run as a go routine if calling from any of the comms routines +// Note2: It should be possible to simplify this now that the new status management code is in place. func (c *client) stopCommsWorkers() chan struct{} { DEBUG.Println(CLI, "stopCommsWorkers called") // It is possible that this function will be called multiple times simultaneously due to the way things get shutdown @@ -661,7 +753,8 @@ func (c *client) Publish(topic string, qos byte, retained bool, payload interfac case !c.IsConnected(): token.setError(ErrNotConnected) return token - case c.connectionStatus() == reconnecting && qos == 0: + case c.status.ConnectionStatus() == reconnecting && qos == 0: + // message written to store and will be sent when connection comes up token.flowComplete() return token } @@ -691,11 +784,13 @@ func (c *client) Publish(topic string, qos byte, retained bool, payload interfac token.messageID = mID } persistOutbound(c.persist, pub) - switch c.connectionStatus() { + switch c.status.ConnectionStatus() { case connecting: DEBUG.Println(CLI, "storing publish message (connecting), topic:", topic) case reconnecting: DEBUG.Println(CLI, "storing publish message (reconnecting), topic:", topic) + case disconnecting: + DEBUG.Println(CLI, "storing publish message (disconnecting), topic:", topic) default: DEBUG.Println(CLI, "sending publish message, topic:", topic) publishWaitTimeout := c.options.WriteTimeout @@ -728,11 +823,11 @@ func (c *client) Subscribe(topic string, qos byte, callback MessageHandler) Toke if !c.IsConnectionOpen() { switch { case !c.options.ResumeSubs: - // if not connected and resumesubs not set this sub will be thrown away + // if not connected and resumeSubs not set this sub will be thrown away token.setError(fmt.Errorf("not currently connected and ResumeSubs not set")) return token - case c.options.CleanSession && c.connectionStatus() == reconnecting: - // if reconnecting and cleansession is true this sub will be thrown away + case c.options.CleanSession && c.status.ConnectionStatus() == reconnecting: + // if reconnecting and cleanSession is true this sub will be thrown away token.setError(fmt.Errorf("reconnecting state and cleansession is true")) return token } @@ -770,12 +865,16 @@ func (c *client) Subscribe(topic string, qos byte, callback MessageHandler) Toke } DEBUG.Println(CLI, sub.String()) - persistOutbound(c.persist, sub) - switch c.connectionStatus() { + if c.options.ResumeSubs { // Only persist if we need this to resume subs after a disconnection + persistOutbound(c.persist, sub) + } + switch c.status.ConnectionStatus() { case connecting: DEBUG.Println(CLI, "storing subscribe message (connecting), topic:", topic) case reconnecting: DEBUG.Println(CLI, "storing subscribe message (reconnecting), topic:", topic) + case disconnecting: + DEBUG.Println(CLI, "storing subscribe message (disconnecting), topic:", topic) default: DEBUG.Println(CLI, "sending subscribe message, topic:", topic) subscribeWaitTimeout := c.options.WriteTimeout @@ -813,8 +912,8 @@ func (c *client) SubscribeMultiple(filters map[string]byte, callback MessageHand // if not connected and resumesubs not set this sub will be thrown away token.setError(fmt.Errorf("not currently connected and ResumeSubs not set")) return token - case c.options.CleanSession && c.connectionStatus() == reconnecting: - // if reconnecting and cleansession is true this sub will be thrown away + case c.options.CleanSession && c.status.ConnectionStatus() == reconnecting: + // if reconnecting and cleanSession is true this sub will be thrown away token.setError(fmt.Errorf("reconnecting state and cleansession is true")) return token } @@ -842,12 +941,16 @@ func (c *client) SubscribeMultiple(filters map[string]byte, callback MessageHand sub.MessageID = mID token.messageID = mID } - persistOutbound(c.persist, sub) - switch c.connectionStatus() { + if c.options.ResumeSubs { // Only persist if we need this to resume subs after a disconnection + persistOutbound(c.persist, sub) + } + switch c.status.ConnectionStatus() { case connecting: DEBUG.Println(CLI, "storing subscribe message (connecting), topics:", sub.Topics) case reconnecting: DEBUG.Println(CLI, "storing subscribe message (reconnecting), topics:", sub.Topics) + case disconnecting: + DEBUG.Println(CLI, "storing subscribe message (disconnecting), topics:", sub.Topics) default: DEBUG.Println(CLI, "sending subscribe message, topics:", sub.Topics) subscribeWaitTimeout := c.options.WriteTimeout @@ -889,10 +992,42 @@ func (c *client) reserveStoredPublishIDs() { // Load all stored messages and resend them // Call this to ensure QOS > 1,2 even after an application crash // Note: This function will exit if c.stop is closed (this allows the shutdown to proceed avoiding a potential deadlock) -// +// other than that it does not return until all messages in the store have been sent (connect() does not complete its +// token before this completes) func (c *client) resume(subscription bool, ibound chan packets.ControlPacket) { DEBUG.Println(STR, "enter Resume") + // Prior to sending a message getSemaphore will be called and once sent releaseSemaphore will be called + // with the token (so semaphore can be released when ACK received if applicable). + // Using a weighted semaphore rather than channels because this retains ordering + getSemaphore := func() {} // Default = do nothing + releaseSemaphore := func(_ *PublishToken) {} // Default = do nothing + var sem *semaphore.Weighted + if c.options.MaxResumePubInFlight > 0 { + sem = semaphore.NewWeighted(int64(c.options.MaxResumePubInFlight)) + ctx, cancel := context.WithCancel(context.Background()) // Context needed for semaphore + defer cancel() // ensure context gets cancelled + + go func() { + select { + case <-c.stop: // Request to stop (due to comm error etc) + cancel() + case <-ctx.Done(): // resume completed normally + } + }() + + getSemaphore = func() { sem.Acquire(ctx, 1) } + releaseSemaphore = func(token *PublishToken) { // Note: If token never completes then resume() may stall (will still exit on ctx.Done()) + go func() { + select { + case <-token.Done(): + case <-ctx.Done(): + } + sem.Release(1) + }() + } + } + storedKeys := c.persist.All() for _, key := range storedKeys { packet := c.persist.Get(key) @@ -956,14 +1091,16 @@ func (c *client) resume(subscription bool, ibound chan packets.ControlPacket) { c.claimID(token, details.MessageID) DEBUG.Println(STR, fmt.Sprintf("loaded pending publish (%d)", details.MessageID)) DEBUG.Println(STR, details) + getSemaphore() select { case c.obound <- &PacketAndToken{p: p, t: token}: case <-c.stop: DEBUG.Println(STR, "resume exiting due to stop") return } + releaseSemaphore(token) // If limiting simultaneous messages then we need to know when message is acknowledged default: - ERROR.Println(STR, "invalid message type in store (discarded)") + ERROR.Println(STR, fmt.Sprintf("invalid message type (inbound - %T) in store (discarded)", packet)) c.persist.Del(key) } } else { @@ -977,7 +1114,7 @@ func (c *client) resume(subscription bool, ibound chan packets.ControlPacket) { return } default: - ERROR.Println(STR, "invalid message type in store (discarded)") + ERROR.Println(STR, fmt.Sprintf("invalid message type (%T) in store (discarded)", packet)) c.persist.Del(key) } } @@ -998,11 +1135,11 @@ func (c *client) Unsubscribe(topics ...string) Token { if !c.IsConnectionOpen() { switch { case !c.options.ResumeSubs: - // if not connected and resumesubs not set this unsub will be thrown away + // if not connected and resumeSubs not set this unsub will be thrown away token.setError(fmt.Errorf("not currently connected and ResumeSubs not set")) return token - case c.options.CleanSession && c.connectionStatus() == reconnecting: - // if reconnecting and cleansession is true this unsub will be thrown away + case c.options.CleanSession && c.status.ConnectionStatus() == reconnecting: + // if reconnecting and cleanSession is true this unsub will be thrown away token.setError(fmt.Errorf("reconnecting state and cleansession is true")) return token } @@ -1021,13 +1158,17 @@ func (c *client) Unsubscribe(topics ...string) Token { token.messageID = mID } - persistOutbound(c.persist, unsub) + if c.options.ResumeSubs { // Only persist if we need this to resume subs after a disconnection + persistOutbound(c.persist, unsub) + } - switch c.connectionStatus() { + switch c.status.ConnectionStatus() { case connecting: DEBUG.Println(CLI, "storing unsubscribe message (connecting), topics:", topics) case reconnecting: DEBUG.Println(CLI, "storing unsubscribe message (reconnecting), topics:", topics) + case disconnecting: + DEBUG.Println(CLI, "storing unsubscribe message (reconnecting), topics:", topics) default: DEBUG.Println(CLI, "sending unsubscribe message, topics:", topics) subscribeWaitTimeout := c.options.WriteTimeout diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/components.go b/vendor/github.com/eclipse/paho.mqtt.golang/components.go index 23ade20..524db03 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/components.go +++ b/vendor/github.com/eclipse/paho.mqtt.golang/components.go @@ -1,10 +1,14 @@ /* - * Copyright (c) 2013 IBM Corp. + * Copyright (c) 2021 IBM Corp and others. * * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * https://www.eclipse.org/legal/epl-2.0/ + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. * * Contributors: * Seth Hoenig @@ -168,7 +172,7 @@ func (store *FileStore) all() []string { for _, f := range files { DEBUG.Println(STR, "file in All():", f.Name()) name := f.Name() - if name[len(name)-4:] != msgExt { + if len(name) < len(msgExt) || name[len(name)-len(msgExt):] != msgExt { DEBUG.Println(STR, "skipping file, doesn't have right extension: ", name) continue } diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/go.mod b/vendor/github.com/eclipse/paho.mqtt.golang/go.mod index fcbd787..beb1991 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/go.mod +++ b/vendor/github.com/eclipse/paho.mqtt.golang/go.mod @@ -5,4 +5,5 @@ go 1.14 require ( github.com/gorilla/websocket v1.4.2 golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c ) diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/go.sum b/vendor/github.com/eclipse/paho.mqtt.golang/go.sum index 7095afb..4ce755b 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/go.sum +++ b/vendor/github.com/eclipse/paho.mqtt.golang/go.sum @@ -3,6 +3,8 @@ github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 h1:Jcxah/M+oLZ/R4/z5RzfPzGbPXnVDPkEDtf2JnuxN+U= golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/memstore.go b/vendor/github.com/eclipse/paho.mqtt.golang/memstore.go index d245a5f..e9f8088 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/memstore.go +++ b/vendor/github.com/eclipse/paho.mqtt.golang/memstore.go @@ -1,10 +1,14 @@ /* - * Copyright (c) 2013 IBM Corp. + * Copyright (c) 2021 IBM Corp and others. * * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * https://www.eclipse.org/legal/epl-2.0/ + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. * * Contributors: * Seth Hoenig diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/memstore_ordered.go b/vendor/github.com/eclipse/paho.mqtt.golang/memstore_ordered.go new file mode 100644 index 0000000..498b82b --- /dev/null +++ b/vendor/github.com/eclipse/paho.mqtt.golang/memstore_ordered.go @@ -0,0 +1,166 @@ +/* + * Copyright (c) 2021 IBM Corp and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * https://www.eclipse.org/legal/epl-2.0/ + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Seth Hoenig + * Allan Stockdill-Mander + * Mike Robertson + * Matt Brittan + */ + +package mqtt + +import ( + "sort" + "sync" + "time" + + "github.com/eclipse/paho.mqtt.golang/packets" +) + +// OrderedMemoryStore uses a map internally so the order in which All() returns packets is +// undefined. OrderedMemoryStore resolves this by storing the time the message is added +// and sorting based upon this. + +// storedMessage encapsulates a message and the time it was initially stored +type storedMessage struct { + ts time.Time + msg packets.ControlPacket +} + +// OrderedMemoryStore implements the store interface to provide a "persistence" +// mechanism wholly stored in memory. This is only useful for +// as long as the client instance exists. +type OrderedMemoryStore struct { + sync.RWMutex + messages map[string]storedMessage + opened bool +} + +// NewOrderedMemoryStore returns a pointer to a new instance of +// OrderedMemoryStore, the instance is not initialized and ready to +// use until Open() has been called on it. +func NewOrderedMemoryStore() *OrderedMemoryStore { + store := &OrderedMemoryStore{ + messages: make(map[string]storedMessage), + opened: false, + } + return store +} + +// Open initializes a OrderedMemoryStore instance. +func (store *OrderedMemoryStore) Open() { + store.Lock() + defer store.Unlock() + store.opened = true + DEBUG.Println(STR, "OrderedMemoryStore initialized") +} + +// Put takes a key and a pointer to a Message and stores the +// message. +func (store *OrderedMemoryStore) Put(key string, message packets.ControlPacket) { + store.Lock() + defer store.Unlock() + if !store.opened { + ERROR.Println(STR, "Trying to use memory store, but not open") + return + } + store.messages[key] = storedMessage{ts: time.Now(), msg: message} +} + +// Get takes a key and looks in the store for a matching Message +// returning either the Message pointer or nil. +func (store *OrderedMemoryStore) Get(key string) packets.ControlPacket { + store.RLock() + defer store.RUnlock() + if !store.opened { + ERROR.Println(STR, "Trying to use memory store, but not open") + return nil + } + mid := mIDFromKey(key) + m, ok := store.messages[key] + if !ok || m.msg == nil { + CRITICAL.Println(STR, "OrderedMemoryStore get: message", mid, "not found") + } else { + DEBUG.Println(STR, "OrderedMemoryStore get: message", mid, "found") + } + return m.msg +} + +// All returns a slice of strings containing all the keys currently +// in the OrderedMemoryStore. +func (store *OrderedMemoryStore) All() []string { + store.RLock() + defer store.RUnlock() + if !store.opened { + ERROR.Println(STR, "Trying to use memory store, but not open") + return nil + } + type tsAndKey struct { + ts time.Time + key string + } + + tsKeys := make([]tsAndKey, 0, len(store.messages)) + for k, v := range store.messages { + tsKeys = append(tsKeys, tsAndKey{ts: v.ts, key: k}) + } + sort.Slice(tsKeys, func(a int, b int) bool { return tsKeys[a].ts.Before(tsKeys[b].ts) }) + + keys := make([]string, len(tsKeys)) + for i := range tsKeys { + keys[i] = tsKeys[i].key + } + return keys +} + +// Del takes a key, searches the OrderedMemoryStore and if the key is found +// deletes the Message pointer associated with it. +func (store *OrderedMemoryStore) Del(key string) { + store.Lock() + defer store.Unlock() + if !store.opened { + ERROR.Println(STR, "Trying to use memory store, but not open") + return + } + mid := mIDFromKey(key) + _, ok := store.messages[key] + if !ok { + WARN.Println(STR, "OrderedMemoryStore del: message", mid, "not found") + } else { + delete(store.messages, key) + DEBUG.Println(STR, "OrderedMemoryStore del: message", mid, "was deleted") + } +} + +// Close will disallow modifications to the state of the store. +func (store *OrderedMemoryStore) Close() { + store.Lock() + defer store.Unlock() + if !store.opened { + ERROR.Println(STR, "Trying to close memory store, but not open") + return + } + store.opened = false + DEBUG.Println(STR, "OrderedMemoryStore closed") +} + +// Reset eliminates all persisted message data in the store. +func (store *OrderedMemoryStore) Reset() { + store.Lock() + defer store.Unlock() + if !store.opened { + ERROR.Println(STR, "Trying to reset memory store, but not open") + } + store.messages = make(map[string]storedMessage) + WARN.Println(STR, "OrderedMemoryStore wiped") +} diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/message.go b/vendor/github.com/eclipse/paho.mqtt.golang/message.go index 0f8ca60..35b463f 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/message.go +++ b/vendor/github.com/eclipse/paho.mqtt.golang/message.go @@ -1,10 +1,14 @@ /* - * Copyright (c) 2013 IBM Corp. + * Copyright (c) 2021 IBM Corp and others. * * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * https://www.eclipse.org/legal/epl-2.0/ + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. * * Contributors: * Seth Hoenig diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/messageids.go b/vendor/github.com/eclipse/paho.mqtt.golang/messageids.go index 26c2c0d..04c94bd 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/messageids.go +++ b/vendor/github.com/eclipse/paho.mqtt.golang/messageids.go @@ -1,15 +1,20 @@ /* - * Copyright (c) 2013 IBM Corp. + * Copyright (c) 2013 IBM Corp and others. * * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * https://www.eclipse.org/legal/epl-2.0/ + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. * * Contributors: * Seth Hoenig * Allan Stockdill-Mander * Mike Robertson + * Matt Brittan */ package mqtt @@ -26,7 +31,7 @@ import ( type MId uint16 type messageIds struct { - sync.RWMutex + mu sync.RWMutex // Named to prevent Mu from being accessible directly via client index map[uint16]tokenCompletor lastIssuedID uint16 // The most recently issued ID. Used so we cycle through ids rather than immediately reusing them (can make debugging easier) @@ -37,8 +42,9 @@ const ( midMax uint16 = 65535 ) +// cleanup clears the message ID map; completes all token types and sets error on PUB, SUB and UNSUB tokens. func (mids *messageIds) cleanUp() { - mids.Lock() + mids.mu.Lock() for _, token := range mids.index { switch token.(type) { case *PublishToken: @@ -47,25 +53,43 @@ func (mids *messageIds) cleanUp() { token.setError(fmt.Errorf("connection lost before Subscribe completed")) case *UnsubscribeToken: token.setError(fmt.Errorf("connection lost before Unsubscribe completed")) - case nil: + case nil: // should not be any nil entries continue } token.flowComplete() } mids.index = make(map[uint16]tokenCompletor) - mids.Unlock() + mids.mu.Unlock() DEBUG.Println(MID, "cleaned up") } +// cleanUpSubscribe removes all SUBSCRIBE and UNSUBSCRIBE tokens (setting error) +// This may be called when the connection is lost, and we will not be resending SUB/UNSUB packets +func (mids *messageIds) cleanUpSubscribe() { + mids.mu.Lock() + for mid, token := range mids.index { + switch token.(type) { + case *SubscribeToken: + token.setError(fmt.Errorf("connection lost before Subscribe completed")) + delete(mids.index, mid) + case *UnsubscribeToken: + token.setError(fmt.Errorf("connection lost before Unsubscribe completed")) + delete(mids.index, mid) + } + } + mids.mu.Unlock() + DEBUG.Println(MID, "cleaned up subs") +} + func (mids *messageIds) freeID(id uint16) { - mids.Lock() + mids.mu.Lock() delete(mids.index, id) - mids.Unlock() + mids.mu.Unlock() } func (mids *messageIds) claimID(token tokenCompletor, id uint16) { - mids.Lock() - defer mids.Unlock() + mids.mu.Lock() + defer mids.mu.Unlock() if _, ok := mids.index[id]; !ok { mids.index[id] = token } else { @@ -81,8 +105,8 @@ func (mids *messageIds) claimID(token tokenCompletor, id uint16) { // getID will return an available id or 0 if none available // The id will generally be the previous id + 1 (because this makes tracing messages a bit simpler) func (mids *messageIds) getID(t tokenCompletor) uint16 { - mids.Lock() - defer mids.Unlock() + mids.mu.Lock() + defer mids.mu.Unlock() i := mids.lastIssuedID // note: the only situation where lastIssuedID is 0 the map will be empty looped := false // uint16 will loop from 65535->0 for { @@ -103,8 +127,8 @@ func (mids *messageIds) getID(t tokenCompletor) uint16 { } func (mids *messageIds) getToken(id uint16) tokenCompletor { - mids.RLock() - defer mids.RUnlock() + mids.mu.RLock() + defer mids.mu.RUnlock() if token, ok := mids.index[id]; ok { return token } diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/net.go b/vendor/github.com/eclipse/paho.mqtt.golang/net.go index 485bb2a..10cc7da 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/net.go +++ b/vendor/github.com/eclipse/paho.mqtt.golang/net.go @@ -1,15 +1,20 @@ /* - * Copyright (c) 2013 IBM Corp. + * Copyright (c) 2021 IBM Corp and others. * * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * https://www.eclipse.org/legal/epl-2.0/ + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. * * Contributors: * Seth Hoenig * Allan Stockdill-Mander * Mike Robertson + * Matt Brittan */ package mqtt @@ -145,7 +150,7 @@ type incomingComms struct { // startIncomingComms initiates incoming communications; this includes starting a goroutine to process incoming // messages. -// Accepts a channel of inbound messages from the store (persisted messages); note this must be closed as soon as the +// Accepts a channel of inbound messages from the store (persisted messages); note this must be closed as soon as // everything in the store has been sent. // Returns a channel that will be passed any received packets; this will be closed on a network error (and inboundFromStore closed) func startIncomingComms(conn io.Reader, @@ -327,7 +332,7 @@ func startOutgoingComms(conn net.Conn, DEBUG.Println(NET, "outbound wrote disconnect, closing connection") // As per the MQTT spec "After sending a DISCONNECT Packet the Client MUST close the Network Connection" // Closing the connection will cause the goroutines to end in sequence (starting with incoming comms) - conn.Close() + _ = conn.Close() } case msg, ok := <-oboundFromIncoming: // message triggered by an inbound message (PubrecPacket or PubrelPacket) if !ok { @@ -365,9 +370,10 @@ type commsFns interface { // startComms initiates goroutines that handles communications over the network connection // Messages will be stored (via commsFns) and deleted from the store as necessary // It returns two channels: -// packets.PublishPacket - Will receive publish packets received over the network. -// Closed when incoming comms routines exit (on shutdown or if network link closed) -// error - Any errors will be sent on this channel. The channel is closed when all comms routines have shut down +// +// packets.PublishPacket - Will receive publish packets received over the network. +// Closed when incoming comms routines exit (on shutdown or if network link closed) +// error - Any errors will be sent on this channel. The channel is closed when all comms routines have shut down // // Note: The comms routines monitoring oboundp and obound will not shutdown until those channels are both closed. Any messages received between the // connection being closed and those channels being closed will generate errors (and nothing will be sent). That way the chance of a deadlock is diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/netconn.go b/vendor/github.com/eclipse/paho.mqtt.golang/netconn.go index 9f9f084..7e3899e 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/netconn.go +++ b/vendor/github.com/eclipse/paho.mqtt.golang/netconn.go @@ -1,15 +1,20 @@ /* - * Copyright (c) 2013 IBM Corp. + * Copyright (c) 2021 IBM Corp and others. * * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * https://www.eclipse.org/legal/epl-2.0/ + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. * * Contributors: * Seth Hoenig * Allan Stockdill-Mander * Mike Robertson + * MAtt Brittan */ package mqtt @@ -30,8 +35,9 @@ import ( // This just establishes the network connection; once established the type of connection should be irrelevant // -// openConnection opens a network connection using the protocol indicated in the URL. Does not carry out any MQTT specific handshakes -func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, headers http.Header, websocketOptions *WebsocketOptions) (net.Conn, error) { +// openConnection opens a network connection using the protocol indicated in the URL. +// Does not carry out any MQTT specific handshakes. +func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, headers http.Header, websocketOptions *WebsocketOptions, dialer *net.Dialer) (net.Conn, error) { switch uri.Scheme { case "ws": conn, err := NewWebsocket(uri.String(), nil, timeout, headers, websocketOptions) @@ -42,7 +48,7 @@ func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, heade case "mqtt", "tcp": allProxy := os.Getenv("all_proxy") if len(allProxy) == 0 { - conn, err := net.DialTimeout("tcp", uri.Host, timeout) + conn, err := dialer.Dial("tcp", uri.Host) if err != nil { return nil, err } @@ -56,7 +62,17 @@ func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, heade } return conn, nil case "unix": - conn, err := net.DialTimeout("unix", uri.Host, timeout) + var conn net.Conn + var err error + + // this check is preserved for compatibility with older versions + // which used uri.Host only (it works for local paths, e.g. unix://socket.sock in current dir) + if len(uri.Host) > 0 { + conn, err = dialer.Dial("unix", uri.Host) + } else { + conn, err = dialer.Dial("unix", uri.Path) + } + if err != nil { return nil, err } @@ -64,14 +80,13 @@ func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, heade case "ssl", "tls", "mqtts", "mqtt+ssl", "tcps": allProxy := os.Getenv("all_proxy") if len(allProxy) == 0 { - conn, err := tls.DialWithDialer(&net.Dialer{Timeout: timeout}, "tcp", uri.Host, tlsc) + conn, err := tls.DialWithDialer(dialer, "tcp", uri.Host, tlsc) if err != nil { return nil, err } return conn, nil } proxyDialer := proxy.FromEnvironment() - conn, err := proxyDialer.Dial("tcp", uri.Host) if err != nil { return nil, err @@ -81,7 +96,7 @@ func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, heade err = tlsConn.Handshake() if err != nil { - conn.Close() + _ = conn.Close() return nil, err } diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/notice.html b/vendor/github.com/eclipse/paho.mqtt.golang/notice.html deleted file mode 100644 index f19c483..0000000 --- a/vendor/github.com/eclipse/paho.mqtt.golang/notice.html +++ /dev/null @@ -1,108 +0,0 @@ - - - - - -Eclipse Foundation Software User Agreement - - - -

Eclipse Foundation Software User Agreement


February 1, 2011

- -

Usage Of Content

- -


- -

Applicable Licenses

- -

Unless otherwise indicated, all Content made available by the Eclipse Foundation is provided to you under the terms and conditions of the Eclipse Public License Version 1.0 - ("EPL"). A copy of the EPL is provided with this Content and is also available at http://www.eclipse.org/legal/epl-v10.html. - For purposes of the EPL, "Program" will mean the Content.

- -

Content includes, but is not limited to, source code, object code, documentation and other files maintained in the Eclipse Foundation source code - repository ("Repository") in software modules ("Modules") and made available as downloadable archives ("Downloads").

- - - -

The terms and conditions governing Plug-ins and Fragments should be contained in files named "about.html" ("Abouts"). The terms and conditions governing Features and -Included Features should be contained in files named "license.html" ("Feature Licenses"). Abouts and Feature Licenses may be located in any directory of a Download or Module -including, but not limited to the following locations:

- - - -

Note: if a Feature made available by the Eclipse Foundation is installed using the Provisioning Technology (as defined below), you must agree to a license ("Feature Update License") during the -installation process. If the Feature contains Included Features, the Feature Update License should either provide you with the terms and conditions governing the Included Features or -inform you where you can locate them. Feature Update Licenses may be found in the "license" property of files named "feature.properties" found within a Feature. -Such Abouts, Feature Licenses, and Feature Update Licenses contain the terms and conditions (or references to such terms and conditions) that govern your use of the associated Content in -that directory.

- -


- - - -

IT IS YOUR OBLIGATION TO READ AND ACCEPT ALL SUCH TERMS AND CONDITIONS PRIOR TO USE OF THE CONTENT. If no About, Feature License, or Feature Update License is provided, please -contact the Eclipse Foundation to determine what terms and conditions govern that particular Content.

- - -

Use of Provisioning Technology

- -

The Eclipse Foundation makes available provisioning software, examples of which include, but are not limited to, p2 and the Eclipse - Update Manager ("Provisioning Technology") for the purpose of allowing users to install software, documentation, information and/or - other materials (collectively "Installable Software"). This capability is provided with the intent of allowing such users to - install, extend and update Eclipse-based products. Information about packaging Installable Software is available at http://eclipse.org/equinox/p2/repository_packaging.html - ("Specification").

- -

You may use Provisioning Technology to allow other parties to install Installable Software. You shall be responsible for enabling the - applicable license agreements relating to the Installable Software to be presented to, and accepted by, the users of the Provisioning Technology - in accordance with the Specification. By using Provisioning Technology in such a manner and making it available in accordance with the - Specification, you further acknowledge your agreement to, and the acquisition of all necessary rights to permit the following:

- -
  1. A series of actions may occur ("Provisioning Process") in which a user may execute the Provisioning Technology - on a machine ("Target Machine") with the intent of installing, extending or updating the functionality of an Eclipse-based - product.
  2. -
  3. During the Provisioning Process, the Provisioning Technology may cause third party Installable Software or a portion thereof to be - accessed and copied to the Target Machine.
  4. -
  5. Pursuant to the Specification, you will provide to the user the terms and conditions that govern the use of the Installable - Software ("Installable Software Agreement") and such Installable Software Agreement shall be accessed from the Target - Machine in accordance with the Specification. Such Installable Software Agreement must inform the user of the terms and conditions that govern - the Installable Software and must solicit acceptance by the end user in the manner prescribed in such Installable Software Agreement. Upon such - indication of agreement by the user, the provisioning Technology will complete installation of the Installable Software.
  6. -
- -


- -

Content may contain encryption software. The country in which you are currently may have restrictions on the import, possession, and use, and/or re-export to - another country, of encryption software. BEFORE using any encryption software, please check the country's laws, regulations and policies concerning the import, - possession, or use, and re-export of encryption software, to see if this is permitted.

- -

Java and all Java-based trademarks are trademarks of Oracle Corporation in the United States, other countries, or both.

- - diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/oops.go b/vendor/github.com/eclipse/paho.mqtt.golang/oops.go index 39630d7..c454aeb 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/oops.go +++ b/vendor/github.com/eclipse/paho.mqtt.golang/oops.go @@ -1,10 +1,14 @@ /* - * Copyright (c) 2013 IBM Corp. + * Copyright (c) 2021 IBM Corp and others. * * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * https://www.eclipse.org/legal/epl-2.0/ + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. * * Contributors: * Seth Hoenig diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/options.go b/vendor/github.com/eclipse/paho.mqtt.golang/options.go index 04f8ae6..5aaa7d9 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/options.go +++ b/vendor/github.com/eclipse/paho.mqtt.golang/options.go @@ -1,10 +1,14 @@ /* - * Copyright (c) 2013 IBM Corp. + * Copyright (c) 2021 IBM Corp and others. * * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * https://www.eclipse.org/legal/epl-2.0/ + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. * * Contributors: * Seth Hoenig @@ -19,6 +23,7 @@ package mqtt import ( "crypto/tls" + "net" "net/http" "net/url" "strings" @@ -49,8 +54,19 @@ type OnConnectHandler func(Client) // the initial connection is lost type ReconnectHandler func(Client, *ClientOptions) +// ConnectionAttemptHandler is invoked prior to making the initial connection. +type ConnectionAttemptHandler func(broker *url.URL, tlsCfg *tls.Config) *tls.Config + +// OpenConnectionFunc is invoked to establish the underlying network connection +// Its purpose if for custom network transports. +// Does not carry out any MQTT specific handshakes. +type OpenConnectionFunc func(uri *url.URL, options ClientOptions) (net.Conn, error) + // ClientOptions contains configurable options for an Client. Note that these should be set using the // relevant methods (e.g. AddBroker) rather than directly. See those functions for information on usage. +// WARNING: Create the below using NewClientOptions unless you have a compelling reason not to. It is easy +// to create a configuration with difficult to trace issues (e.g. Mosquitto 2.0.12+ will reject connections +// with KeepAlive=0 by default). type ClientOptions struct { Servers []*url.URL ClientID string @@ -67,7 +83,7 @@ type ClientOptions struct { ProtocolVersion uint protocolVersionExplicit bool TLSConfig *tls.Config - KeepAlive int64 + KeepAlive int64 // Warning: Some brokers may reject connections with Keepalive = 0. PingTimeout time.Duration ConnectTimeout time.Duration MaxReconnectInterval time.Duration @@ -79,11 +95,16 @@ type ClientOptions struct { OnConnect OnConnectHandler OnConnectionLost ConnectionLostHandler OnReconnecting ReconnectHandler + OnConnectAttempt ConnectionAttemptHandler WriteTimeout time.Duration MessageChannelDepth uint ResumeSubs bool HTTPHeaders http.Header WebsocketOptions *WebsocketOptions + MaxResumePubInFlight int // // 0 = no limit; otherwise this is the maximum simultaneous messages sent while resuming + Dialer *net.Dialer + CustomOpenConnectionFn OpenConnectionFunc + AutoAckDisabled bool } // NewClientOptions will create a new ClientClientOptions type with some @@ -120,10 +141,14 @@ func NewClientOptions() *ClientOptions { Store: nil, OnConnect: nil, OnConnectionLost: DefaultConnectionLostHandler, + OnConnectAttempt: nil, WriteTimeout: 0, // 0 represents timeout disabled ResumeSubs: false, HTTPHeaders: make(map[string][]string), WebsocketOptions: &WebsocketOptions{}, + Dialer: &net.Dialer{Timeout: 30 * time.Second}, + CustomOpenConnectionFn: nil, + AutoAckDisabled: false, } return o } @@ -321,6 +346,15 @@ func (o *ClientOptions) SetReconnectingHandler(cb ReconnectHandler) *ClientOptio return o } +// SetConnectionAttemptHandler sets the ConnectionAttemptHandler callback to be executed prior +// to each attempt to connect to an MQTT broker. Returns the *tls.Config that will be used when establishing +// the connection (a copy of the tls.Config from ClientOptions will be passed in along with the broker URL). +// This allows connection specific changes to be made to the *tls.Config. +func (o *ClientOptions) SetConnectionAttemptHandler(onConnectAttempt ConnectionAttemptHandler) *ClientOptions { + o.OnConnectAttempt = onConnectAttempt + return o +} + // SetWriteTimeout puts a limit on how long a mqtt publish should block until it unblocks with a // timeout error. A duration of 0 never times out. Default never times out func (o *ClientOptions) SetWriteTimeout(t time.Duration) *ClientOptions { @@ -333,6 +367,7 @@ func (o *ClientOptions) SetWriteTimeout(t time.Duration) *ClientOptions { // Default 30 seconds. Currently only operational on TCP/TLS connections. func (o *ClientOptions) SetConnectTimeout(t time.Duration) *ClientOptions { o.ConnectTimeout = t + o.Dialer.Timeout = t return o } @@ -387,3 +422,36 @@ func (o *ClientOptions) SetWebsocketOptions(w *WebsocketOptions) *ClientOptions o.WebsocketOptions = w return o } + +// SetMaxResumePubInFlight sets the maximum simultaneous publish messages that will be sent while resuming. Note that +// this only applies to messages coming from the store (so additional sends may push us over the limit) +// Note that the connect token will not be flagged as complete until all messages have been sent from the +// store. If broker does not respond to messages then resume may not complete. +// This option was put in place because resuming after downtime can saturate low capacity links. +func (o *ClientOptions) SetMaxResumePubInFlight(MaxResumePubInFlight int) *ClientOptions { + o.MaxResumePubInFlight = MaxResumePubInFlight + return o +} + +// SetDialer sets the tcp dialer options used in a tcp connection +func (o *ClientOptions) SetDialer(dialer *net.Dialer) *ClientOptions { + o.Dialer = dialer + return o +} + +// SetCustomOpenConnectionFn replaces the inbuilt function that establishes a network connection with a custom function. +// The passed in function should return an open `net.Conn` or an error (see the existing openConnection function for an example) +// It enables custom networking types in addition to the defaults (tcp, tls, websockets...) +func (o *ClientOptions) SetCustomOpenConnectionFn(customOpenConnectionFn OpenConnectionFunc) *ClientOptions { + if customOpenConnectionFn != nil { + o.CustomOpenConnectionFn = customOpenConnectionFn + } + return o +} + +// SetAutoAckDisabled enables or disables the Automated Acking of Messages received by the handler. +// By default it is set to false. Setting it to true will disable the auto-ack globally. +func (o *ClientOptions) SetAutoAckDisabled(autoAckDisabled bool) *ClientOptions { + o.AutoAckDisabled = autoAckDisabled + return o +} diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/options_reader.go b/vendor/github.com/eclipse/paho.mqtt.golang/options_reader.go index 7ff240f..10a9e49 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/options_reader.go +++ b/vendor/github.com/eclipse/paho.mqtt.golang/options_reader.go @@ -1,10 +1,14 @@ /* - * Copyright (c) 2013 IBM Corp. + * Copyright (c) 2021 IBM Corp and others. * * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * https://www.eclipse.org/legal/epl-2.0/ + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. * * Contributors: * Seth Hoenig diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/packets/connack.go b/vendor/github.com/eclipse/paho.mqtt.golang/packets/connack.go index 1a0dc95..3a7b98f 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/packets/connack.go +++ b/vendor/github.com/eclipse/paho.mqtt.golang/packets/connack.go @@ -1,3 +1,19 @@ +/* + * Copyright (c) 2021 IBM Corp and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * https://www.eclipse.org/legal/epl-2.0/ + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Allan Stockdill-Mander + */ + package packets import ( diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/packets/connect.go b/vendor/github.com/eclipse/paho.mqtt.golang/packets/connect.go index 7284682..b4446a5 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/packets/connect.go +++ b/vendor/github.com/eclipse/paho.mqtt.golang/packets/connect.go @@ -1,3 +1,19 @@ +/* + * Copyright (c) 2021 IBM Corp and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * https://www.eclipse.org/legal/epl-2.0/ + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Allan Stockdill-Mander + */ + package packets import ( @@ -29,7 +45,11 @@ type ConnectPacket struct { } func (c *ConnectPacket) String() string { - return fmt.Sprintf("%s protocolversion: %d protocolname: %s cleansession: %t willflag: %t WillQos: %d WillRetain: %t Usernameflag: %t Passwordflag: %t keepalive: %d clientId: %s willtopic: %s willmessage: %s Username: %s Password: %s", c.FixedHeader, c.ProtocolVersion, c.ProtocolName, c.CleanSession, c.WillFlag, c.WillQos, c.WillRetain, c.UsernameFlag, c.PasswordFlag, c.Keepalive, c.ClientIdentifier, c.WillTopic, c.WillMessage, c.Username, c.Password) + var password string + if len(c.Password) > 0 { + password = "" + } + return fmt.Sprintf("%s protocolversion: %d protocolname: %s cleansession: %t willflag: %t WillQos: %d WillRetain: %t Usernameflag: %t Passwordflag: %t keepalive: %d clientId: %s willtopic: %s willmessage: %s Username: %s Password: %s", c.FixedHeader, c.ProtocolVersion, c.ProtocolName, c.CleanSession, c.WillFlag, c.WillQos, c.WillRetain, c.UsernameFlag, c.PasswordFlag, c.Keepalive, c.ClientIdentifier, c.WillTopic, c.WillMessage, c.Username, password) } func (c *ConnectPacket) Write(w io.Writer) error { diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/packets/disconnect.go b/vendor/github.com/eclipse/paho.mqtt.golang/packets/disconnect.go index a24a84f..cf352a3 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/packets/disconnect.go +++ b/vendor/github.com/eclipse/paho.mqtt.golang/packets/disconnect.go @@ -1,3 +1,19 @@ +/* + * Copyright (c) 2021 IBM Corp and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * https://www.eclipse.org/legal/epl-2.0/ + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Allan Stockdill-Mander + */ + package packets import ( diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/packets/packets.go b/vendor/github.com/eclipse/paho.mqtt.golang/packets/packets.go index 3c734b3..b2d7ed1 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/packets/packets.go +++ b/vendor/github.com/eclipse/paho.mqtt.golang/packets/packets.go @@ -1,3 +1,19 @@ +/* + * Copyright (c) 2021 IBM Corp and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * https://www.eclipse.org/legal/epl-2.0/ + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Allan Stockdill-Mander + */ + package packets import ( diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/packets/pingreq.go b/vendor/github.com/eclipse/paho.mqtt.golang/packets/pingreq.go index 4fa54b2..cd52948 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/packets/pingreq.go +++ b/vendor/github.com/eclipse/paho.mqtt.golang/packets/pingreq.go @@ -1,3 +1,19 @@ +/* + * Copyright (c) 2021 IBM Corp and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * https://www.eclipse.org/legal/epl-2.0/ + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Allan Stockdill-Mander + */ + package packets import ( diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/packets/pingresp.go b/vendor/github.com/eclipse/paho.mqtt.golang/packets/pingresp.go index 61388ca..d7becdf 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/packets/pingresp.go +++ b/vendor/github.com/eclipse/paho.mqtt.golang/packets/pingresp.go @@ -1,3 +1,19 @@ +/* + * Copyright (c) 2021 IBM Corp and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * https://www.eclipse.org/legal/epl-2.0/ + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Allan Stockdill-Mander + */ + package packets import ( diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/packets/puback.go b/vendor/github.com/eclipse/paho.mqtt.golang/packets/puback.go index 5e6c381..f6e727e 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/packets/puback.go +++ b/vendor/github.com/eclipse/paho.mqtt.golang/packets/puback.go @@ -1,3 +1,19 @@ +/* + * Copyright (c) 2021 IBM Corp and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * https://www.eclipse.org/legal/epl-2.0/ + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Allan Stockdill-Mander + */ + package packets import ( diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/packets/pubcomp.go b/vendor/github.com/eclipse/paho.mqtt.golang/packets/pubcomp.go index 47e043d..84a1af5 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/packets/pubcomp.go +++ b/vendor/github.com/eclipse/paho.mqtt.golang/packets/pubcomp.go @@ -1,3 +1,19 @@ +/* + * Copyright (c) 2021 IBM Corp and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * https://www.eclipse.org/legal/epl-2.0/ + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Allan Stockdill-Mander + */ + package packets import ( diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/packets/publish.go b/vendor/github.com/eclipse/paho.mqtt.golang/packets/publish.go index cdd76b1..9fba5df 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/packets/publish.go +++ b/vendor/github.com/eclipse/paho.mqtt.golang/packets/publish.go @@ -1,3 +1,19 @@ +/* + * Copyright (c) 2021 IBM Corp and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * https://www.eclipse.org/legal/epl-2.0/ + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Allan Stockdill-Mander + */ + package packets import ( diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/packets/pubrec.go b/vendor/github.com/eclipse/paho.mqtt.golang/packets/pubrec.go index f0a72c5..da9ed2a 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/packets/pubrec.go +++ b/vendor/github.com/eclipse/paho.mqtt.golang/packets/pubrec.go @@ -1,3 +1,19 @@ +/* + * Copyright (c) 2021 IBM Corp and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * https://www.eclipse.org/legal/epl-2.0/ + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Allan Stockdill-Mander + */ + package packets import ( diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/packets/pubrel.go b/vendor/github.com/eclipse/paho.mqtt.golang/packets/pubrel.go index af29a82..f418ff8 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/packets/pubrel.go +++ b/vendor/github.com/eclipse/paho.mqtt.golang/packets/pubrel.go @@ -1,3 +1,19 @@ +/* + * Copyright (c) 2021 IBM Corp and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * https://www.eclipse.org/legal/epl-2.0/ + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Allan Stockdill-Mander + */ + package packets import ( diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/packets/suback.go b/vendor/github.com/eclipse/paho.mqtt.golang/packets/suback.go index cdea660..261cf21 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/packets/suback.go +++ b/vendor/github.com/eclipse/paho.mqtt.golang/packets/suback.go @@ -1,3 +1,19 @@ +/* + * Copyright (c) 2021 IBM Corp and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * https://www.eclipse.org/legal/epl-2.0/ + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Allan Stockdill-Mander + */ + package packets import ( diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/packets/subscribe.go b/vendor/github.com/eclipse/paho.mqtt.golang/packets/subscribe.go index daef901..313bf5a 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/packets/subscribe.go +++ b/vendor/github.com/eclipse/paho.mqtt.golang/packets/subscribe.go @@ -1,3 +1,19 @@ +/* + * Copyright (c) 2021 IBM Corp and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * https://www.eclipse.org/legal/epl-2.0/ + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Allan Stockdill-Mander + */ + package packets import ( diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/packets/unsuback.go b/vendor/github.com/eclipse/paho.mqtt.golang/packets/unsuback.go index bdf5fd3..acdd400 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/packets/unsuback.go +++ b/vendor/github.com/eclipse/paho.mqtt.golang/packets/unsuback.go @@ -1,3 +1,19 @@ +/* + * Copyright (c) 2021 IBM Corp and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * https://www.eclipse.org/legal/epl-2.0/ + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Allan Stockdill-Mander + */ + package packets import ( diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/packets/unsubscribe.go b/vendor/github.com/eclipse/paho.mqtt.golang/packets/unsubscribe.go index 9ccb850..54d06aa 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/packets/unsubscribe.go +++ b/vendor/github.com/eclipse/paho.mqtt.golang/packets/unsubscribe.go @@ -1,3 +1,19 @@ +/* + * Copyright (c) 2021 IBM Corp and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * https://www.eclipse.org/legal/epl-2.0/ + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Allan Stockdill-Mander + */ + package packets import ( diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/ping.go b/vendor/github.com/eclipse/paho.mqtt.golang/ping.go index 50421f4..857aa0e 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/ping.go +++ b/vendor/github.com/eclipse/paho.mqtt.golang/ping.go @@ -1,10 +1,14 @@ /* - * Copyright (c) 2013 IBM Corp. + * Copyright (c) 2021 IBM Corp and others. * * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * https://www.eclipse.org/legal/epl-2.0/ + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. * * Contributors: * Seth Hoenig @@ -54,8 +58,8 @@ func keepalive(c *client, conn io.Writer) { if atomic.LoadInt32(&c.pingOutstanding) == 0 { DEBUG.Println(PNG, "keepalive sending ping") ping := packets.NewControlPacket(packets.Pingreq).(*packets.PingreqPacket) - // We don't want to wait behind large messages being sent, the Write call - // will block until it it able to send the packet. + // We don't want to wait behind large messages being sent, the `Write` call + // will block until it is able to send the packet. atomic.StoreInt32(&c.pingOutstanding, 1) if err := ping.Write(conn); err != nil { ERROR.Println(PNG, err) diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/router.go b/vendor/github.com/eclipse/paho.mqtt.golang/router.go index 42261ee..bd05a0c 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/router.go +++ b/vendor/github.com/eclipse/paho.mqtt.golang/router.go @@ -1,10 +1,14 @@ /* - * Copyright (c) 2013 IBM Corp. + * Copyright (c) 2021 IBM Corp and others. * * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * https://www.eclipse.org/legal/epl-2.0/ + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. * * Contributors: * Seth Hoenig @@ -132,13 +136,46 @@ func (r *router) setDefaultHandler(handler MessageHandler) { // associated callback (or the defaultHandler, if one exists and no other route matched). If // anything is sent down the stop channel the function will end. func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order bool, client *client) <-chan *PacketAndToken { - ackChan := make(chan *PacketAndToken) - go func() { + var wg sync.WaitGroup + ackOutChan := make(chan *PacketAndToken) // Channel returned to caller; closed when messages channel closed + var ackInChan chan *PacketAndToken // ACKs generated by ackFunc get put onto this channel + + stopAckCopy := make(chan struct{}) // Closure requests stop of go routine copying ackInChan to ackOutChan + ackCopyStopped := make(chan struct{}) // Closure indicates that it is safe to close ackOutChan + goRoutinesDone := make(chan struct{}) // closed on wg.Done() + if order { + ackInChan = ackOutChan // When order = true no go routines are used so safe to use one channel and close when done + } else { + // When order = false ACK messages are sent in go routines so ackInChan cannot be closed until all goroutines done + ackInChan = make(chan *PacketAndToken) + go func() { // go routine to copy from ackInChan to ackOutChan until stopped + for { + select { + case a := <-ackInChan: + ackOutChan <- a + case <-stopAckCopy: + close(ackCopyStopped) // Signal main go routine that it is safe to close ackOutChan + for { + select { + case <-ackInChan: // drain ackInChan to ensure all goRoutines can complete cleanly (ACK dropped) + DEBUG.Println(ROU, "matchAndDispatch received acknowledgment after processing stopped (ACK dropped).") + case <-goRoutinesDone: + close(ackInChan) // Nothing further should be sent (a panic is probably better than silent failure) + DEBUG.Println(ROU, "matchAndDispatch order=false copy goroutine exiting.") + return + } + } + } + } + }() + } + + go func() { // Main go routine handling inbound messages for message := range messages { // DEBUG.Println(ROU, "matchAndDispatch received message") sent := false r.RLock() - m := messageFromPublish(message, ackFunc(ackChan, client.persist, message)) + m := messageFromPublish(message, ackFunc(ackInChan, client.persist, message)) var handlers []MessageHandler for e := r.routes.Front(); e != nil; e = e.Next() { if e.Value.(*route).match(message.TopicName) { @@ -146,9 +183,13 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order handlers = append(handlers, e.Value.(*route).callback) } else { hd := e.Value.(*route).callback + wg.Add(1) go func() { hd(client, m) - m.Ack() + if !client.options.AutoAckDisabled { + m.Ack() + } + wg.Done() }() } sent = true @@ -159,9 +200,13 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order if order { handlers = append(handlers, r.defaultHandler) } else { + wg.Add(1) go func() { r.defaultHandler(client, m) - m.Ack() + if !client.options.AutoAckDisabled { + m.Ack() + } + wg.Done() }() } } else { @@ -171,12 +216,24 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order r.RUnlock() for _, handler := range handlers { handler(client, m) - m.Ack() + if !client.options.AutoAckDisabled { + m.Ack() + } } // DEBUG.Println(ROU, "matchAndDispatch handled message") } - close(ackChan) + if order { + close(ackOutChan) + } else { // Ensure that nothing further will be written to ackOutChan before closing it + close(stopAckCopy) + <-ackCopyStopped + close(ackOutChan) + go func() { + wg.Wait() // Note: If this remains running then the user has handlers that are not returning + close(goRoutinesDone) + }() + } DEBUG.Println(ROU, "matchAndDispatch exiting") }() - return ackChan + return ackOutChan } diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/status.go b/vendor/github.com/eclipse/paho.mqtt.golang/status.go new file mode 100644 index 0000000..d25fbf5 --- /dev/null +++ b/vendor/github.com/eclipse/paho.mqtt.golang/status.go @@ -0,0 +1,296 @@ +/* + * Copyright (c) 2021 IBM Corp and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * https://www.eclipse.org/legal/epl-2.0/ + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Seth Hoenig + * Allan Stockdill-Mander + * Mike Robertson + * Matt Brittan + */ + +package mqtt + +import ( + "errors" + "sync" +) + +// Status - Manage the connection status + +// Multiple go routines will want to access/set this. Previously status was implemented as a `uint32` and updated +// with a mixture of atomic functions and a mutex (leading to some deadlock type issues that were very hard to debug). + +// In this new implementation `connectionStatus` takes over managing the state and provides functions that allow the +// client to request a move to a particular state (it may reject these requests!). In some cases the 'state' is +// transitory, for example `connecting`, in those cases a function will be returned that allows the client to move +// to a more static state (`disconnected` or `connected`). + +// This "belts-and-braces" may be a little over the top but issues with the status have caused a number of difficult +// to trace bugs in the past and the likelihood that introducing a new system would introduce bugs seemed high! +// I have written this in a way that should make it very difficult to misuse it (but it does make things a little +// complex with functions returning functions that return functions!). + +type status uint32 + +const ( + disconnected status = iota // default (nil) status is disconnected + disconnecting // Transitioning from one of the below states back to disconnected + connecting + reconnecting + connected +) + +// String simplify output of statuses +func (s status) String() string { + switch s { + case disconnected: + return "disconnected" + case disconnecting: + return "disconnecting" + case connecting: + return "connecting" + case reconnecting: + return "reconnecting" + case connected: + return "connected" + default: + return "invalid" + } +} + +type connCompletedFn func(success bool) error +type disconnectCompletedFn func() +type connectionLostHandledFn func(bool) (connCompletedFn, error) + +/* State transitions + +static states are `disconnected` and `connected`. For all other states a process will hold a function that will move +the state to one of those. That function effectively owns the state and any other changes must not proceed until it +completes. One exception to that is that the state can always be moved to `disconnecting` which provides a signal that +transitions to `connected` will be rejected (this is required because a Disconnect can be requested while in the +Connecting state). + +# Basic Operations + +The standard workflows are: + +disconnected -> `Connecting()` -> connecting -> `connCompletedFn(true)` -> connected +connected -> `Disconnecting()` -> disconnecting -> `disconnectCompletedFn()` -> disconnected +connected -> `ConnectionLost(false)` -> disconnecting -> `connectionLostHandledFn(true/false)` -> disconnected +connected -> `ConnectionLost(true)` -> disconnecting -> `connectionLostHandledFn(true)` -> connected + +Unfortunately the above workflows are complicated by the fact that `Disconnecting()` or `ConnectionLost()` may, +potentially, be called at any time (i.e. whilst in the middle of transitioning between states). If this happens: + +* The state will be set to disconnecting (which will prevent any request to move the status to connected) +* The call to `Disconnecting()`/`ConnectionLost()` will block until the previously active call completes and then + handle the disconnection. + +Reading the tests (unit_status_test.go) might help understand these rules. +*/ + +var ( + errAbortConnection = errors.New("disconnect called whist connection attempt in progress") + errAlreadyConnectedOrReconnecting = errors.New("status is already connected or reconnecting") + errStatusMustBeDisconnected = errors.New("status can only transition to connecting from disconnected") + errAlreadyDisconnected = errors.New("status is already disconnected") + errDisconnectionRequested = errors.New("disconnection was requested whilst the action was in progress") + errDisconnectionInProgress = errors.New("disconnection already in progress") + errAlreadyHandlingConnectionLoss = errors.New("status is already Connection Lost") + errConnLossWhileDisconnecting = errors.New("connection status is disconnecting so loss of connection is expected") +) + +// connectionStatus encapsulates, and protects, the connection status. +type connectionStatus struct { + sync.RWMutex // Protects the variables below + status status + willReconnect bool // only used when status == disconnecting. Indicates that an attempt will be made to reconnect (allows us to abort that) + + // Some statuses are transitional (e.g. connecting, connectionLost, reconnecting, disconnecting), that is, whatever + // process moves us into that status will move us out of it when an action is complete. Sometimes other users + // will need to know when the action is complete (e.g. the user calls `Disconnect()` whilst the status is + // `connecting`). `actionCompleted` will be set whenever we move into one of the above statues and the channel + // returned to anything else requesting a status change. The channel will be closed when the operation is complete. + actionCompleted chan struct{} // Only valid whilst status is Connecting or Reconnecting; will be closed when connection completed (success or failure) +} + +// ConnectionStatus returns the connection status. +// WARNING: the status may change at any time so users should not assume they are the only goroutine touching this +func (c *connectionStatus) ConnectionStatus() status { + c.RLock() + defer c.RUnlock() + return c.status +} + +// ConnectionStatusRetry returns the connection status and retry flag (indicates that we expect to reconnect). +// WARNING: the status may change at any time so users should not assume they are the only goroutine touching this +func (c *connectionStatus) ConnectionStatusRetry() (status, bool) { + c.RLock() + defer c.RUnlock() + return c.status, c.willReconnect +} + +// Connecting - Changes the status to connecting if that is a permitted operation +// Will do nothing unless the current status is disconnected +// Returns a function that MUST be called when the operation is complete (pass in true if successful) +func (c *connectionStatus) Connecting() (connCompletedFn, error) { + c.Lock() + defer c.Unlock() + // Calling Connect when already connecting (or if reconnecting) may not always be considered an error + if c.status == connected || c.status == reconnecting { + return nil, errAlreadyConnectedOrReconnecting + } + if c.status != disconnected { + return nil, errStatusMustBeDisconnected + } + c.status = connecting + c.actionCompleted = make(chan struct{}) + return c.connected, nil +} + +// connected is an internal function (it is returned by functions that set the status to connecting or reconnecting, +// calling it completes the operation). `success` is used to indicate whether the operation was successfully completed. +func (c *connectionStatus) connected(success bool) error { + c.Lock() + defer func() { + close(c.actionCompleted) // Alert anything waiting on the connection process to complete + c.actionCompleted = nil // Be tidy + c.Unlock() + }() + + // Status may have moved to disconnecting in the interim (i.e. at users request) + if c.status == disconnecting { + return errAbortConnection + } + if success { + c.status = connected + } else { + c.status = disconnected + } + return nil +} + +// Disconnecting - should be called when beginning the disconnection process (cleanup etc.). +// Can be called from ANY status and the end result will always be a status of disconnected +// Note that if a connection/reconnection attempt is in progress this function will set the status to `disconnecting` +// then block until the connection process completes (or aborts). +// Returns a function that MUST be called when the operation is complete (assumed to always be successful!) +func (c *connectionStatus) Disconnecting() (disconnectCompletedFn, error) { + c.Lock() + if c.status == disconnected { + c.Unlock() + return nil, errAlreadyDisconnected // May not always be treated as an error + } + if c.status == disconnecting { // Need to wait for existing process to complete + c.willReconnect = false // Ensure that the existing disconnect process will not reconnect + disConnectDone := c.actionCompleted + c.Unlock() + <-disConnectDone // Wait for existing operation to complete + return nil, errAlreadyDisconnected // Well we are now! + } + + prevStatus := c.status + c.status = disconnecting + + // We may need to wait for connection/reconnection process to complete (they should regularly check the status) + if prevStatus == connecting || prevStatus == reconnecting { + connectDone := c.actionCompleted + c.Unlock() // Safe because the only way to leave the disconnecting status is via this function + <-connectDone + + if prevStatus == reconnecting && !c.willReconnect { + return nil, errAlreadyDisconnected // Following connectionLost process we will be disconnected + } + c.Lock() + } + c.actionCompleted = make(chan struct{}) + c.Unlock() + return c.disconnectionCompleted, nil +} + +// disconnectionCompleted is an internal function (it is returned by functions that set the status to disconnecting) +func (c *connectionStatus) disconnectionCompleted() { + c.Lock() + defer c.Unlock() + c.status = disconnected + close(c.actionCompleted) // Alert anything waiting on the connection process to complete + c.actionCompleted = nil +} + +// ConnectionLost - should be called when the connection is lost. +// This really only differs from Disconnecting in that we may transition into a reconnection (but that could be +// cancelled something else calls Disconnecting in the meantime). +// The returned function should be called when cleanup is completed. It will return a function to be called when +// reconnect completes (or nil if no reconnect requested/disconnect called in the interim). +// Note: This function may block if a connection is in progress (the move to connected will be rejected) +func (c *connectionStatus) ConnectionLost(willReconnect bool) (connectionLostHandledFn, error) { + c.Lock() + defer c.Unlock() + if c.status == disconnected { + return nil, errAlreadyDisconnected + } + if c.status == disconnecting { // its expected that connection lost will be called during the disconnection process + return nil, errDisconnectionInProgress + } + + c.willReconnect = willReconnect + prevStatus := c.status + c.status = disconnecting + + // There is a slight possibility that a connection attempt is in progress (connection up and goroutines started but + // status not yet changed). By changing the status we ensure that process will exit cleanly + if prevStatus == connecting || prevStatus == reconnecting { + connectDone := c.actionCompleted + c.Unlock() // Safe because the only way to leave the disconnecting status is via this function + <-connectDone + c.Lock() + if !willReconnect { + // In this case the connection will always be aborted so there is nothing more for us to do + return nil, errAlreadyDisconnected + } + } + c.actionCompleted = make(chan struct{}) + + return c.getConnectionLostHandler(willReconnect), nil +} + +// getConnectionLostHandler is an internal function. It returns the function to be returned by ConnectionLost +func (c *connectionStatus) getConnectionLostHandler(reconnectRequested bool) connectionLostHandledFn { + return func(proceed bool) (connCompletedFn, error) { + // Note that connCompletedFn will only be provided if both reconnectRequested and proceed are true + c.Lock() + defer c.Unlock() + + // `Disconnecting()` may have been called while the disconnection was being processed (this makes it permanent!) + if !c.willReconnect || !proceed { + c.status = disconnected + close(c.actionCompleted) // Alert anything waiting on the connection process to complete + c.actionCompleted = nil + if !reconnectRequested || !proceed { + return nil, nil + } + return nil, errDisconnectionRequested + } + + c.status = reconnecting + return c.connected, nil // Note that c.actionCompleted is still live and will be closed in connected + } +} + +// forceConnectionStatus - forces the connection status to the specified value. +// This should only be used when there is no alternative (i.e. only in tests and to recover from situations that +// are unexpected) +func (c *connectionStatus) forceConnectionStatus(s status) { + c.Lock() + defer c.Unlock() + c.status = s +} diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/store.go b/vendor/github.com/eclipse/paho.mqtt.golang/store.go index 24a76b7..f50873c 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/store.go +++ b/vendor/github.com/eclipse/paho.mqtt.golang/store.go @@ -1,10 +1,14 @@ /* - * Copyright (c) 2013 IBM Corp. + * Copyright (c) 2021 IBM Corp and others. * * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * https://www.eclipse.org/legal/epl-2.0/ + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. * * Contributors: * Seth Hoenig @@ -45,7 +49,7 @@ type Store interface { // where X is 'i' or 'o' func mIDFromKey(key string) uint16 { s := key[2:] - i, err := strconv.Atoi(s) + i, err := strconv.ParseUint(s, 10, 16) chkerr(err) return uint16(i) } diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/token.go b/vendor/github.com/eclipse/paho.mqtt.golang/token.go index dade215..996ab5b 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/token.go +++ b/vendor/github.com/eclipse/paho.mqtt.golang/token.go @@ -1,10 +1,14 @@ /* - * Copyright (c) 2014 IBM Corp. + * Copyright (c) 2021 IBM Corp and others. * * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * https://www.eclipse.org/legal/epl-2.0/ + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. * * Contributors: * Allan Stockdill-Mander diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/topic.go b/vendor/github.com/eclipse/paho.mqtt.golang/topic.go index 6604e68..966540a 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/topic.go +++ b/vendor/github.com/eclipse/paho.mqtt.golang/topic.go @@ -1,10 +1,14 @@ /* - * Copyright (c) 2014 IBM Corp. + * Copyright (c) 2021 IBM Corp and others. * * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * https://www.eclipse.org/legal/epl-2.0/ + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. * * Contributors: * Seth Hoenig diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/trace.go b/vendor/github.com/eclipse/paho.mqtt.golang/trace.go index 904a664..b07b604 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/trace.go +++ b/vendor/github.com/eclipse/paho.mqtt.golang/trace.go @@ -1,10 +1,14 @@ /* - * Copyright (c) 2013 IBM Corp. + * Copyright (c) 2021 IBM Corp and others. * * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * https://www.eclipse.org/legal/epl-2.0/ + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. * * Contributors: * Seth Hoenig diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/websocket.go b/vendor/github.com/eclipse/paho.mqtt.golang/websocket.go index 9fa41ce..e0f2583 100644 --- a/vendor/github.com/eclipse/paho.mqtt.golang/websocket.go +++ b/vendor/github.com/eclipse/paho.mqtt.golang/websocket.go @@ -1,3 +1,16 @@ +/* + * This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * https://www.eclipse.org/legal/epl-2.0/ + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + */ + package mqtt import ( diff --git a/vendor/golang.org/x/sync/AUTHORS b/vendor/golang.org/x/sync/AUTHORS new file mode 100644 index 0000000..15167cd --- /dev/null +++ b/vendor/golang.org/x/sync/AUTHORS @@ -0,0 +1,3 @@ +# This source code refers to The Go Authors for copyright purposes. +# The master list of authors is in the main Go distribution, +# visible at http://tip.golang.org/AUTHORS. diff --git a/vendor/golang.org/x/sync/CONTRIBUTORS b/vendor/golang.org/x/sync/CONTRIBUTORS new file mode 100644 index 0000000..1c4577e --- /dev/null +++ b/vendor/golang.org/x/sync/CONTRIBUTORS @@ -0,0 +1,3 @@ +# This source code was written by the Go contributors. +# The master list of contributors is in the main Go distribution, +# visible at http://tip.golang.org/CONTRIBUTORS. diff --git a/vendor/golang.org/x/sync/LICENSE b/vendor/golang.org/x/sync/LICENSE new file mode 100644 index 0000000..6a66aea --- /dev/null +++ b/vendor/golang.org/x/sync/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2009 The Go Authors. 