refactor: use structured logging

This commit is contained in:
Siyuan Miao
2025-04-10 17:05:34 +02:00
parent 8f6e64fd9c
commit 4c37f7e079
32 changed files with 553 additions and 341 deletions

86
web.go
View File

@@ -15,11 +15,13 @@ import (
"github.com/coder/websocket"
"github.com/coder/websocket/wsjson"
gin_logger "github.com/gin-contrib/logger"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"github.com/pion/webrtc/v4"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/zerolog"
"golang.org/x/crypto/bcrypt"
)
@@ -65,7 +67,11 @@ func setupRouter() *gin.Engine {
gin.SetMode(gin.ReleaseMode)
gin.DisableConsoleColor()
r := gin.Default()
r.Use(gin_logger.SetLogger(
gin_logger.WithLogger(func(*gin.Context, zerolog.Logger) zerolog.Logger {
return ginLogger
}),
))
staticFS, _ := fs.Sub(staticFiles, "static")
// Add a custom middleware to set cache headers for images
@@ -181,16 +187,22 @@ var (
)
func handleLocalWebRTCSignal(c *gin.Context) {
cloudLogger.Infof("new websocket connection established")
// get the source from the request
source := c.ClientIP()
scopedLogger := websocketLogger.With().
Str("component", "websocket").
Str("source", source).
Str("sourceType", "local").
Logger()
scopedLogger.Info().Msg("new websocket connection established")
// Create WebSocket options with InsecureSkipVerify to bypass origin check
wsOptions := &websocket.AcceptOptions{
InsecureSkipVerify: true, // Allow connections from any origin
OnPingReceived: func(ctx context.Context, payload []byte) bool {
websocketLogger.Infof("ping frame received: %v, source: %s, sourceType: local", payload, source)
scopedLogger.Info().Bytes("payload", payload).Msg("ping frame received")
metricConnectionTotalPingReceivedCount.WithLabelValues("local", source).Inc()
metricConnectionLastPingReceivedTimestamp.WithLabelValues("local", source).SetToCurrentTime()
@@ -214,14 +226,14 @@ func handleLocalWebRTCSignal(c *gin.Context) {
return
}
err = handleWebRTCSignalWsMessages(wsCon, false, source)
err = handleWebRTCSignalWsMessages(wsCon, false, source, &scopedLogger)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
}
func handleWebRTCSignalWsMessages(wsCon *websocket.Conn, isCloudConnection bool, source string) error {
func handleWebRTCSignalWsMessages(wsCon *websocket.Conn, isCloudConnection bool, source string, scopedLogger *zerolog.Logger) error {
runCtx, cancelRun := context.WithCancel(context.Background())
defer cancelRun()
@@ -236,21 +248,13 @@ func handleWebRTCSignalWsMessages(wsCon *websocket.Conn, isCloudConnection bool,
sourceType = "local"
}
// probably we can use a better logging framework here
logInfof := func(format string, args ...interface{}) {
args = append(args, source, sourceType, connectionID)
websocketLogger.Infof(format+", source: %s, sourceType: %s, id: %s", args...)
}
logWarnf := func(format string, args ...interface{}) {
args = append(args, source, sourceType, connectionID)
websocketLogger.Warnf(format+", source: %s, sourceType: %s, id: %s", args...)
}
logTracef := func(format string, args ...interface{}) {
args = append(args, source, sourceType, connectionID)
websocketLogger.Tracef(format+", source: %s, sourceType: %s, id: %s", args...)
}
l := scopedLogger.With().
Str("source", source).
Str("sourceType", sourceType).
Str("connectionID", connectionID).
Logger()
logInfof("new websocket connection established")
l.Info().Msg("new websocket connection established")
go func() {
for {
@@ -258,9 +262,9 @@ func handleWebRTCSignalWsMessages(wsCon *websocket.Conn, isCloudConnection bool,
if ctxErr := runCtx.Err(); ctxErr != nil {
if !errors.Is(ctxErr, context.Canceled) {
logWarnf("websocket connection closed: %v", ctxErr)
l.Warn().Str("error", ctxErr.Error()).Msg("websocket connection closed")
} else {
logTracef("websocket connection closed as the context was canceled: %v")
l.Trace().Str("error", ctxErr.Error()).Msg("websocket connection closed as the context was canceled")
}
return
}
@@ -271,11 +275,11 @@ func handleWebRTCSignalWsMessages(wsCon *websocket.Conn, isCloudConnection bool,
metricConnectionPingDuration.WithLabelValues(sourceType, source).Observe(v)
}))
logTracef("sending ping frame")
l.Trace().Msg("sending ping frame")
err := wsCon.Ping(runCtx)
if err != nil {
logWarnf("websocket ping error: %v", err)
l.Warn().Str("error", err.Error()).Msg("websocket ping error")
cancelRun()
return
}
@@ -286,7 +290,7 @@ func handleWebRTCSignalWsMessages(wsCon *websocket.Conn, isCloudConnection bool,
metricConnectionTotalPingSentCount.WithLabelValues(sourceType, source).Inc()
metricConnectionLastPingTimestamp.WithLabelValues(sourceType, source).SetToCurrentTime()
logTracef("received pong frame, duration: %v", duration)
l.Trace().Str("duration", duration.String()).Msg("received pong frame")
}
}()
@@ -302,7 +306,7 @@ func handleWebRTCSignalWsMessages(wsCon *websocket.Conn, isCloudConnection bool,
if err == nil {
continue
}
cloudLogger.Infof("disconnecting from cloud due to: %v", err)
cloudLogger.Info().Err(err).Msg("disconnecting from cloud due to")
cancelRun()
}
}()
@@ -311,7 +315,7 @@ func handleWebRTCSignalWsMessages(wsCon *websocket.Conn, isCloudConnection bool,
for {
typ, msg, err := wsCon.Read(runCtx)
if err != nil {
logWarnf("websocket read error: %v", err)
l.Warn().Str("error", err.Error()).Msg("websocket read error")
return err
}
if typ != websocket.MessageText {
@@ -325,10 +329,10 @@ func handleWebRTCSignalWsMessages(wsCon *websocket.Conn, isCloudConnection bool,
}
if bytes.Equal(msg, pingMessage) {
logInfof("ping message received: %s", string(msg))
l.Info().Str("message", string(msg)).Msg("ping message received")
err = wsCon.Write(context.Background(), websocket.MessageText, pongMessage)
if err != nil {
logWarnf("unable to write pong message: %v", err)
l.Warn().Str("error", err.Error()).Msg("unable to write pong message")
return err
}
@@ -340,55 +344,55 @@ func handleWebRTCSignalWsMessages(wsCon *websocket.Conn, isCloudConnection bool,
err = json.Unmarshal(msg, &message)
if err != nil {
logWarnf("unable to parse ws message: %v", err)
l.Warn().Str("error", err.Error()).Msg("unable to parse ws message")
continue
}
if message.Type == "offer" {
logInfof("new session request received")
l.Info().Msg("new session request received")
var req WebRTCSessionRequest
err = json.Unmarshal(message.Data, &req)
if err != nil {
logWarnf("unable to parse session request data: %v", err)
l.Warn().Str("error", err.Error()).Msg("unable to parse session request data")
continue
}
if req.OidcGoogle != "" {
logInfof("new session request with OIDC Google: %v", req.OidcGoogle)
l.Info().Str("oidcGoogle", req.OidcGoogle).Msg("new session request with OIDC Google")
}
metricConnectionSessionRequestCount.WithLabelValues(sourceType, source).Inc()
metricConnectionLastSessionRequestTimestamp.WithLabelValues(sourceType, source).SetToCurrentTime()
err = handleSessionRequest(runCtx, wsCon, req, isCloudConnection, source)
if err != nil {
logWarnf("error starting new session: %v", err)
l.Warn().Str("error", err.Error()).Msg("error starting new session")
continue
}
} else if message.Type == "new-ice-candidate" {
logInfof("The client sent us a new ICE candidate: %v", string(message.Data))
l.Info().Str("data", string(message.Data)).Msg("The client sent us a new ICE candidate")
var candidate webrtc.ICECandidateInit
// Attempt to unmarshal as a ICECandidateInit
if err := json.Unmarshal(message.Data, &candidate); err != nil {
logWarnf("unable to parse incoming ICE candidate data: %v", string(message.Data))
l.Warn().Str("error", err.Error()).Msg("unable to parse incoming ICE candidate data")
continue
}
if candidate.Candidate == "" {
logWarnf("empty incoming ICE candidate, skipping")
l.Warn().Msg("empty incoming ICE candidate, skipping")
continue
}
logInfof("unmarshalled incoming ICE candidate: %v", candidate)
l.Info().Str("data", fmt.Sprintf("%v", candidate)).Msg("unmarshalled incoming ICE candidate")
if currentSession == nil {
logInfof("no current session, skipping incoming ICE candidate")
l.Warn().Msg("no current session, skipping incoming ICE candidate")
continue
}
logInfof("adding incoming ICE candidate to current session: %v", candidate)
l.Info().Str("data", fmt.Sprintf("%v", candidate)).Msg("adding incoming ICE candidate to current session")
if err = currentSession.peerConnection.AddICECandidate(candidate); err != nil {
logWarnf("failed to add incoming ICE candidate to our peer connection: %v", err)
l.Warn().Str("error", err.Error()).Msg("failed to add incoming ICE candidate to our peer connection")
}
}
}