feat(ws): heartbeat to prevent irregular connection loss such as wifi connection change
This commit is contained in:
parent
6fdfa76a14
commit
5bab7fed51
|
@ -52,7 +52,7 @@ static esp_err_t api_post_handler(httpd_req_t *req)
|
|||
goto put_buf;
|
||||
}
|
||||
|
||||
ESP_LOGI(TAG, "=========== RECEIVED DATA ==========");
|
||||
ESP_LOGI(TAG, "=========== RECEIVED DATA ========== %d", httpd_req_to_sockfd(req));
|
||||
ESP_LOGI(TAG, "%.*s", data_len, buf);
|
||||
ESP_LOGI(TAG, "====================================");
|
||||
ESP_LOGI(TAG, "heap min: %lu, cur: %lu", esp_get_minimum_free_heap_size(), esp_get_free_heap_size());
|
||||
|
@ -128,10 +128,8 @@ void async_send_out_cb(void *arg, int module_status)
|
|||
{
|
||||
post_request_t *req = arg;
|
||||
if (module_status != API_JSON_OK) {
|
||||
/* BUG: httpd_req_to_sockfd cause crash, fd not closed for the moment
|
||||
* issue is opened on esp-idf github */
|
||||
// httpd_sess_trigger_close(req->req_out->handle,
|
||||
// httpd_req_to_sockfd(req->req_out->handle));
|
||||
httpd_sess_trigger_close(req->req_out->handle,
|
||||
httpd_req_to_sockfd(req->req_out));
|
||||
}
|
||||
|
||||
uri_api_send_out(req->req_out, req, module_status);
|
||||
|
|
|
@ -10,11 +10,13 @@
|
|||
|
||||
#include <esp_http_server.h>
|
||||
#include <esp_log.h>
|
||||
#include <list.h>
|
||||
|
||||
#include <cJSON.h>
|
||||
#include <errno.h>
|
||||
|
||||
#include <freertos/FreeRTOS.h>
|
||||
#include <freertos/queue.h>
|
||||
|
||||
#define TAG __FILE_NAME__
|
||||
#define MSG_BUSY_ERROR "{\"error\":\"Resource busy\"}"
|
||||
#define MSG_JSON_ERROR "{\"error\":\"JSON parse error\"}"
|
||||
|
@ -33,30 +35,41 @@ typedef struct ws_msg_t {
|
|||
|
||||
#define PAYLOAD_LEN static_buffer_get_buf_size() - sizeof(ws_msg_t)
|
||||
|
||||
struct ws_ctx_t {
|
||||
struct ws_client_info_t {
|
||||
httpd_handle_t hd;
|
||||
int fd; /* range 58 ~ 58+max_socket */
|
||||
} clients[CONFIG_LWIP_MAX_SOCKETS+1];
|
||||
TaskHandle_t task_heartbeat;
|
||||
int8_t client_count;
|
||||
} ws_ctx;
|
||||
|
||||
static int ws_on_text_data(httpd_req_t *req, ws_msg_t *ws_msg);
|
||||
static int ws_on_binary_data(httpd_req_t *req, ws_msg_t *ws_msg);
|
||||
static int ws_on_socket_open(httpd_req_t *req);
|
||||
static int ws_on_close(httpd_req_t *req, ws_msg_t *msg);
|
||||
|
||||
static void ws_async_resp(void *arg);
|
||||
static void async_send_out_cb(void *arg, int module_status);
|
||||
static void json_to_text(ws_msg_t *msg);
|
||||
|
||||
static void test(void *arg)
|
||||
{
|
||||
ESP_LOGE(TAG, "this is a test");
|
||||
}
|
||||
/* Heartbeat related */
|
||||
static inline void ws_add_fd(httpd_handle_t hd, int fd);
|
||||
static inline void ws_rm_fd(int fd);
|
||||
|
||||
_Noreturn static void heartbeat_task(void *arg);
|
||||
|
||||
|
||||
|
||||
static esp_err_t ws_req_handler(httpd_req_t *req)
|
||||
{
|
||||
if (unlikely(req->method == HTTP_GET)) {
|
||||
int sock_fd = httpd_req_to_sockfd(req);
|
||||
/**
|
||||
* TODO: add socket to array
|
||||
* */
|
||||
ESP_LOGI(TAG, "ws open: %d", sock_fd);
|
||||
return ESP_OK;
|
||||
return ws_on_socket_open(req);
|
||||
}
|
||||
|
||||
ESP_LOGI(TAG, "ws_handler: httpd_handle_t=%p, sockfd=%d, client_info:%d", req->handle,
|
||||
httpd_req_to_sockfd(req), httpd_ws_get_fd_info(req->handle, httpd_req_to_sockfd(req)));
|
||||
ESP_LOGI(TAG, "ws_handler: httpd_handle_t=%p, sockfd=%d, client_info:%d, client_count: %d", req->handle,
|
||||
httpd_req_to_sockfd(req), httpd_ws_get_fd_info(req->handle, httpd_req_to_sockfd(req)),
|
||||
ws_ctx.client_count);
|
||||
|
||||
int err = ESP_OK;
|
||||
httpd_ws_frame_t *ws_pkt;
|
||||
|
@ -79,52 +92,55 @@ static esp_err_t ws_req_handler(httpd_req_t *req)
|
|||
err = httpd_ws_recv_frame(req, ws_pkt, 0);
|
||||
if (unlikely(err != ESP_OK)) {
|
||||
ESP_LOGE(TAG, "ws recv len error");
|
||||
goto end;
|
||||
return ws_on_close(req, ws_msg);
|
||||
}
|
||||
ESP_LOGI(TAG, "frame len: %d, type: %d", ws_pkt->len, ws_pkt->type);
|
||||
if (unlikely(ws_pkt->len > PAYLOAD_LEN)) {
|
||||
ESP_LOGE(TAG, "frame len is too big");
|
||||
err = ESP_FAIL;
|
||||
goto end;
|
||||
}
|
||||
|
||||
ws_pkt->payload = ws_msg->payload;
|
||||
/* read incoming data */
|
||||
err = httpd_ws_recv_frame(req, ws_pkt, ws_pkt->len);
|
||||
if (unlikely(err != ESP_OK)) {
|
||||
ESP_LOGE(TAG, "ws recv data error");
|
||||
goto end;
|
||||
return ws_on_close(req, ws_msg);
|
||||
}
|
||||
|
||||
switch (ws_pkt->type) {
|
||||
case HTTPD_WS_TYPE_CONTINUE:
|
||||
break;
|
||||
goto end;
|
||||
case HTTPD_WS_TYPE_TEXT:
|
||||
ws_pkt->payload = ws_msg->payload;
|
||||
/* read incoming data */
|
||||
err = httpd_ws_recv_frame(req, ws_pkt, ws_pkt->len);
|
||||
if (unlikely(err != ESP_OK)) {
|
||||
ESP_LOGE(TAG, "ws recv data error");
|
||||
return ws_on_close(req, ws_msg);
|
||||
}
|
||||
return ws_on_text_data(req, ws_msg);
|
||||
case HTTPD_WS_TYPE_BINARY:
|
||||
ws_pkt->payload = ws_msg->payload;
|
||||
/* read incoming data */
|
||||
err = httpd_ws_recv_frame(req, ws_pkt, ws_pkt->len);
|
||||
if (unlikely(err != ESP_OK)) {
|
||||
ESP_LOGE(TAG, "ws recv data error");
|
||||
return ws_on_close(req, ws_msg);
|
||||
}
|
||||
return ws_on_binary_data(req, ws_msg);
|
||||
case HTTPD_WS_TYPE_CLOSE:
|
||||
/* Read the rest of the CLOSE frame and response */
|
||||
/* Please refer to RFC6455 Section 5.5.1 for more details */
|
||||
ws_pkt->len = 0;
|
||||
ws_pkt->type = HTTPD_WS_TYPE_CLOSE;
|
||||
err = httpd_ws_send_frame(req, ws_pkt);
|
||||
break;
|
||||
return ws_on_close(req, ws_msg);
|
||||
case HTTPD_WS_TYPE_PING:
|
||||
/* Now turn the frame to PONG */
|
||||
ws_pkt->type = HTTPD_WS_TYPE_PONG;
|
||||
err = httpd_ws_send_frame(req, ws_pkt);
|
||||
break;
|
||||
goto end;
|
||||
case HTTPD_WS_TYPE_PONG:
|
||||
err = ESP_OK;
|
||||
break;
|
||||
goto end;
|
||||
default:
|
||||
err = ESP_FAIL;
|
||||
goto end;
|
||||
}
|
||||
|
||||
end:
|
||||
static_buffer_put(ws_msg);
|
||||
return err;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* REGISTER MODULE
|
||||
* */
|
||||
|
@ -141,11 +157,14 @@ static const httpd_uri_t uri_api = {
|
|||
|
||||
static int WS_REQ_INIT(const httpd_uri_t **uri_conf) {
|
||||
*uri_conf = &uri_api;
|
||||
xTaskCreate(heartbeat_task, "hb task", 1024, NULL, 3, &ws_ctx.task_heartbeat);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int WS_REQ_EXIT(const httpd_uri_t **uri_conf) {
|
||||
*uri_conf = &uri_api;
|
||||
vTaskDelete(ws_ctx.task_heartbeat);
|
||||
ws_ctx.task_heartbeat = NULL;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -207,10 +226,36 @@ put_buf:
|
|||
|
||||
int ws_on_binary_data(httpd_req_t *req, ws_msg_t *ws_msg)
|
||||
{
|
||||
(void) req;
|
||||
static_buffer_put(ws_msg);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int ws_on_socket_open(httpd_req_t *req)
|
||||
{
|
||||
int sock_fd = httpd_req_to_sockfd(req);
|
||||
ws_add_fd(req->handle, sock_fd);
|
||||
ESP_LOGI(TAG, "ws open: %d", sock_fd);
|
||||
return ESP_OK;
|
||||
}
|
||||
|
||||
static int ws_on_close(httpd_req_t *req, ws_msg_t *msg)
|
||||
{
|
||||
/* Read the rest of the CLOSE frame and response */
|
||||
/* Please refer to RFC6455 Section 5.5.1 for more details */
|
||||
msg->ws_pkt.len = 0;
|
||||
msg->ws_pkt.type = HTTPD_WS_TYPE_CLOSE;
|
||||
ESP_LOGI(TAG, "ws %d closed", httpd_req_to_sockfd(req));
|
||||
ws_rm_fd(httpd_req_to_sockfd(req));
|
||||
int err = httpd_ws_send_frame(req, &msg->ws_pkt);
|
||||
if (err) {
|
||||
ESP_LOGE(TAG, "on close %s", esp_err_to_name(err));
|
||||
}
|
||||
httpd_sess_trigger_close(req->handle, httpd_req_to_sockfd(req));
|
||||
static_buffer_put(msg);
|
||||
return err;
|
||||
}
|
||||
|
||||
static void ws_async_resp(void *arg)
|
||||
{
|
||||
ws_msg_t *req = arg;
|
||||
|
@ -260,4 +305,71 @@ void json_to_text(ws_msg_t *ws_msg)
|
|||
ws_pkt->final = 1;
|
||||
}
|
||||
ws_pkt->len = strlen((char *) ws_pkt->payload);
|
||||
}
|
||||
|
||||
|
||||
/* Clients array manipulation function
|
||||
* */
|
||||
static inline void ws_add_fd(httpd_handle_t hd, int fd)
|
||||
{
|
||||
if (ws_ctx.client_count > CONFIG_LWIP_MAX_SOCKETS) {
|
||||
return;
|
||||
}
|
||||
ws_ctx.clients[ws_ctx.client_count].fd = fd;
|
||||
ws_ctx.clients[ws_ctx.client_count].hd = hd;
|
||||
ws_ctx.client_count++;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief replace the fd and hd to be removed by the last item
|
||||
* all front items are valid
|
||||
*/
|
||||
static inline void ws_rm_fd(int fd)
|
||||
{
|
||||
for (int i = 0; i < ws_ctx.client_count; ++i) {
|
||||
if (ws_ctx.clients[i].fd == fd) {
|
||||
ws_ctx.clients[i].fd = ws_ctx.clients[ws_ctx.client_count-1].fd;
|
||||
ws_ctx.clients[i].hd = ws_ctx.clients[ws_ctx.client_count-1].hd;
|
||||
ws_ctx.client_count--;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void send_heartbeat(void *arg)
|
||||
{
|
||||
static httpd_ws_frame_t ws_pkt = {
|
||||
.len = 0,
|
||||
.payload = NULL,
|
||||
.type = HTTPD_WS_TYPE_TEXT,
|
||||
};
|
||||
|
||||
struct ws_client_info_t *client_info = arg;
|
||||
int err;
|
||||
err = httpd_ws_send_frame_async(client_info->hd, client_info->fd, &ws_pkt);
|
||||
if (err) {
|
||||
ESP_LOGE(TAG, "hb send err: %s", esp_err_to_name(err));
|
||||
}
|
||||
}
|
||||
|
||||
static inline void ws_broadcast_heartbeat()
|
||||
{
|
||||
int err;
|
||||
for (int i = 0; i < ws_ctx.client_count; ++i) {
|
||||
err = httpd_queue_work(ws_ctx.clients[i].hd, send_heartbeat, &ws_ctx.clients[i]);
|
||||
if (err) {
|
||||
ESP_LOGE(TAG, "hb queue work err: %s", esp_err_to_name(err));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_Noreturn
|
||||
void heartbeat_task(void *arg)
|
||||
{
|
||||
(void) arg;
|
||||
while (1) {
|
||||
vTaskDelay(pdMS_TO_TICKS(2000));
|
||||
ws_broadcast_heartbeat();
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -105,6 +105,8 @@ void start_webserver(void)
|
|||
config.uri_match_fn = uri_match;
|
||||
config.open_fn = web_server_on_open;
|
||||
config.close_fn = web_server_on_close;
|
||||
config.keep_alive_enable = 1;
|
||||
config.keep_alive_count = 1;
|
||||
|
||||
ESP_LOGI(TAG, "Starting server on port: '%d'", config.server_port);
|
||||
if ((err = httpd_start(&http_server, &config)) != ESP_OK) {
|
||||
|
|
Loading…
Reference in New Issue