Просмотр исходного кода

[FL-2335] Gui, Rpc: multisession, asynchronous screen streaming with adaptive frame rate (#1033)

* Gui,Rpc: multisession, asynchronous screen streaming with adaptive frame rate
* Fix compact build, add missing aray initialization.
あく 3 лет назад
Родитель
Сommit
eed49bf863
6 измененных файлов с 138 добавлено и 48 удалено
  1. 30 16
      applications/gui/gui.c
  2. 13 7
      applications/gui/gui.h
  3. 13 2
      applications/gui/gui_i.h
  4. 7 2
      applications/rpc/rpc.c
  5. 72 21
      applications/rpc/rpc_gui.c
  6. 3 0
      applications/rpc/rpc_i.h

+ 30 - 16
applications/gui/gui.c

@@ -211,12 +211,11 @@ void gui_redraw(Gui* gui) {
     }
     }
 
 
     canvas_commit(gui->canvas);
     canvas_commit(gui->canvas);
-    if(gui->canvas_callback) {
-        gui->canvas_callback(
-            canvas_get_buffer(gui->canvas),
-            canvas_get_buffer_size(gui->canvas),
-            gui->canvas_callback_context);
-    }
+    for
+        M_EACH(p, gui->canvas_callback_pair, CanvasCallbackPairArray_t) {
+            p->callback(
+                canvas_get_buffer(gui->canvas), canvas_get_buffer_size(gui->canvas), p->context);
+        }
     gui_unlock(gui);
     gui_unlock(gui);
 }
 }
 
 
@@ -396,24 +395,36 @@ void gui_view_port_send_to_back(Gui* gui, ViewPort* view_port) {
     gui_unlock(gui);
     gui_unlock(gui);
 }
 }
 
 
-void gui_set_framebuffer_callback(Gui* gui, GuiCanvasCommitCallback callback, void* context) {
+void gui_add_framebuffer_callback(Gui* gui, GuiCanvasCommitCallback callback, void* context) {
     furi_assert(gui);
     furi_assert(gui);
+
+    const CanvasCallbackPair p = {callback, context};
+
     gui_lock(gui);
     gui_lock(gui);
-    gui->canvas_callback = callback;
-    gui->canvas_callback_context = context;
-    gui_unlock(gui);
 
 
-    if(callback != NULL) {
-        gui_update(gui);
-    }
+    furi_assert(CanvasCallbackPairArray_count(gui->canvas_callback_pair, p) == 0);
+    CanvasCallbackPairArray_push_back(gui->canvas_callback_pair, p);
+
+    gui_unlock(gui);
+    gui_update(gui);
 }
 }
 
 
-GuiCanvasCommitCallback gui_get_framebuffer_callback(Gui* gui) {
+void gui_remove_framebuffer_callback(Gui* gui, GuiCanvasCommitCallback callback, void* context) {
     furi_assert(gui);
     furi_assert(gui);
+
+    const CanvasCallbackPair p = {callback, context};
+
     gui_lock(gui);
     gui_lock(gui);
-    GuiCanvasCommitCallback callback = gui->canvas_callback;
+
+    furi_assert(CanvasCallbackPairArray_count(gui->canvas_callback_pair, p) == 1);
+    CanvasCallbackPairArray_remove_val(gui->canvas_callback_pair, p);
+
     gui_unlock(gui);
     gui_unlock(gui);
-    return callback;
+}
+
+size_t gui_get_framebuffer_size(Gui* gui) {
+    furi_assert(gui);
+    return canvas_get_buffer_size(gui->canvas);
 }
 }
 
 
 void gui_set_lockdown(Gui* gui, bool lockdown) {
 void gui_set_lockdown(Gui* gui, bool lockdown) {
@@ -437,9 +448,12 @@ Gui* gui_alloc() {
     }
     }
     // Drawing canvas
     // Drawing canvas
     gui->canvas = canvas_init();
     gui->canvas = canvas_init();
+    CanvasCallbackPairArray_init(gui->canvas_callback_pair);
+
     // Input
     // Input
     gui->input_queue = osMessageQueueNew(8, sizeof(InputEvent), NULL);
     gui->input_queue = osMessageQueueNew(8, sizeof(InputEvent), NULL);
     gui->input_events = furi_record_open("input_events");
     gui->input_events = furi_record_open("input_events");
+
     furi_check(gui->input_events);
     furi_check(gui->input_events);
     furi_pubsub_subscribe(gui->input_events, gui_input_events_callback, gui);
     furi_pubsub_subscribe(gui->input_events, gui_input_events_callback, gui);
 
 

+ 13 - 7
applications/gui/gui.h

@@ -68,7 +68,7 @@ void gui_view_port_send_to_front(Gui* gui, ViewPort* view_port);
  */
  */
 void gui_view_port_send_to_back(Gui* gui, ViewPort* view_port);
 void gui_view_port_send_to_back(Gui* gui, ViewPort* view_port);
 
 
-/** Set gui canvas commit callback
+/** Add gui canvas commit callback
  *
  *
  * This callback will be called upon Canvas commit Callback dispatched from GUI
  * This callback will be called upon Canvas commit Callback dispatched from GUI
  * thread and is time critical
  * thread and is time critical
@@ -77,16 +77,22 @@ void gui_view_port_send_to_back(Gui* gui, ViewPort* view_port);
  * @param      callback  GuiCanvasCommitCallback
  * @param      callback  GuiCanvasCommitCallback
  * @param      context   GuiCanvasCommitCallback context
  * @param      context   GuiCanvasCommitCallback context
  */
  */
-void gui_set_framebuffer_callback(Gui* gui, GuiCanvasCommitCallback callback, void* context);
+void gui_add_framebuffer_callback(Gui* gui, GuiCanvasCommitCallback callback, void* context);
 
 
-/** Get gui canvas commit callback
- *
- * Can be used to check if some application is using framebufer
+/** Remove gui canvas commit callback
  *
  *
  * @param      gui       Gui instance
  * @param      gui       Gui instance
- * @return     GuiCanvasCommitCallback
+ * @param      callback  GuiCanvasCommitCallback
+ * @param      context   GuiCanvasCommitCallback context
+ */
+void gui_remove_framebuffer_callback(Gui* gui, GuiCanvasCommitCallback callback, void* context);
+
+/** Get gui canvas frame buffer size
+ * *
+ * @param      gui       Gui instance
+ * @return     size_t    size of frame buffer in bytes
  */
  */
-GuiCanvasCommitCallback gui_get_framebuffer_callback(Gui* gui);
+size_t gui_get_framebuffer_size(Gui* gui);
 
 
 /** Set lockdown mode
 /** Set lockdown mode
  *
  *

+ 13 - 2
applications/gui/gui_i.h

@@ -9,6 +9,7 @@
 
 
 #include <furi.h>
 #include <furi.h>
 #include <m-array.h>
 #include <m-array.h>
+#include <m-algo.h>
 #include <stdio.h>
 #include <stdio.h>
 
 
 #include "canvas.h"
 #include "canvas.h"
@@ -42,6 +43,17 @@
 
 
 ARRAY_DEF(ViewPortArray, ViewPort*, M_PTR_OPLIST);
 ARRAY_DEF(ViewPortArray, ViewPort*, M_PTR_OPLIST);
 
 
+typedef struct {
+    GuiCanvasCommitCallback callback;
+    void* context;
+} CanvasCallbackPair;
+
+ARRAY_DEF(CanvasCallbackPairArray, CanvasCallbackPair, M_POD_OPLIST);
+
+#define M_OPL_CanvasCallbackPairArray_t() ARRAY_OPLIST(CanvasCallbackPairArray, M_POD_OPLIST)
+
+ALGO_DEF(CanvasCallbackPairArray, CanvasCallbackPairArray_t);
+
 /** Gui structure */
 /** Gui structure */
 struct Gui {
 struct Gui {
     // Thread and lock
     // Thread and lock
@@ -52,8 +64,7 @@ struct Gui {
     bool lockdown;
     bool lockdown;
     ViewPortArray_t layers[GuiLayerMAX];
     ViewPortArray_t layers[GuiLayerMAX];
     Canvas* canvas;
     Canvas* canvas;
-    GuiCanvasCommitCallback canvas_callback;
-    void* canvas_callback_context;
+    CanvasCallbackPairArray_t canvas_callback_pair;
 
 
     // Input
     // Input
     osMessageQueueId_t input_queue;
     osMessageQueueId_t input_queue;

+ 7 - 2
applications/rpc/rpc.c

@@ -623,7 +623,7 @@ RpcSession* rpc_session_open(Rpc* rpc) {
     rpc_add_handler(session, PB_Main_stop_session_tag, &rpc_handler);
     rpc_add_handler(session, PB_Main_stop_session_tag, &rpc_handler);
 
 
     session->thread = furi_thread_alloc();
     session->thread = furi_thread_alloc();
-    furi_thread_set_name(session->thread, "RPC Session");
+    furi_thread_set_name(session->thread, "RpcSessionWorker");
     furi_thread_set_stack_size(session->thread, 2048);
     furi_thread_set_stack_size(session->thread, 2048);
     furi_thread_set_context(session->thread, session);
     furi_thread_set_context(session->thread, session);
     furi_thread_set_callback(session->thread, rpc_session_worker);
     furi_thread_set_callback(session->thread, rpc_session_worker);
@@ -666,9 +666,10 @@ void rpc_add_handler(RpcSession* session, pb_size_t message_tag, RpcHandler* han
     RpcHandlerDict_set_at(session->handlers, message_tag, *handler);
     RpcHandlerDict_set_at(session->handlers, message_tag, *handler);
 }
 }
 
 
-void rpc_send_and_release(RpcSession* session, PB_Main* message) {
+void rpc_send(RpcSession* session, PB_Main* message) {
     furi_assert(session);
     furi_assert(session);
     furi_assert(message);
     furi_assert(message);
+
     pb_ostream_t ostream = PB_OSTREAM_SIZING;
     pb_ostream_t ostream = PB_OSTREAM_SIZING;
 
 
 #if SRV_RPC_DEBUG
 #if SRV_RPC_DEBUG
@@ -695,6 +696,10 @@ void rpc_send_and_release(RpcSession* session, PB_Main* message) {
     osMutexRelease(session->callbacks_mutex);
     osMutexRelease(session->callbacks_mutex);
 
 
     free(buffer);
     free(buffer);
+}
+
+void rpc_send_and_release(RpcSession* session, PB_Main* message) {
+    rpc_send(session, message);
     pb_release(&PB_Main_msg, message);
     pb_release(&PB_Main_msg, message);
 }
 }
 
 

+ 72 - 21
applications/rpc/rpc_gui.c

@@ -5,11 +5,25 @@
 
 
 #define TAG "RpcGui"
 #define TAG "RpcGui"
 
 
+typedef enum {
+    RpcGuiWorkerFlagTransmit = (1 << 0),
+    RpcGuiWorkerFlagExit = (1 << 1),
+} RpcGuiWorkerFlag;
+
+#define RpcGuiWorkerFlagAny (RpcGuiWorkerFlagTransmit | RpcGuiWorkerFlagExit)
+
 typedef struct {
 typedef struct {
     RpcSession* session;
     RpcSession* session;
     Gui* gui;
     Gui* gui;
+
+    // Receive part
     ViewPort* virtual_display_view_port;
     ViewPort* virtual_display_view_port;
     uint8_t* virtual_display_buffer;
     uint8_t* virtual_display_buffer;
+
+    // Transmit
+    PB_Main* transmit_frame;
+    FuriThread* transmit_thread;
+
     bool virtual_display_not_empty;
     bool virtual_display_not_empty;
     bool is_streaming;
     bool is_streaming;
 } RpcGuiSystem;
 } RpcGuiSystem;
@@ -17,25 +31,35 @@ typedef struct {
 static void
 static void
     rpc_system_gui_screen_stream_frame_callback(uint8_t* data, size_t size, void* context) {
     rpc_system_gui_screen_stream_frame_callback(uint8_t* data, size_t size, void* context) {
     furi_assert(data);
     furi_assert(data);
-    furi_assert(size == 1024);
     furi_assert(context);
     furi_assert(context);
 
 
     RpcGuiSystem* rpc_gui = (RpcGuiSystem*)context;
     RpcGuiSystem* rpc_gui = (RpcGuiSystem*)context;
-    RpcSession* session = rpc_gui->session;
+    uint8_t* buffer = rpc_gui->transmit_frame->content.gui_screen_frame.data->bytes;
 
 
-    PB_Main* frame = malloc(sizeof(PB_Main));
+    furi_assert(size == rpc_gui->transmit_frame->content.gui_screen_frame.data->size);
 
 
-    frame->which_content = PB_Main_gui_screen_frame_tag;
-    frame->command_status = PB_CommandStatus_OK;
-    frame->content.gui_screen_frame.data = malloc(PB_BYTES_ARRAY_T_ALLOCSIZE(size));
-    uint8_t* buffer = frame->content.gui_screen_frame.data->bytes;
-    uint16_t* frame_size_msg = &frame->content.gui_screen_frame.data->size;
-    *frame_size_msg = size;
     memcpy(buffer, data, size);
     memcpy(buffer, data, size);
 
 
-    rpc_send_and_release(session, frame);
+    osThreadFlagsSet(
+        furi_thread_get_thread_id(rpc_gui->transmit_thread), RpcGuiWorkerFlagTransmit);
+}
 
 
-    free(frame);
+static int32_t rpc_system_gui_screen_stream_frame_transmit_thread(void* context) {
+    furi_assert(context);
+
+    RpcGuiSystem* rpc_gui = (RpcGuiSystem*)context;
+
+    while(true) {
+        uint32_t flags = osThreadFlagsWait(RpcGuiWorkerFlagAny, osFlagsWaitAny, osWaitForever);
+        if(flags & RpcGuiWorkerFlagTransmit) {
+            rpc_send(rpc_gui->session, rpc_gui->transmit_frame);
+        }
+        if(flags & RpcGuiWorkerFlagExit) {
+            break;
+        }
+    }
+
+    return 0;
 }
 }
 
 
 static void rpc_system_gui_start_screen_stream_process(const PB_Main* request, void* context) {
 static void rpc_system_gui_start_screen_stream_process(const PB_Main* request, void* context) {
@@ -45,15 +69,30 @@ static void rpc_system_gui_start_screen_stream_process(const PB_Main* request, v
 
 
     RpcSession* session = rpc_gui->session;
     RpcSession* session = rpc_gui->session;
     furi_assert(session);
     furi_assert(session);
+    furi_assert(!rpc_gui->is_streaming);
 
 
-    if(gui_get_framebuffer_callback(rpc_gui->gui) == NULL) {
-        rpc_send_and_release_empty(session, request->command_id, PB_CommandStatus_OK);
-        rpc_gui->is_streaming = true;
-        gui_set_framebuffer_callback(
-            rpc_gui->gui, rpc_system_gui_screen_stream_frame_callback, context);
-    } else {
-        rpc_send_and_release_empty(session, request->command_id, PB_CommandStatus_ERROR_BUSY);
-    }
+    rpc_send_and_release_empty(session, request->command_id, PB_CommandStatus_OK);
+
+    rpc_gui->is_streaming = true;
+    size_t framebuffer_size = gui_get_framebuffer_size(rpc_gui->gui);
+    // Reusable Frame
+    rpc_gui->transmit_frame = malloc(sizeof(PB_Main));
+    rpc_gui->transmit_frame->which_content = PB_Main_gui_screen_frame_tag;
+    rpc_gui->transmit_frame->command_status = PB_CommandStatus_OK;
+    rpc_gui->transmit_frame->content.gui_screen_frame.data =
+        malloc(PB_BYTES_ARRAY_T_ALLOCSIZE(framebuffer_size));
+    rpc_gui->transmit_frame->content.gui_screen_frame.data->size = framebuffer_size;
+    // Transmission thread for async TX
+    rpc_gui->transmit_thread = furi_thread_alloc();
+    furi_thread_set_name(rpc_gui->transmit_thread, "GuiRpcWorker");
+    furi_thread_set_callback(
+        rpc_gui->transmit_thread, rpc_system_gui_screen_stream_frame_transmit_thread);
+    furi_thread_set_context(rpc_gui->transmit_thread, rpc_gui);
+    furi_thread_set_stack_size(rpc_gui->transmit_thread, 1024);
+    furi_thread_start(rpc_gui->transmit_thread);
+    // GUI framebuffer callback
+    gui_add_framebuffer_callback(
+        rpc_gui->gui, rpc_system_gui_screen_stream_frame_callback, context);
 }
 }
 
 
 static void rpc_system_gui_stop_screen_stream_process(const PB_Main* request, void* context) {
 static void rpc_system_gui_stop_screen_stream_process(const PB_Main* request, void* context) {
@@ -66,7 +105,18 @@ static void rpc_system_gui_stop_screen_stream_process(const PB_Main* request, vo
 
 
     if(rpc_gui->is_streaming) {
     if(rpc_gui->is_streaming) {
         rpc_gui->is_streaming = false;
         rpc_gui->is_streaming = false;
-        gui_set_framebuffer_callback(rpc_gui->gui, NULL, NULL);
+        // Remove GUI framebuffer callback
+        gui_remove_framebuffer_callback(
+            rpc_gui->gui, rpc_system_gui_screen_stream_frame_callback, context);
+        // Stop and release worker thread
+        osThreadFlagsSet(
+            furi_thread_get_thread_id(rpc_gui->transmit_thread), RpcGuiWorkerFlagExit);
+        furi_thread_join(rpc_gui->transmit_thread);
+        furi_thread_free(rpc_gui->transmit_thread);
+        // Release frame
+        pb_release(&PB_Main_msg, rpc_gui->transmit_frame);
+        free(rpc_gui->transmit_frame);
+        rpc_gui->transmit_frame = NULL;
     }
     }
 
 
     rpc_send_and_release_empty(session, request->command_id, PB_CommandStatus_OK);
     rpc_send_and_release_empty(session, request->command_id, PB_CommandStatus_OK);
@@ -296,7 +346,8 @@ void rpc_system_gui_free(void* context) {
     }
     }
 
 
     if(rpc_gui->is_streaming) {
     if(rpc_gui->is_streaming) {
-        gui_set_framebuffer_callback(rpc_gui->gui, NULL, NULL);
+        gui_remove_framebuffer_callback(
+            rpc_gui->gui, rpc_system_gui_screen_stream_frame_callback, context);
     }
     }
     furi_record_close("gui");
     furi_record_close("gui");
     free(rpc_gui);
     free(rpc_gui);

+ 3 - 0
applications/rpc/rpc_i.h

@@ -17,7 +17,10 @@ typedef struct {
     void* context;
     void* context;
 } RpcHandler;
 } RpcHandler;
 
 
+void rpc_send(RpcSession* session, PB_Main* main_message);
+
 void rpc_send_and_release(RpcSession* session, PB_Main* main_message);
 void rpc_send_and_release(RpcSession* session, PB_Main* main_message);
+
 void rpc_send_and_release_empty(RpcSession* session, uint32_t command_id, PB_CommandStatus status);
 void rpc_send_and_release_empty(RpcSession* session, uint32_t command_id, PB_CommandStatus status);
 
 
 void rpc_add_handler(RpcSession* session, pb_size_t message_tag, RpcHandler* handler);
 void rpc_add_handler(RpcSession* session, pb_size_t message_tag, RpcHandler* handler);