Al trabajar con WebSockets en la ESP32 usando AsyncTCP y ESPAsyncWebServer con AsyncWebSocket puede que te haya aparecido el mensaje:
Too many messages queued: closing connection
O el mensaje:
Too many messages queued: discarding new message
En este artículo te voy a explicar las causas y algunas soluciones que yo he encontrado.
Solución
Para que ya no te aparezca este mensaje de error:
- No llenes la cola tan rápido, agrega un delay antes de enviar datos usando
vTaskDelay
o su equivalente en tu plataforma - Aumenta el tamaño de la cola cambiando
WS_MAX_QUEUED_MESSAGES
- Descarta mensajes cuando la cola esté llena con
setCloseClientOnQueueFull
- Revisa el estado de la cola antes de enviar mensajes con
queueLen
- Haz que el cliente consuma los mensajes más rápido
Voy a explicar todos estos puntos con más detenimiento a continuación, así como la causa del Too many messages queued
¿Por qué ocurre el Too many messages queued?
Este mensaje de error va a aparecer cuando
haya muchos mensajes encolados. Cada vez
que usamos un ws.binaryAll()
, ws.binary()
, ws.text()
,
ws.textAll()
, client.binary()
o client.text()
se añade
un mensaje a la cola.
Si revisamos el código fuente
de AsyncWebSocket.cpp en la línea 409
dentro de la función _queueMessage
vemos
el siguiente fragmento que hace un emplace_back
encolando el mensaje:
_messageQueue.emplace_back(buffer, opcode, mask);
if (_client && _client->canSend()) {
_runQueue();
}
Y ahí mismo tiene la validación que ocasiona el Too many messages queued:
if (_messageQueue.size() >= WS_MAX_QUEUED_MESSAGES) {
if (closeWhenFull) {
_status = WS_DISCONNECTED;
if (_client) {
#ifdef ESP32
/*
Unlocking has to be called before return execution otherwise std::unique_lock ::~unique_lock() will get an exception pthread_mutex_unlock.
Due to _client->close(true) shall call the callback function _onDisconnect()
The calling flow _onDisconnect() --> _handleDisconnect() --> ~AsyncWebSocketClient()
*/
lock.unlock();
#endif
_client->close(true);
}
async_ws_log_e("Too many messages queued: closing connection");
} else {
async_ws_log_e("Too many messages queued: discarding new message");
}
return false;
}
Básicamente el mensaje nos está diciendo que hay muchos mensajes encolados y que no se pueden añadir más, así de simple. Hay que esperar a que el cliente consuma esos mensajes antes de enviarle más.
Nota: fíjate que existe WS_MAX_QUEUED_MESSAGES
que indica el límite. Vamos a
ver cómo cambiar ese límite más adelante como una solución.
Estado de cola
Puedes revisar el estado de la cola antes de escribirle mensajes para saber qué tan llena está.
Para conocer la longitud de la cola puedes invocar a client.queueLen()
,
y para saber si puedes enviar invoca a client.canSend()
que internamente comprueba
que la cola no está llena.
Si quieres escribir en todos los clientes comprobando la cola invoca
a ws.availableForWriteAll
o ws.availableForWrite()
indicando el id
de cliente.
Esto te permite conocer el estado de la cola y, por ejemplo, hacer un
vTaskDelay
más largo.
Cerrar conexión o descartar mensajes: closeWhenFull
Hay 2 variantes del mensaje Too many messages queued, uno dice closing connection y otro dice discarding new message.
Este mensaje y el comportamiento va a cambiar
según el valor closeWhenFull
que por defecto es true
.
Si closeWhenFull
es true
entonces la conexión se va a cerrar
cuando la cola se llene.
Por otro lado, cuando closeWhenFull
es false
el mensaje que
queremos encolar va a ser descartado, es decir, simplemente no se va
a encolar y habrá pérdida de datos pero la conexión seguirá activa.
Podemos configurar esto en el evento de un cliente conectado
invocando a client->setCloseClientOnQueueFull
.
Por ejemplo:
ws.onEvent([](AsyncWebSocket *, AsyncWebSocketClient *client, AwsEventType type, void *, uint8_t *, size_t)
{
switch (type) {
case WS_EVT_CONNECT:
Serial.printf("Client #%u connected from %s\n", client->id(), client->remoteIP().toString().c_str());
client->setCloseClientOnQueueFull(false); // Aquí le estoy indicando que NO cierre la conexión cuando la cola esté llena
break;
case WS_EVT_DISCONNECT:
// Un cliente se ha desconectado
Serial.printf("Client #%u disconnected\n", client->id());
break;
case WS_EVT_DATA:
// Se han recibido datos del cliente
//handleWebSocketMessage(client, arg, data, len); // Función auxiliar para procesar el mensaje
break;
case WS_EVT_PONG:
// Respuesta a un ping
break;
case WS_EVT_ERROR:
// Error en la conexión
//Serial.printf("Client #%u error(%u): %s\n", client->id(), *((uint16_t*)arg), (char*)data);
break;
} });
Es importante que sepas que esto no hace más grande la cola de datos, solo cambia el comportamiento para cuando la misma está llena.
Si estás dispuesto a perder mensajes y no quieres que la conexión se cierre
entonces deja el valor en false
. Por otro lado, si lo dejas en true
tal vez debas modificar tu cliente para que se reconecte cuando pierda la conexión, misma
que será cerrada cuando la cola se llene.
Esperar en cada escritura del WebSocket
Esta solución para eliminar el mensaje Too many messages queued
espera un poco antes de enviar un mensaje. Puedes hacer un delay
o un vTaskDelay(pdMS_TO_TICKS(500))
donde el 500
es la cantidad de milisegundos.
Por ejemplo:
void sendTouch(void *)
{
for (;;)
{
/*Este es el valor que quiero enviar infinitamente usando WebSockets:*/
for (AsyncWebSocketClient &client : ws.getClients())
{
if (client.status() == WS_CONNECTED)
{
client.text(String(val));
vTaskDelay(pdMS_TO_TICKS(100)); // <- Aquí estoy usando vTaskDelay
}
}
}
}
En este ejemplo yo espero 100ms antes de enviar el valor de nuevo. Así dejo que la cola se vacíe un poco y no la saturo.
Modificando límite
Como puedes ver también es posible cambiar el valor
de WS_MAX_QUEUED_MESSAGES
, para ello vemos que
en AsyncWebSocket.h tiene:
#ifndef WS_MAX_QUEUED_MESSAGES
#define WS_MAX_QUEUED_MESSAGES 32
#endif
Como puedes ver, el máximo para la cola es
32. Y como tiene un ifndef
podemos definir
ese valor antes de incluir la librería.
Para ello haz lo siguiente en tu código antes de hacer
el #include
:
#define WS_MAX_QUEUED_MESSAGES 64 // <- Aquí lo estoy cambiando
// Y ya después hago los include
#include <Arduino.h>
#include <driver/i2s.h>
#include <WiFi.h>
#include <AsyncTCP.h>
#include <ESPAsyncWebServer.h>
Ajusta el valor según tus necesidades y la cantidad de RAM que tengas disponible.
Tal vez esto solucione el problema pero recuerda: si el cliente no consume la cola lo suficientemente rápido o no esperas un poco antes de encolar cada mensaje entonces puede que, eventualmente, la cola se vuelva a llenar sin importar el límite.
El cliente también debe ser rápido
Si el cliente del WS no consume los mensajes muy rápido entonces la cola se va a llenar independientemente de las soluciones que hayas probado.
Yo tengo un ejemplo con Python:
IP_ESP = "192.168.0.8"
uri = f"ws://{IP_ESP}/touch"
async def connect_ws():
while True:
try:
print("Conectando...")
async with websockets.connect(uri) as ws:
while True:
try:
msg = await asyncio.wait_for(ws.recv(), timeout=5)
print("Mensaje:", msg)
Fíjate que leo el mensaje usando await asyncio.wait_for(ws.recv())
y
luego simplemente lo imprimo usando print
. Esto es muy rápido y consume
los mensajes rápido, pero si tú haces algo como:
IP_ESP = "192.168.0.8"
uri = f"ws://{IP_ESP}/touch"
async def connect_ws():
while True:
try:
print("Conectando...")
async with websockets.connect(uri) as ws:
while True:
try:
msg = await asyncio.wait_for(ws.recv(), timeout=5)
procesar_mensaje_tardado(msg)
Suponiendo que en procesar_mensaje_tardado
estás dibujando con matplotlib
o algo que bloquea la ejecución aunque sea unos segundos entonces tus mensajes no se van
a consumir lo suficientemente rápido; ahí tendrías que buscar el balance
entre el tiempo de espera entre envío de mensajes de la ESP32 y
el consumo en tu cliente.
También puedes usar varios hilos en el cliente. Uno de ellos lee el mensaje y lo añade a una cola (del cliente) y el otro los procesa ya sea dibujándolos, imprimiéndolos o haciendo cualquier cálculo intensivo.
Obviamente todo esto va a cambiar dependiendo de tu cliente y de lo que necesites.
No es lo mismo enviar audio en tiempo real (mismo que no puede ser cortado) sin interrupciones a enviar el valor de un sensor cada segundo o medio segundo.