We have a Flask app that needs to send messages to different subscriptions on a VernqMQ broker. We are using paho.mqtt library to connect to the broker, and we are using JWT-based authentication, which is validated ok the broker with a custom webhook script. However, the JWT expires every seven days, so the Flask app needs to renew that token and reconnect. We are currently handling this in the on_disconnect event. This setup works, however, after a while, we are starting to get this error message :
can't register client {xxxx} with username 'xxxx' due to register_subscriber_retry_exhausted
The message appears hundreds of times on the broker's log files, but we can't find any information on this error. Here is the code for how we establish/renew the MQTT broker connection within Flask:
def mqtt_on_connect(client, userdata, flags, rc):
if rc == 0:
print("MQTT broker Connection successful")
else:
print("MQTT broker Connection attempt failed.")
def mqtt_on_disconnect(client, userdata, rc):
token = crypto.get_jwt('our_username').decode()
client.username_pw_set("our_username", password=f'Bearer {token}')
try:
client.connect(
app.config['MQTT_BROKER_URL'],
port=app.config['MQTT_BROKER_PORT']
)
except Exception as ex:
print("Could not reconnect to broker.")
print(ex)
def establish_mqtt_connection(app, get_jwt):
token = get_jwt(
'our_username'
).decode()
client_id = str(uuid.uuid4())
if app.config['ENVIRONMENT'] not in {'LOCAL', 'TEST'}:
client_id = str(uuid.uuid4())
mqtt_client = mqtt.Client(
f'control_{client_id}',
clean_session=True,
transport="websockets"
)
mqtt_client.tls_set(
ca_certs=certifi.where(),
tls_version=ssl.PROTOCOL_TLSv1_2
)
mqtt_client.on_connect = mqtt_on_connect
mqtt_client.on_disconnect = mqtt_on_disconnect
mqtt_client.username_pw_set("our_username", password=f'Bearer {token}')
host = app.config['MQTT_BROKER_URL']
mqtt_client._host = host
mqtt_client.loop_start()
try:
mqtt_client.connect(
app.config['MQTT_BROKER_URL'],
port=app.config['MQTT_BROKER_PORT'],
)
except Exception as ex:
print(f'Exception establishing connection to MQTT broker: {ex}')
app.config['MQTT_CLIENT'] = mqtt_client
Why does this error message happen and what can we do to fix it? Is there a problem with how we are setting up the broker connection? Any info on this willbe greatly appreciated.
Additional info: the broker is a single instance of VerneMQ, not a cluster. All configuration values are set to defaults.