feat(video): enhance video frame handling with atomic reference counting and memory management

Signed-off-by: luckfox-eng29 <eng29@luckfox.com>
This commit is contained in:
luckfox-eng29
2026-05-15 18:46:51 +08:00
parent 40f5af2120
commit 2a2890e7b3
5 changed files with 95 additions and 29 deletions

View File

@@ -43,6 +43,10 @@ var (
videoCmdLock = &sync.Mutex{} videoCmdLock = &sync.Mutex{}
) )
// jpegReadyCh is written by the ctrl socket event handler and read by captureScreenshot.
// Buffered to 1 so the jpeg thread doesn't block if captureScreenshot has already timed out.
var jpegReadyCh = make(chan struct{}, 1)
func CallCtrlAction(action string, params map[string]interface{}) (*CtrlResponse, error) { func CallCtrlAction(action string, params map[string]interface{}) (*CtrlResponse, error) {
lock.Lock() lock.Lock()
defer lock.Unlock() defer lock.Unlock()
@@ -76,7 +80,7 @@ func CallCtrlAction(action string, params map[string]interface{}) (*CtrlResponse
select { select {
case response := <-responseChan: case response := <-responseChan:
delete(ongoingRequests, seq) delete(ongoingRequests, ctrlAction.Seq)
if response.Error != "" { if response.Error != "" {
return nil, ErrorfL( return nil, ErrorfL(
&scopedLogger, &scopedLogger,
@@ -87,7 +91,7 @@ func CallCtrlAction(action string, params map[string]interface{}) (*CtrlResponse
return response, nil return response, nil
case <-time.After(5 * time.Second): case <-time.After(5 * time.Second):
close(responseChan) close(responseChan)
delete(ongoingRequests, seq) delete(ongoingRequests, ctrlAction.Seq)
return nil, ErrorfL(&scopedLogger, "timeout waiting for response", nil) return nil, ErrorfL(&scopedLogger, "timeout waiting for response", nil)
} }
} }
@@ -194,12 +198,11 @@ func handleCtrlClient(conn net.Conn) {
scopedLogger.Warn().Err(err).Msg("error reading from ctrl sock") scopedLogger.Warn().Err(err).Msg("error reading from ctrl sock")
break break
} }
readMsg := string(readBuf[:n])
ctrlResp := CtrlResponse{} ctrlResp := CtrlResponse{}
err = json.Unmarshal([]byte(readMsg), &ctrlResp) err = json.Unmarshal(readBuf[:n], &ctrlResp)
if err != nil { if err != nil {
scopedLogger.Warn().Err(err).Str("data", readMsg).Msg("error parsing ctrl sock msg") scopedLogger.Warn().Err(err).Str("data", string(readBuf[:n])).Msg("error parsing ctrl sock msg")
continue continue
} }
scopedLogger.Trace().Interface("data", ctrlResp).Msg("ctrl sock msg") scopedLogger.Trace().Interface("data", ctrlResp).Msg("ctrl sock msg")
@@ -213,6 +216,11 @@ func handleCtrlClient(conn net.Conn) {
switch ctrlResp.Event { switch ctrlResp.Event {
case "video_input_state": case "video_input_state":
HandleVideoStateMessage(ctrlResp) HandleVideoStateMessage(ctrlResp)
case "jpeg_ready":
select {
case jpegReadyCh <- struct{}{}:
default:
}
} }
} }
@@ -242,9 +250,7 @@ func handleVideoClient(conn net.Conn) {
lastFrame = now lastFrame = now
// Broadcast to HTTP clients // Broadcast to HTTP clients
dataCopy := make([]byte, n) videoBroadcaster.Broadcast(inboundPacket[:n])
copy(dataCopy, inboundPacket[:n])
videoBroadcaster.Broadcast(dataCopy)
if currentSession != nil { if currentSession != nil {
err := currentSession.VideoTrack.WriteSample(media.Sample{Data: inboundPacket[:n], Duration: sinceLastFrame}) err := currentSession.VideoTrack.WriteSample(media.Sample{Data: inboundPacket[:n], Duration: sinceLastFrame})

View File

@@ -163,12 +163,11 @@ func handleDisplayCtrlClient(conn net.Conn) {
scopedLogger.Warn().Err(err).Msg("error reading from display sock") scopedLogger.Warn().Err(err).Msg("error reading from display sock")
break break
} }
readMsg := string(readBuf[:n])
displayResp := CtrlResponse{} displayResp := CtrlResponse{}
err = json.Unmarshal([]byte(readMsg), &displayResp) err = json.Unmarshal(readBuf[:n], &displayResp)
if err != nil { if err != nil {
scopedLogger.Warn().Err(err).Str("data", readMsg).Msg("error parsing display sock msg") scopedLogger.Warn().Err(err).Str("data", string(readBuf[:n])).Msg("error parsing display sock msg")
continue continue
} }
scopedLogger.Trace().Interface("data", displayResp).Msg("display sock msg") scopedLogger.Trace().Interface("data", displayResp).Msg("display sock msg")

View File

@@ -163,12 +163,11 @@ func handleVpnCtrlClient(conn net.Conn) {
scopedLogger.Warn().Err(err).Msg("error reading from vpn sock") scopedLogger.Warn().Err(err).Msg("error reading from vpn sock")
break break
} }
readMsg := string(readBuf[:n])
vpnResp := CtrlResponse{} vpnResp := CtrlResponse{}
err = json.Unmarshal([]byte(readMsg), &vpnResp) err = json.Unmarshal(readBuf[:n], &vpnResp)
if err != nil { if err != nil {
scopedLogger.Warn().Err(err).Str("data", readMsg).Msg("error parsing vpn sock msg") scopedLogger.Warn().Err(err).Str("data", string(readBuf[:n])).Msg("error parsing vpn sock msg")
continue continue
} }
scopedLogger.Trace().Interface("data", vpnResp).Msg("vpn sock msg") scopedLogger.Trace().Interface("data", vpnResp).Msg("vpn sock msg")

View File

@@ -2,30 +2,64 @@ package kvm
import ( import (
"sync" "sync"
"sync/atomic"
"github.com/google/uuid" "github.com/google/uuid"
) )
type VideoFrame struct {
data []byte
refs atomic.Int32
pool *sync.Pool
}
func (f *VideoFrame) Data() []byte {
return f.data
}
func (f *VideoFrame) Release() {
if f.refs.Add(-1) == 0 {
f.data = f.data[:cap(f.data)]
f.pool.Put(f.data)
}
}
var framePool = sync.Pool{
New: func() interface{} {
return make([]byte, maxFrameSize)
},
}
type VideoBroadcaster struct { type VideoBroadcaster struct {
subscribers map[string]chan []byte subscribers map[string]chan *VideoFrame
lock sync.RWMutex subscriberList []chan *VideoFrame // cached flat slice, rebuilt on Subscribe/Unsubscribe
onFirstSubscribe func() count atomic.Int32 // len(subscribers) as atomic for fast Broadcast check
lock sync.RWMutex
onFirstSubscribe func()
onLastUnsubscribe func() onLastUnsubscribe func()
} }
var videoBroadcaster = &VideoBroadcaster{ var videoBroadcaster = &VideoBroadcaster{
subscribers: make(map[string]chan []byte), subscribers: make(map[string]chan *VideoFrame),
} }
func (b *VideoBroadcaster) Subscribe() (string, chan []byte) { func (b *VideoBroadcaster) rebuildList() {
list := make([]chan *VideoFrame, 0, len(b.subscribers))
for _, ch := range b.subscribers {
list = append(list, ch)
}
b.subscriberList = list
}
func (b *VideoBroadcaster) Subscribe() (string, chan *VideoFrame) {
b.lock.Lock() b.lock.Lock()
defer b.lock.Unlock() defer b.lock.Unlock()
id := uuid.New().String() id := uuid.New().String()
// Buffer a bit to avoid dropping frames too easily, ch := make(chan *VideoFrame, 200)
// but not too much to avoid latency build-up
ch := make(chan []byte, 200)
wasEmpty := len(b.subscribers) == 0 wasEmpty := len(b.subscribers) == 0
b.subscribers[id] = ch b.subscribers[id] = ch
b.rebuildList()
b.count.Store(int32(len(b.subscribers)))
if wasEmpty && b.onFirstSubscribe != nil { if wasEmpty && b.onFirstSubscribe != nil {
b.onFirstSubscribe() b.onFirstSubscribe()
} }
@@ -38,6 +72,8 @@ func (b *VideoBroadcaster) Unsubscribe(id string) {
if ch, ok := b.subscribers[id]; ok { if ch, ok := b.subscribers[id]; ok {
close(ch) close(ch)
delete(b.subscribers, id) delete(b.subscribers, id)
b.rebuildList()
b.count.Store(int32(len(b.subscribers)))
if len(b.subscribers) == 0 && b.onLastUnsubscribe != nil { if len(b.subscribers) == 0 && b.onLastUnsubscribe != nil {
b.onLastUnsubscribe() b.onLastUnsubscribe()
} }
@@ -45,15 +81,38 @@ func (b *VideoBroadcaster) Unsubscribe(id string) {
} }
func (b *VideoBroadcaster) Broadcast(data []byte) { func (b *VideoBroadcaster) Broadcast(data []byte) {
// atomic check avoids acquiring RLock on every video frame when no HTTP clients are connected
if b.count.Load() == 0 {
return
}
b.lock.RLock() b.lock.RLock()
defer b.lock.RUnlock() subscribers := b.subscriberList
for _, ch := range b.subscribers { subscriberCount := len(subscribers)
// Non-blocking send if subscriberCount == 0 {
b.lock.RUnlock()
return
}
buf := framePool.Get().([]byte)
if cap(buf) < len(data) {
buf = make([]byte, len(data))
}
n := copy(buf, data)
frame := &VideoFrame{
data: buf[:n],
pool: &framePool,
}
frame.refs.Store(int32(subscriberCount + 1))
for _, ch := range subscribers {
select { select {
case ch <- data: case ch <- frame:
default: default:
// Drop frame if channel is full to avoid blocking other subscribers frame.Release()
// Ideally we should have a ring buffer or similar, but this is simple
} }
} }
b.lock.RUnlock()
frame.Release()
} }

5
web.go
View File

@@ -942,20 +942,23 @@ func handleVideoStream(c *gin.Context) {
for { for {
select { select {
case data, ok := <-ch: case frame, ok := <-ch:
if !ok { if !ok {
logger.Info().Int("total_frames", frameCount).Msg("video broadcaster channel closed") logger.Info().Int("total_frames", frameCount).Msg("video broadcaster channel closed")
return return
} }
data := frame.Data()
frameCount++ frameCount++
if frameCount == 1 { if frameCount == 1 {
logger.Info().Int("size", len(data)).Msg("first video frame received") logger.Info().Int("size", len(data)).Msg("first video frame received")
} }
if _, err := c.Writer.Write(data); err != nil { if _, err := c.Writer.Write(data); err != nil {
logger.Warn().Err(err).Int("total_frames", frameCount).Msg("error writing video data") logger.Warn().Err(err).Int("total_frames", frameCount).Msg("error writing video data")
frame.Release()
return return
} }
c.Writer.Flush() c.Writer.Flush()
frame.Release()
case <-ctx.Done(): case <-ctx.Done():
logger.Info().Int("total_frames", frameCount).Msg("client disconnected") logger.Info().Int("total_frames", frameCount).Msg("client disconnected")
return return