Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

error called: read tcp use of closed network connection in logs #282

Open
CypherpunkSamurai opened this issue Dec 4, 2024 · 6 comments
Open

Comments

@CypherpunkSamurai
Copy link

Describe the bug
When I publish to a topic, I get the following in the MQTT logs:

error called: read tcp [2409:40e1:3012:73e0:a847:805c:d865:a71]:59868->[2001:41d0:a:6f1c::1]:8885: use of closed network connection

To reproduce
Copy the code below and run the tests:

package signaling_test

import (
	"context"
	"testing"
	"time"
)

// Connection settings and fixtures shared by the MQTT tests below.
const (
	// testServer is the host:port of the public test broker provided by
	// https://test.mosquitto.org (8885 is used as the secure port). A list of
	// currently active public servers: https://testclient-cloud.mqtt.cool
	testServer = "test.mosquitto.org:8885"

	// testUser and testPass are the read-write credentials used for
	// authenticated access.
	testUser = "rw"
	testPass = "readwrite"

	// testClientID is the MQTT client identifier sent by the tests.
	testClientID = "mqtt-test-client"

	// testTopic and testMessage are the topic and payload used by the
	// single-message tests.
	testTopic   = "test"
	testMessage = "TEST PASSED"
)

// setupTestClient creates an MQTT client for the public test broker and
// connects it. Any failure aborts the calling test via t.Fatalf, so the
// returned client is always connected. The caller is responsible for calling
// Disconnect on it.
func setupTestClient(t *testing.T) *MQTTClient {
	// Mark this function as a helper so failures are reported at the line of
	// the test that called it rather than inside this function.
	t.Helper()

	client, err := NewMQTT(context.Background(), testUser, testPass, testServer, testClientID, false)
	if err != nil {
		t.Fatalf("Failed to create MQTT client: %v", err)
	}

	if err := client.Connect(); err != nil {
		t.Fatalf("Failed to connect MQTT client: %v", err)
	}

	return client
}

// TestMQTTConnect checks that a client can be created and connected to the
// test broker; setupTestClient fails the test if either step errors.
func TestMQTTConnect(t *testing.T) {
	c := setupTestClient(t)
	defer c.Disconnect()
}

// TestMQTTSubscribe checks that a connected client can subscribe to the test
// topic and unsubscribe from it again.
func TestMQTTSubscribe(t *testing.T) {
	client := setupTestClient(t)
	defer client.Disconnect()

	if err := client.Subscribe(testTopic, 1); err != nil {
		t.Fatalf("Failed to subscribe: %v", err)
	}

	// Only unsubscribe once the subscription is known to exist, and surface
	// the result instead of silently dropping it.
	if err := client.Unsubscribe(testTopic); err != nil {
		t.Errorf("Failed to unsubscribe: %v", err)
	}
}

// TestMQTTPublish checks that a connected client can publish a single QoS 1
// message to the test topic.
func TestMQTTPublish(t *testing.T) {
	c := setupTestClient(t)
	defer c.Disconnect()

	if err := c.Publish(testTopic, testMessage, 1); err != nil {
		t.Fatalf("Failed to publish: %v", err)
	}
}

// TestMQTTDisconnect checks that Disconnect on a connected client returns no
// error.
func TestMQTTDisconnect(t *testing.T) {
	c := setupTestClient(t)

	if err := c.Disconnect(); err != nil {
		t.Fatalf("Failed to disconnect: %v", err)
	}
}

// TestMQTTMultiplePublish publishes three QoS 1 messages in sequence on the
// same connection, pausing 100ms after each one.
func TestMQTTMultiplePublish(t *testing.T) {
	c := setupTestClient(t)
	defer c.Disconnect()

	for _, payload := range [...]string{"TEST PASSED 1", "TEST PASSED 2", "TEST PASSED 3"} {
		if err := c.Publish(testTopic, payload, 1); err != nil {
			t.Fatalf("Failed to publish message %s: %v", payload, err)
		}
		time.Sleep(100 * time.Millisecond)
	}
}

// TestMQTTMultipleSubscribe subscribes to several topics on one connection
// and unsubscribes from all of them when the test ends.
func TestMQTTMultipleSubscribe(t *testing.T) {
	client := setupTestClient(t)
	defer client.Disconnect()

	topics := []string{
		"test/topic1",
		"test/topic2",
		"test/topic3",
	}

	for _, topic := range topics {
		// QoS 1 is requested (changed from 0) for more reliable delivery.
		if err := client.Subscribe(topic, 1); err != nil {
			t.Fatalf("Failed to subscribe to topic %s: %v", topic, err)
		}
		// Registered only after the subscribe succeeded, so a failed
		// subscribe does not trigger an UNSUBSCRIBE for a topic that was never
		// subscribed. The defer intentionally runs at the end of the test, not
		// the end of the iteration, so all subscriptions are held at once.
		defer client.Unsubscribe(topic)

		// Delay between subscriptions to prevent connection issues.
		time.Sleep(1 * time.Second)
	}
}

mqtt:

package signaling

import (
	"context"
	"crypto/tls"
	"fmt"
	"log"

	"github.com/eclipse/paho.golang/paho"
)

// MQTTClient wraps a paho MQTT client together with the TLS connection,
// context and logger it was built with. Instances are created by NewMQTT.
type MQTTClient struct {
	// ctx is the context passed to the paho Connect, Subscribe, Unsubscribe
	// and Publish calls.
	ctx context.Context
	// logger receives the client's log output.
	logger *log.Logger

	// tlsConf is the TLS configuration used to dial conn.
	tlsConf *tls.Config
	// conn is the TLS connection handed to the paho client.
	conn *tls.Conn

	// pahoClient is the underlying paho client.
	pahoClient *paho.Client
	// pahoClientConf is not assigned by NewMQTT in this file.
	pahoClientConf *paho.ClientConfig
	// pahoConConf is the CONNECT packet sent by Connect.
	pahoConConf *paho.Connect
}

// NewMQTT dials server (host:port) over TLS and returns an MQTTClient wrapping
// a paho client bound to that connection. No MQTT packets are exchanged until
// Connect is called.
//
// ctx bounds the TLS dial (cancelling it, or reaching its deadline, aborts the
// dial) and is stored for the later Connect/Subscribe/Unsubscribe/Publish
// calls. username, password and clientid are placed in the CONNECT packet.
// When verifySSL is false the server certificate is NOT verified, which
// permits man-in-the-middle attacks; only use that against test servers.
//
// An error is returned if the TLS connection cannot be established.
func NewMQTT(ctx context.Context, username, password, server, clientid string, verifySSL bool) (*MQTTClient, error) {
	// The previous tls.Dial-based implementation never touched ctx while
	// dialling, so a nil ctx did not fail here; keep accepting it.
	if ctx == nil {
		ctx = context.Background()
	}

	// The logger writes to the standard logger's destination with its own prefix.
	logger := log.New(log.Writer(), "[mqtt-client]: ", log.LstdFlags)

	tlsConf := &tls.Config{
		InsecureSkipVerify: !verifySSL,
		MinVersion:         tls.VersionTLS12,
	}

	// Dial through tls.Dialer so that ctx covers both the TCP connect and the
	// TLS handshake; plain tls.Dial cannot be cancelled.
	dialer := &tls.Dialer{Config: tlsConf}
	rawConn, err := dialer.DialContext(ctx, "tcp", server)
	if err != nil {
		return nil, fmt.Errorf("dialing MQTT broker %q: %w", server, err)
	}
	// tls.Dialer documents that the returned connection is always a *tls.Conn;
	// the check only guards against a panic should that ever change.
	con, ok := rawConn.(*tls.Conn)
	if !ok {
		rawConn.Close()
		return nil, fmt.Errorf("dialing MQTT broker %q: unexpected connection type %T", server, rawConn)
	}

	client := paho.NewClient(paho.ClientConfig{
		Conn: con,
		// Handlers: every callback just writes to the logger.
		OnPublishReceived: []func(paho.PublishReceived) (bool, error){
			func(pr paho.PublishReceived) (bool, error) {
				logger.Printf("received message on topic %s; body: %s (retain: %t)\n", pr.Packet.Topic, pr.Packet.Payload, pr.Packet.Retain)
				return true, nil
			}},
		OnClientError: func(err error) { logger.Printf("client error: %s\n", err) },
		OnServerDisconnect: func(d *paho.Disconnect) {
			if d.Properties != nil {
				logger.Printf("server requested disconnect: %s\n", d.Properties.ReasonString)
			} else {
				logger.Printf("server requested disconnect; reason code: %d\n", d.ReasonCode)
			}
		},
	})

	// Route paho's debug and error output to the same logger.
	client.SetDebugLogger(logger)
	client.SetErrorLogger(logger)

	// CONNECT packet used later by Connect.
	pconConf := &paho.Connect{
		CleanStart: true,
		KeepAlive:  10 * 60, // MQTT keep-alive is expressed in seconds: 10 minutes
		ClientID:   clientid,
		Username:   username,
		Password:   []byte(password),
	}
	// Flag the username and password as present in the CONNECT packet.
	pconConf.UsernameFlag = true
	pconConf.PasswordFlag = true

	return &MQTTClient{
		tlsConf:     tlsConf,
		logger:      logger,
		conn:        con,
		ctx:         ctx,
		pahoClient:  client,
		pahoConConf: pconConf,
	}, nil
}

// Connect sends the stored CONNECT packet to the broker using the client's
// context. It returns the error reported by paho, or an error carrying the
// CONNACK reason code when that code is not 0x00.
func (client *MQTTClient) Connect() error {
	connack, err := client.pahoClient.Connect(client.ctx, client.pahoConConf)
	switch {
	case err != nil:
		return err
	case connack.ReasonCode != 0x00:
		return fmt.Errorf("Connection failed with reason code %d", connack.ReasonCode)
	default:
		return nil
	}
}

// Disconnect asks the paho client to disconnect from the broker, sending a
// DISCONNECT packet with reason code 0x00, and returns paho's error, if any.
func (client *MQTTClient) Disconnect() error {
	return client.pahoClient.Disconnect(&paho.Disconnect{ReasonCode: 0x00})
}

// Subscribe subscribes the client to topic, requesting the given qos as the
// subscription's QoS level, using the client's stored context. It returns the
// error reported by paho, if any.
func (client *MQTTClient) Subscribe(topic string, qos byte) error {
	sub := paho.Subscribe{
		Subscriptions: []paho.SubscribeOptions{
			{
				Topic: topic,
				// Honour the caller's requested QoS; this was previously
				// hard-coded to 0, silently ignoring the parameter.
				QoS: qos,
			},
		},
	}
	_, err := client.pahoClient.Subscribe(client.ctx, &sub)
	return err
}

// Unsubscribe removes the client's subscription to topic using the client's
// stored context and returns the error reported by paho, if any.
func (client *MQTTClient) Unsubscribe(topic string) error {
	_, err := client.pahoClient.Unsubscribe(client.ctx, &paho.Unsubscribe{
		Topics: []string{topic},
	})
	return err
}

// Publish sends msg to topic at the given qos using the client's stored
// context. Every message is published with the retain flag set and a message
// expiry of 60 (seconds, per the MQTT v5 Message Expiry Interval property).
//
// It returns the error reported by paho, or an error carrying the reason code
// when the broker's acknowledgement reports a failure (reason code >= 0x80).
func (client *MQTTClient) Publish(topic string, msg string, qos byte) error {
	expiry := uint32(60)
	ack, err := client.pahoClient.Publish(client.ctx, &paho.Publish{
		Topic:   topic,
		Payload: []byte(msg),
		QoS:     qos,
		Retain:  true,
		Properties: &paho.PublishProperties{
			MessageExpiry: &expiry,
		},
	})
	if err != nil {
		return err
	}
	// A QoS 0 publish is not acknowledged by the broker, so there may be no
	// response to inspect; dereferencing it unconditionally would panic.
	if ack == nil {
		return nil
	}
	// MQTT v5 reason codes below 0x80 are successes (e.g. 0x10, "no matching
	// subscribers"); only 0x80 and above indicate that the publish failed.
	if ack.ReasonCode >= 0x80 {
		return fmt.Errorf("publish to %q failed with reason code %d", topic, ack.ReasonCode)
	}
	return nil
}

Debug output

=== RUN   TestMQTTPublish
[mqtt-client]: 2024/12/05 00:35:45 connecting
[mqtt-client]: 2024/12/05 00:35:45 sending CONNECT
[mqtt-client]: 2024/12/05 00:35:45 waiting for CONNACK/AUTH
[mqtt-client]: 2024/12/05 00:35:47 received CONNACK
[mqtt-client]: 2024/12/05 00:35:47 no session present - cleaning session state
[mqtt-client]: 2024/12/05 00:35:47 State.clean() called
[mqtt-client]: 2024/12/05 00:35:47 0 inflight transactions upon connection
[mqtt-client]: 2024/12/05 00:35:47 retransmitting 0 messages
[mqtt-client]: 2024/12/05 00:35:47 received CONNACK, starting PingHandler
[mqtt-client]: 2024/12/05 00:35:47 starting publish packets loop
[mqtt-client]: 2024/12/05 00:35:47 starting incoming
[mqtt-client]: 2024/12/05 00:35:47 sending message to test
[mqtt-client]: 2024/12/05 00:35:47 sending QoS12 message
[mqtt-client]: 2024/12/05 00:35:49 received PUBACK packet with id  1
[mqtt-client]: 2024/12/05 00:35:49 received PINGRESP
[mqtt-client]: 2024/12/05 00:35:49 disconnecting &{<nil> 0}
[mqtt-client]: 2024/12/05 00:35:49 client stop requested
[mqtt-client]: 2024/12/05 00:35:49 returning from ping handler worker
[mqtt-client]: 2024/12/05 00:35:49 conn closed
[mqtt-client]: 2024/12/05 00:35:49 acks tracker reset
[mqtt-client]: 2024/12/05 00:35:49 sessionExpiryInterval is 0 and connection lost - cleaning session state
[mqtt-client]: 2024/12/05 00:35:49 State.clean() called
[mqtt-client]: 2024/12/05 00:35:49 client stopping, incoming stopping
[mqtt-client]: 2024/12/05 00:35:49 session updated, waiting on workers
[mqtt-client]: 2024/12/05 00:35:49 returning from publish packets loop worker
[mqtt-client]: 2024/12/05 00:35:49 returning from incoming worker
[mqtt-client]: 2024/12/05 00:35:49 workers done
--- PASS: TestMQTTPublish (5.43s)
[mqtt-client]: 2024/12/05 00:35:49 error called: read tcp [2409:40e1:3012:73e0:a847:805c:d865:a71]:59868->[2001:41d0:a:6f1c::1]:8885: use of closed network connection
PASS
[mqtt-client]: 2024/12/05 00:35:49 client error: read tcp [2409:40e1:3012:73e0:a847:805c:d865:a71]:59868->[2001:41d0:a:6f1c::1]:8885: use of closed network connection
ok      github.com/CypherpunkSamurai/webpad-server/backend/rtc/signaling        5.645s

Expected behaviour
Unsure.

Software used:

  • Go: go version go1.23.3 windows/amd64
  • Paho github.com/eclipse/paho.golang v0.22.0

Additional context
No

@MattBrittan
Copy link
Contributor

MattBrittan commented Dec 4, 2024

This is not unexpected. Paho is a fairly low-level library and calls the ErrorLogger whenever an error is detected, even if the error is to be expected based on user action (e.g. if you trigger a disconnection, "use of closed network connection" is expected). This is deliberate to keep paho simple and avoid the need for synchronisation between goroutines (as this was a source of bugs in the v3 client).

See here for how Autopaho handles this.

The documentation should probably be amended to make this clearer.

Adding notes from slack:

I'd suggest using autopaho unless you need the lower-level functionality of paho; whilst on the face of it MQTT is a simple protocol, there are a lot of edge cases, and handling things like reconnections gets complicated...

With paho seeing some errors logged is expected during normal operations (there are multiple goroutines running and they will raise errors upon shutdown even if it's deliberate - i.e. a call to Disconnect) - here is how autopaho handles this (it basically ignores errors after Disconnect is called). This is something that we probably should improve in the docs, but as I expect most users to start with autopaho it's not something I've given much focus (the main paho code was written before I became involved, I wrote autopaho because paho was difficult to use correctly and things like session state needed to outlive a single paho instance).

@CypherpunkSamurai
Copy link
Author

CypherpunkSamurai commented Dec 5, 2024

So, it's ok to not handle errors at a Disconnect?

Can i ignore those errors? cause this project only uses mqtt partially (webrtc signalling) so i won't require to handle any errors later on. like a function to pass to OnServerDisconnect to just handle / dump errors to log.

@MattBrittan
Copy link
Contributor

So, it's ok to not handle errors at a Disconnect?

Correct, Disconnect transmits a Disconnect packet and then shuts down the connection (so errors are to be expected at that point).

@CypherpunkSamurai
Copy link
Author

What kind of errors, and how can we gracefully shut down?

I have the following requirements

  1. Create a MQTT Client
  2. Create a random topic string, write the webrtc offer to the string and expect an answer in "/answer" topic of that topic string.
  3. Close MQTT client gracefully when answer is received

very basic webrtc signaling :D requirement, so i don't really plan on handling a lot of error cases, maybe can ignore a few.

@MattBrittan
Copy link
Contributor

Calling Disconnect means that you are asking the library to drop the connection pretty much immediately (it sends a Disconnect packet to the broker, as per the protocol spec, then drops the connection). So what errors would be of interest after this point? (In your situation you could just drop the network connection; a clean disconnect is not really needed as you don't care about the session state etc.)

Sorry - I don't really have time to provide support here (this area is really intended for bugs, I will leave this open because the docs for paho.OnError do need to cover this situation better).

@CypherpunkSamurai
Copy link
Author

CypherpunkSamurai commented Dec 5, 2024

Sounds cool, I guess error handling isn't required in my case then. Thanks for the help.

Appreciate your time. Leaving this issue stale for now 👍🏻

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants