From 2a2890e7b3933654bc7e93be44e9078583926abe Mon Sep 17 00:00:00 2001 From: luckfox-eng29 Date: Fri, 15 May 2026 18:46:51 +0800 Subject: [PATCH] feat(video): enhance video frame handling with atomic reference counting and memory management Signed-off-by: luckfox-eng29 --- native.go | 22 +++++++---- native_display.go | 5 +-- native_vpn.go | 5 +-- stream_broadcaster.go | 87 ++++++++++++++++++++++++++++++++++++------- web.go | 5 ++- 5 files changed, 95 insertions(+), 29 deletions(-) diff --git a/native.go b/native.go index d7d485e..7fa7be1 100644 --- a/native.go +++ b/native.go @@ -43,6 +43,10 @@ var ( videoCmdLock = &sync.Mutex{} ) +// jpegReadyCh is written by the ctrl socket event handler and read by captureScreenshot. +// Buffered to 1 so the jpeg thread doesn't block if captureScreenshot has already timed out. +var jpegReadyCh = make(chan struct{}, 1) + func CallCtrlAction(action string, params map[string]interface{}) (*CtrlResponse, error) { lock.Lock() defer lock.Unlock() @@ -76,7 +80,7 @@ func CallCtrlAction(action string, params map[string]interface{}) (*CtrlResponse select { case response := <-responseChan: - delete(ongoingRequests, seq) + delete(ongoingRequests, ctrlAction.Seq) if response.Error != "" { return nil, ErrorfL( &scopedLogger, @@ -87,7 +91,7 @@ func CallCtrlAction(action string, params map[string]interface{}) (*CtrlResponse return response, nil case <-time.After(5 * time.Second): close(responseChan) - delete(ongoingRequests, seq) + delete(ongoingRequests, ctrlAction.Seq) return nil, ErrorfL(&scopedLogger, "timeout waiting for response", nil) } } @@ -194,12 +198,11 @@ func handleCtrlClient(conn net.Conn) { scopedLogger.Warn().Err(err).Msg("error reading from ctrl sock") break } - readMsg := string(readBuf[:n]) ctrlResp := CtrlResponse{} - err = json.Unmarshal([]byte(readMsg), &ctrlResp) + err = json.Unmarshal(readBuf[:n], &ctrlResp) if err != nil { - scopedLogger.Warn().Err(err).Str("data", readMsg).Msg("error parsing ctrl sock msg") + scopedLogger.Warn().Err(err).Str("data", string(readBuf[:n])).Msg("error parsing ctrl sock msg") continue } scopedLogger.Trace().Interface("data", ctrlResp).Msg("ctrl sock msg") @@ -213,6 +216,11 @@ func handleCtrlClient(conn net.Conn) { switch ctrlResp.Event { case "video_input_state": HandleVideoStateMessage(ctrlResp) + case "jpeg_ready": + select { + case jpegReadyCh <- struct{}{}: + default: + } } } @@ -242,9 +250,7 @@ func handleVideoClient(conn net.Conn) { lastFrame = now // Broadcast to HTTP clients - dataCopy := make([]byte, n) - copy(dataCopy, inboundPacket[:n]) - videoBroadcaster.Broadcast(dataCopy) + videoBroadcaster.Broadcast(inboundPacket[:n]) if currentSession != nil { err := currentSession.VideoTrack.WriteSample(media.Sample{Data: inboundPacket[:n], Duration: sinceLastFrame}) diff --git a/native_display.go b/native_display.go index b081424..9d34b43 100644 --- a/native_display.go +++ b/native_display.go @@ -163,12 +163,11 @@ func handleDisplayCtrlClient(conn net.Conn) { scopedLogger.Warn().Err(err).Msg("error reading from display sock") break } - readMsg := string(readBuf[:n]) displayResp := CtrlResponse{} - err = json.Unmarshal([]byte(readMsg), &displayResp) + err = json.Unmarshal(readBuf[:n], &displayResp) if err != nil { - scopedLogger.Warn().Err(err).Str("data", readMsg).Msg("error parsing display sock msg") + scopedLogger.Warn().Err(err).Str("data", string(readBuf[:n])).Msg("error parsing display sock msg") continue } scopedLogger.Trace().Interface("data", displayResp).Msg("display sock msg") diff --git a/native_vpn.go b/native_vpn.go index 923305b..7b79d6c 100644 --- a/native_vpn.go +++ b/native_vpn.go @@ -163,12 +163,11 @@ func handleVpnCtrlClient(conn net.Conn) { scopedLogger.Warn().Err(err).Msg("error reading from vpn sock") break } - readMsg := string(readBuf[:n]) vpnResp := CtrlResponse{} - err = json.Unmarshal([]byte(readMsg), &vpnResp) + err = json.Unmarshal(readBuf[:n], &vpnResp) if err != nil { - scopedLogger.Warn().Err(err).Str("data", readMsg).Msg("error parsing vpn sock msg") + scopedLogger.Warn().Err(err).Str("data", string(readBuf[:n])).Msg("error parsing vpn sock msg") continue } scopedLogger.Trace().Interface("data", vpnResp).Msg("vpn sock msg") diff --git a/stream_broadcaster.go b/stream_broadcaster.go index 32b52d5..9942412 100644 --- a/stream_broadcaster.go +++ b/stream_broadcaster.go @@ -2,30 +2,64 @@ package kvm import ( "sync" + "sync/atomic" "github.com/google/uuid" ) +type VideoFrame struct { + data []byte + refs atomic.Int32 + pool *sync.Pool +} + +func (f *VideoFrame) Data() []byte { + return f.data +} + +func (f *VideoFrame) Release() { + if f.refs.Add(-1) == 0 { + f.data = f.data[:cap(f.data)] + f.pool.Put(f.data) + } +} + +var framePool = sync.Pool{ + New: func() interface{} { + return make([]byte, maxFrameSize) + }, +} + type VideoBroadcaster struct { - subscribers map[string]chan []byte - lock sync.RWMutex - onFirstSubscribe func() + subscribers map[string]chan *VideoFrame + subscriberList []chan *VideoFrame // cached flat slice, rebuilt on Subscribe/Unsubscribe + count atomic.Int32 // len(subscribers) as atomic for fast Broadcast check + lock sync.RWMutex + onFirstSubscribe func() onLastUnsubscribe func() } var videoBroadcaster = &VideoBroadcaster{ - subscribers: make(map[string]chan []byte), + subscribers: make(map[string]chan *VideoFrame), } -func (b *VideoBroadcaster) Subscribe() (string, chan []byte) { +func (b *VideoBroadcaster) rebuildList() { + list := make([]chan *VideoFrame, 0, len(b.subscribers)) + for _, ch := range b.subscribers { + list = append(list, ch) + } + b.subscriberList = list +} + +func (b *VideoBroadcaster) Subscribe() (string, chan *VideoFrame) { b.lock.Lock() defer b.lock.Unlock() id := uuid.New().String() - // Buffer a bit to avoid dropping frames too easily, - // but not too much to avoid latency build-up - ch := make(chan []byte, 200) + ch := make(chan *VideoFrame, 200) wasEmpty := len(b.subscribers) == 0 b.subscribers[id] = ch + b.rebuildList() + b.count.Store(int32(len(b.subscribers))) if wasEmpty && b.onFirstSubscribe != nil { b.onFirstSubscribe() } @@ -38,6 +72,8 @@ func (b *VideoBroadcaster) Unsubscribe(id string) { if ch, ok := b.subscribers[id]; ok { close(ch) delete(b.subscribers, id) + b.rebuildList() + b.count.Store(int32(len(b.subscribers))) if len(b.subscribers) == 0 && b.onLastUnsubscribe != nil { b.onLastUnsubscribe() } @@ -45,15 +81,38 @@ func (b *VideoBroadcaster) Unsubscribe(id string) { } func (b *VideoBroadcaster) Broadcast(data []byte) { + // atomic check avoids acquiring RLock on every video frame when no HTTP clients are connected + if b.count.Load() == 0 { + return + } + b.lock.RLock() - defer b.lock.RUnlock() - for _, ch := range b.subscribers { - // Non-blocking send + subscribers := b.subscriberList + subscriberCount := len(subscribers) + if subscriberCount == 0 { + b.lock.RUnlock() + return + } + + buf := framePool.Get().([]byte) + if cap(buf) < len(data) { + buf = make([]byte, len(data)) + } + n := copy(buf, data) + + frame := &VideoFrame{ + data: buf[:n], + pool: &framePool, + } + frame.refs.Store(int32(subscriberCount + 1)) + + for _, ch := range subscribers { select { - case ch <- data: + case ch <- frame: default: - // Drop frame if channel is full to avoid blocking other subscribers - // Ideally we should have a ring buffer or similar, but this is simple + frame.Release() } } + b.lock.RUnlock() + frame.Release() } diff --git a/web.go b/web.go index d8cfb44..02efc06 100644 --- a/web.go +++ b/web.go @@ -942,20 +942,23 @@ func handleVideoStream(c *gin.Context) { for { select { - case data, ok := <-ch: + case frame, ok := <-ch: if !ok { logger.Info().Int("total_frames", frameCount).Msg("video broadcaster channel closed") return } + data := frame.Data() frameCount++ if frameCount == 1 { logger.Info().Int("size", len(data)).Msg("first video frame received") } if _, err := c.Writer.Write(data); err != nil { logger.Warn().Err(err).Int("total_frames", frameCount).Msg("error writing video data") + frame.Release() return } c.Writer.Flush() + frame.Release() case <-ctx.Done(): logger.Info().Int("total_frames", frameCount).Msg("client disconnected") return