We have a Flask app that needs to send messages to different subscriptions on a VerneMQ broker. We are using the paho.mqtt library to connect to the broker, with JWT-based authentication that is validated on the broker by a custom webhook script. The JWT expires every seven days, so the Flask app needs to renew the token and reconnect. We currently handle this in the on_disconnect event. This setup works, but after a while we start getting this error message:

can't register client {xxxx} with username 'xxxx' due to register_subscriber_retry_exhausted

The message appears hundreds of times in the broker's log files, but we can't find any information on this error. Here is the code we use to establish and renew the MQTT broker connection within Flask:

import ssl
import uuid

import certifi
import paho.mqtt.client as mqtt


def mqtt_on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("MQTT broker Connection successful")
    else:
        print("MQTT broker Connection attempt failed.")


def mqtt_on_disconnect(client, userdata, rc):
    token = crypto.get_jwt('our_username').decode()
    client.username_pw_set("our_username", password=f'Bearer {token}')
    try:
        client.connect(
            app.config['MQTT_BROKER_URL'],
            port=app.config['MQTT_BROKER_PORT']
        )
    except Exception as ex:
        print("Could not reconnect to broker.")
        print(ex)


def establish_mqtt_connection(app, get_jwt):
    token = get_jwt(
        'our_username'
    ).decode()
    client_id = str(uuid.uuid4())
    if app.config['ENVIRONMENT'] not in {'LOCAL', 'TEST'}:
        client_id = str(uuid.uuid4())
    mqtt_client = mqtt.Client(
        f'control_{client_id}',
        clean_session=True,
        transport="websockets"
    )
    mqtt_client.tls_set(
        ca_certs=certifi.where(),
        tls_version=ssl.PROTOCOL_TLSv1_2
    )
    mqtt_client.on_connect = mqtt_on_connect
    mqtt_client.on_disconnect = mqtt_on_disconnect
    mqtt_client.username_pw_set("our_username", password=f'Bearer {token}')
    host = app.config['MQTT_BROKER_URL']
    mqtt_client._host = host
    mqtt_client.loop_start()
    try:
        mqtt_client.connect(
            app.config['MQTT_BROKER_URL'],
            port=app.config['MQTT_BROKER_PORT'],
        )
    except Exception as ex:
        print(f'Exception establishing connection to MQTT broker: {ex}')

    app.config['MQTT_CLIENT'] = mqtt_client

Why does this error occur, and what can we do to fix it? Is there a problem with how we are setting up the broker connection? Any info on this will be greatly appreciated.

Additional info: the broker is a single instance of VerneMQ, not a cluster. All configuration values are set to defaults.
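
One variation that may be worth testing, sketched here under assumptions rather than as a confirmed fix: as far as I can tell, when loop_start() is running, paho's network thread already retries the connection after an unexpected disconnect, so the callback could limit itself to refreshing the credentials instead of also calling connect(). The make_on_disconnect helper and its get_token parameter are hypothetical names used only for illustration; get_token is assumed to return the JWT as a str, and the callback keeps the (client, userdata, rc) signature used in the code above.

```python
def make_on_disconnect(get_token, username):
    """Build an on_disconnect callback that only refreshes credentials.

    get_token is a hypothetical callable: username -> JWT string.
    """
    def on_disconnect(client, userdata, rc):
        if rc == 0:
            # rc == 0 means the application asked for the disconnect,
            # so there is nothing to renew.
            return
        # Store a fresh token on the client; it is sent with the next
        # CONNECT packet. No connect() call is made here (assumption:
        # the loop_start() thread performs the reconnect itself).
        token = get_token(username)
        client.username_pw_set(username, password=f"Bearer {token}")

    return on_disconnect
```

It would be wired up with something like mqtt_client.on_disconnect = make_on_disconnect(lambda user: crypto.get_jwt(user).decode(), "our_username"). Whether this has any effect on the register_subscriber_retry_exhausted message is untested.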

tutiplain