/* * SPDX-FileCopyrightText: 2024 kerms * * SPDX-License-Identifier: Apache-2.0 */ #include "web_uri_module.h" #include "api_json_router.h" #include "memory_pool.h" #include #include #include #include #include #include #define TAG __FILE_NAME__ #define MSG_BUSY_ERROR "{\"error\":\"Resource busy\"}" #define MSG_JSON_ERROR "{\"error\":\"JSON parse error\"}" #define MSG_BAD_REQUEST_ERROR "{\"error\":\"Bad json request\"}" #define MSG_SEND_JSON_ERROR "{\"error\":\"JSON generation error\"}" #define MSG_INTERNAL_ERROR "{\"error\":\"Internal error\"}" #define WS_MODULE_ID 3 typedef struct ws_msg_t { api_json_req_t json; api_json_module_async_t async; httpd_handle_t hd; int fd; httpd_ws_frame_t ws_pkt; uint8_t delim[0]; uint8_t payload[0]; /* size = static_buf_size - offsetof(this, delim) */ } ws_msg_t; #define PAYLOAD_LEN memory_pool_get_buf_size() - sizeof(ws_msg_t) struct ws_ctx_t { struct ws_client_info_t { httpd_handle_t hd; int fd; /* range 64 - max_socket ~ 64 */ } clients[CONFIG_LWIP_MAX_SOCKETS + 1]; TaskHandle_t task_heartbeat; int8_t client_count; } ws_ctx; static int ws_on_text_data(httpd_req_t *req, ws_msg_t *ws_msg); static int ws_on_binary_data(httpd_req_t *req, ws_msg_t *ws_msg); static int ws_on_socket_open(httpd_req_t *req); static int ws_on_close(httpd_req_t *req, httpd_ws_frame_t *ws_pkt, void *msg); static void ws_async_resp(void *arg); static void async_send_out_cb(void *arg, int module_status); static void json_to_text(ws_msg_t *msg); /* Heartbeat related */ static inline int8_t ws_add_fd(httpd_handle_t hd, int fd); static inline void ws_rm_fd(int fd); _Noreturn static void heartbeat_task(void *arg); static esp_err_t ws_req_handler(httpd_req_t *req) { if (unlikely(req->method == HTTP_GET)) { return ws_on_socket_open(req); } ESP_LOGI(TAG, "ws_handler: httpd_handle_t=%p, sockfd=%d, client_info:%d, client_count: %d", req->handle, httpd_req_to_sockfd(req), httpd_ws_get_fd_info(req->handle, httpd_req_to_sockfd(req)), ws_ctx.client_count); int err = ESP_OK; httpd_ws_frame_t *ws_pkt; ws_msg_t *ws_msg; ws_msg = memory_pool_get(pdMS_TO_TICKS(10)); if (unlikely(ws_msg == NULL)) { httpd_ws_frame_t resp_pkt; resp_pkt.type = HTTPD_WS_TYPE_TEXT; resp_pkt.len = strlen(MSG_BUSY_ERROR); resp_pkt.payload = (uint8_t *)MSG_BUSY_ERROR; resp_pkt.final = 1; httpd_ws_send_frame_async(req->handle, httpd_req_to_sockfd(req), &resp_pkt); goto end; } ws_pkt = &ws_msg->ws_pkt; ws_pkt->len = 0; /* get and check frame size */ err = httpd_ws_recv_frame(req, ws_pkt, 0); if (unlikely(err != ESP_OK)) { ESP_LOGE(TAG, "ws recv len error"); return ws_on_close(req, ws_pkt, ws_msg); } ESP_LOGI(TAG, "frame len: %d, type: %d", ws_pkt->len, ws_pkt->type); if (unlikely(ws_pkt->len > PAYLOAD_LEN)) { ESP_LOGE(TAG, "frame len is too big"); return ws_on_close(req, ws_pkt, ws_msg); } switch (ws_pkt->type) { case HTTPD_WS_TYPE_CONTINUE: goto end; case HTTPD_WS_TYPE_TEXT: ws_pkt->payload = ws_msg->payload; /* read incoming data */ err = httpd_ws_recv_frame(req, ws_pkt, ws_pkt->len); if (unlikely(err != ESP_OK)) { ESP_LOGE(TAG, "ws recv data error"); return ws_on_close(req, ws_pkt, ws_msg); } return ws_on_text_data(req, ws_msg); case HTTPD_WS_TYPE_BINARY: return ws_on_binary_data(req, ws_msg); case HTTPD_WS_TYPE_CLOSE: return ws_on_close(req, ws_pkt, ws_msg); case HTTPD_WS_TYPE_PING: /* Now turn the frame to PONG */ ws_pkt->type = HTTPD_WS_TYPE_PONG; err = httpd_ws_send_frame(req, ws_pkt); goto end; case HTTPD_WS_TYPE_PONG: err = ESP_OK; goto end; default: err = ESP_FAIL; goto end; } end: memory_pool_put(ws_msg); return err; } void ws_set_err_msg(httpd_ws_frame_t *p_frame, api_json_req_status_e ret); int ws_on_text_data(httpd_req_t *req, ws_msg_t *ws_msg) { int err = ESP_OK; int ret; httpd_ws_frame_t *ws_pkt; ws_pkt = &ws_msg->ws_pkt; ESP_LOGI(TAG, "=========== RECEIVED DATA =========="); ESP_LOGI(TAG, "%.*s", ws_pkt->len, ws_pkt->payload); ESP_LOGI(TAG, "===================================="); ESP_LOGI(TAG, "heap min: %lu, cur: %lu", esp_get_minimum_free_heap_size(), esp_get_free_heap_size()); /* Decode */ ws_msg->json.in = cJSON_ParseWithLength((char *)ws_pkt->payload, ws_pkt->len); if (unlikely(ws_msg->json.in == NULL)) { ws_pkt->payload = (uint8_t *)MSG_JSON_ERROR; ws_pkt->len = strlen(MSG_JSON_ERROR); goto put_buf; } ws_msg->json.out = NULL; ret = api_json_route(&ws_msg->json, &ws_msg->async); if (ret == API_JSON_ASYNC) { ws_msg->hd = req->handle; ws_msg->fd = httpd_req_to_sockfd(req); ws_msg->async.req_task.send_out.cb = async_send_out_cb; ws_msg->async.req_task.send_out.arg = ws_msg; if (req_queue_push_long_run(&ws_msg->async.req_task, pdMS_TO_TICKS(20))) { /* queued failed */ goto end; } /* ret, buf will be release latter in async send out */ return ESP_OK; } else if (ret != API_JSON_OK) { ws_set_err_msg(ws_pkt, ret); goto end; } else if (ws_msg->json.out == NULL) { goto end; } /* api function returns something, send it to http client */ json_to_text(ws_msg); end: cJSON_Delete(ws_msg->json.in); put_buf: httpd_ws_send_frame_async(req->handle, httpd_req_to_sockfd(req), ws_pkt); memory_pool_put(ws_msg); return err; } void ws_set_err_msg(httpd_ws_frame_t *p_frame, api_json_req_status_e ret) { switch (ret) { case API_JSON_BAD_REQUEST: p_frame->len = strlen(MSG_BAD_REQUEST_ERROR); p_frame->payload = (uint8_t *)MSG_BAD_REQUEST_ERROR; p_frame->final = 1; break; case API_JSON_INTERNAL_ERR: p_frame->len = strlen(MSG_INTERNAL_ERROR); p_frame->payload = (uint8_t *)MSG_INTERNAL_ERROR; p_frame->final = 1; break; default: return; } } _Static_assert(sizeof(httpd_ws_frame_t) <= 16, "bin_data_internal padding not sufficient for httpd_ws_frame_t"); int ws_on_binary_data(httpd_req_t *req, ws_msg_t *ws_msg) { (void) req; memory_pool_put(ws_msg); return 0; } int ws_on_socket_open(httpd_req_t *req) { int sock_fd = httpd_req_to_sockfd(req); ESP_LOGI(TAG, "ws open: %d", sock_fd); return ws_add_fd(req->handle, sock_fd); } /** * @param msg must only used as generic ptr, truncated by ws_on_bin() * @return */ static int ws_on_close(httpd_req_t *req, httpd_ws_frame_t *ws_pkt, void *msg) { /* Read the rest of the CLOSE frame and response */ /* Please refer to RFC6455 Section 5.5.1 for more details */ ws_pkt->len = 0; ws_pkt->type = HTTPD_WS_TYPE_CLOSE; ESP_LOGI(TAG, "ws %d closed", httpd_req_to_sockfd(req)); ws_rm_fd(httpd_req_to_sockfd(req)); int err = httpd_ws_send_frame(req, ws_pkt); if (err) { ESP_LOGE(TAG, "on close %s", esp_err_to_name(err)); } httpd_sess_trigger_close(req->handle, httpd_req_to_sockfd(req)); memory_pool_put(msg); return err; } static void ws_async_resp(void *arg) { ws_msg_t *req = arg; httpd_handle_t hd = req->hd; int fd = req->fd; int err; ESP_LOGI(TAG, "ws async fd : %d", fd); err = httpd_ws_send_frame_async(hd, fd, &req->ws_pkt); if (unlikely(err)) { ESP_LOGE(TAG, "%s", esp_err_to_name(err)); } memory_pool_put(req); } void async_send_out_cb(void *arg, int module_status) { ws_msg_t *req = arg; if (module_status != API_JSON_OK) { goto end; } int err; json_to_text(req); err = httpd_queue_work(req->hd, ws_async_resp, req); if (likely(err == ESP_OK)) { /* msg queued, let callee release the buffer */ return; } ESP_LOGE(TAG, "errno: %d, fd %p: %s", errno, req->hd, esp_err_to_name(err)); end: /* clean resources */ memory_pool_put(req); } void json_to_text(ws_msg_t *ws_msg) { int err; httpd_ws_frame_t *ws_pkt = &ws_msg->ws_pkt; /* api function returns something, send it to http client */ err = !cJSON_PrintPreallocated(ws_msg->json.out, (char *)ws_msg->payload, PAYLOAD_LEN - 5, 0); cJSON_Delete(ws_msg->json.out); if (unlikely(err)) { ws_pkt->len = strlen(MSG_SEND_JSON_ERROR); ws_pkt->payload = (uint8_t *)MSG_SEND_JSON_ERROR; ws_pkt->final = 1; } ws_pkt->len = strlen((char *)ws_pkt->payload); } /* Clients array manipulation function * */ static inline int8_t ws_add_fd(httpd_handle_t hd, int fd) { if (ws_ctx.client_count > CONFIG_LWIP_MAX_SOCKETS) { return 1; } ws_ctx.clients[ws_ctx.client_count].fd = fd; ws_ctx.clients[ws_ctx.client_count].hd = hd; ws_ctx.client_count++; return 0; } /** * @brief replace the fd and hd to be removed by the last item * all front items are valid */ static inline void ws_rm_fd(int fd) { for (int i = 0; i < ws_ctx.client_count; ++i) { if (ws_ctx.clients[i].fd == fd) { ws_ctx.clients[i].fd = ws_ctx.clients[ws_ctx.client_count - 1].fd; ws_ctx.clients[i].hd = ws_ctx.clients[ws_ctx.client_count - 1].hd; ws_ctx.client_count--; return; } } } static void send_heartbeat(void *arg) { static httpd_ws_frame_t ws_pkt = { .len = 0, .payload = NULL, .type = HTTPD_WS_TYPE_TEXT, }; struct ws_client_info_t *client_info = arg; int err; err = httpd_ws_send_frame_async(client_info->hd, client_info->fd, &ws_pkt); if (err) { ws_rm_fd(client_info->fd); httpd_sess_trigger_close(client_info->hd, client_info->fd); ESP_LOGE(TAG, "hb send err: %s", esp_err_to_name(err)); } } static inline void ws_broadcast_heartbeat() { int err; for (int i = 0; i < ws_ctx.client_count; ++i) { err = httpd_queue_work(ws_ctx.clients[i].hd, send_heartbeat, &ws_ctx.clients[i]); if (err) { ESP_LOGE(TAG, "hb queue work err: %s", esp_err_to_name(err)); } } } _Noreturn void heartbeat_task(void *arg) { (void)arg; while (1) { vTaskDelay(pdMS_TO_TICKS(2000)); ws_broadcast_heartbeat(); } }; /** * REGISTER MODULE * */ static const httpd_uri_t uri_api = { .uri = "/ws", .method = HTTP_GET, .handler = ws_req_handler, .user_ctx = NULL, .is_websocket = true, .supported_subprotocol = NULL, .handle_ws_control_frames = true, }; static int WS_REQ_INIT(const httpd_uri_t **uri_conf) { *uri_conf = &uri_api; xTaskCreate(heartbeat_task, "hb task", 2048, NULL, 3, &ws_ctx.task_heartbeat); return 0; } static int WS_REQ_EXIT(const httpd_uri_t **uri_conf) { *uri_conf = &uri_api; vTaskDelete(ws_ctx.task_heartbeat); ws_ctx.task_heartbeat = NULL; return 0; } WEB_URI_MODULE_REGISTER(101, WS_REQ_INIT, WS_REQ_EXIT)