feat: hid rpc channel (#755)

* feat: use hidRpcChannel to save bandwidth

* chore: simplify handshake of hid rpc

* add logs

* chore: add timeout when writing to hid endpoints

* fix issues

* chore: show hid rpc version

* refactor hidrpc marshal / unmarshal

* add queues for keyboard / mouse event

* chore: change logging level of JSONRPC send event to trace

* minor changes related to logging

* fix: nil check

* chore: add comments and remove unused code

* add useMouse

* chore: log msg data only when debug or trace mode

* chore: make tslint happy

* chore: unlock keyboardStateLock before calling onKeysDownChange

* chore: remove keyPressReportApiAvailable

* chore: change version handle

* chore: clean up unused functions

* remove comments
This commit is contained in:
Aveline
2025-09-04 22:27:56 +02:00
committed by GitHub
parent e8ef82e582
commit bcc307b147
21 changed files with 1292 additions and 216 deletions

View File

@@ -6,10 +6,12 @@ import (
"encoding/json"
"net"
"strings"
"sync"
"github.com/coder/websocket"
"github.com/coder/websocket/wsjson"
"github.com/gin-gonic/gin"
"github.com/jetkvm/kvm/internal/hidrpc"
"github.com/jetkvm/kvm/internal/logging"
"github.com/pion/webrtc/v4"
"github.com/rs/zerolog"
@@ -22,7 +24,12 @@ type Session struct {
RPCChannel *webrtc.DataChannel
HidChannel *webrtc.DataChannel
shouldUmountVirtualMedia bool
rpcQueue chan webrtc.DataChannelMessage
rpcQueue chan webrtc.DataChannelMessage
hidRPCAvailable bool
hidQueueLock sync.Mutex
hidQueue []chan webrtc.DataChannelMessage
}
type SessionConfig struct {
@@ -67,6 +74,23 @@ func (s *Session) ExchangeOffer(offerStr string) (string, error) {
return base64.StdEncoding.EncodeToString(localDescription), nil
}
func (s *Session) initQueues() {
s.hidQueueLock.Lock()
defer s.hidQueueLock.Unlock()
s.hidQueue = make([]chan webrtc.DataChannelMessage, 0)
for i := 0; i < 4; i++ {
q := make(chan webrtc.DataChannelMessage, 256)
s.hidQueue = append(s.hidQueue, q)
}
}
func (s *Session) handleQueues(index int) {
for msg := range s.hidQueue[index] {
onHidMessage(msg.Data, s)
}
}
func newSession(config SessionConfig) (*Session, error) {
webrtcSettingEngine := webrtc.SettingEngine{
LoggerFactory: logging.GetPionDefaultLoggerFactory(),
@@ -105,17 +129,68 @@ func newSession(config SessionConfig) (*Session, error) {
scopedLogger.Warn().Err(err).Msg("Failed to create PeerConnection")
return nil, err
}
session := &Session{peerConnection: peerConnection}
session.rpcQueue = make(chan webrtc.DataChannelMessage, 256)
session.initQueues()
go func() {
for msg := range session.rpcQueue {
onRPCMessage(msg, session)
// TODO: only use goroutine if the task is asynchronous
go onRPCMessage(msg, session)
}
}()
for i := 0; i < len(session.hidQueue); i++ {
go session.handleQueues(i)
}
peerConnection.OnDataChannel(func(d *webrtc.DataChannel) {
defer func() {
if r := recover(); r != nil {
scopedLogger.Error().Interface("error", r).Msg("Recovered from panic in DataChannel handler")
}
}()
scopedLogger.Info().Str("label", d.Label()).Uint16("id", *d.ID()).Msg("New DataChannel")
switch d.Label() {
case "hidrpc":
session.HidChannel = d
d.OnMessage(func(msg webrtc.DataChannelMessage) {
l := scopedLogger.With().Int("length", len(msg.Data)).Logger()
// only log data if the log level is debug or lower
if scopedLogger.GetLevel() > zerolog.DebugLevel {
l = l.With().Str("data", string(msg.Data)).Logger()
}
if msg.IsString {
l.Warn().Msg("received string data in HID RPC message handler")
return
}
if len(msg.Data) < 1 {
l.Warn().Msg("received empty data in HID RPC message handler")
return
}
l.Trace().Msg("received data in HID RPC message handler")
// Enqueue to ensure ordered processing
queueIndex := hidrpc.GetQueueIndex(hidrpc.MessageType(msg.Data[0]))
if queueIndex >= len(session.hidQueue) || queueIndex < 0 {
l.Warn().Int("queueIndex", queueIndex).Msg("received data in HID RPC message handler, but queue index not found")
queueIndex = 3
}
queue := session.hidQueue[queueIndex]
if queue != nil {
queue <- msg
} else {
l.Warn().Int("queueIndex", queueIndex).Msg("received data in HID RPC message handler, but queue is nil")
return
}
})
case "rpc":
session.RPCChannel = d
d.OnMessage(func(msg webrtc.DataChannelMessage) {
@@ -198,6 +273,13 @@ func newSession(config SessionConfig) (*Session, error) {
close(session.rpcQueue)
session.rpcQueue = nil
}
// Stop HID RPC processor
for i := 0; i < len(session.hidQueue); i++ {
close(session.hidQueue[i])
session.hidQueue[i] = nil
}
if session.shouldUmountVirtualMedia {
if err := rpcUnmountImage(); err != nil {
scopedLogger.Warn().Err(err).Msg("unmount image failed on connection close")