package socketserver import ( "encoding/json" "fmt" "time" "git.ex.umbach.dev/Alex/roese-utils/rslogger" "git.ex.umbach.dev/LMS/libcore/models" "github.com/gofiber/websocket/v2" "github.com/rs/zerolog/log" "lms.de/backend/modules/cache" "lms.de/backend/modules/database" "lms.de/backend/modules/logger" "lms.de/backend/modules/structs" "lms.de/backend/modules/utils" ) var register = make(chan *structs.SocketClient) var broadcast = make(chan structs.SocketMessage) var unregister = make(chan *websocket.Conn) func RunHub() { for { select { case newSocketClient := <-register: userId := fmt.Sprintf("%v", newSocketClient.Conn.Locals("userId")) browserTabSession := fmt.Sprintf("%v", newSocketClient.Conn.Locals("browserTabSession")) sessionId := fmt.Sprintf("%v", newSocketClient.Conn.Locals("sessionId")) // close connection instantly if sessionId is empty if sessionId == "" { newSocketClient.SendUnauthorizedCloseMessage() continue } newSocketClient.SessionId = sessionId newSocketClient.BrowserTabSession = browserTabSession newSocketClient.UserId = userId newSocketClient.OrganizationId = fmt.Sprintf("%v", newSocketClient.Conn.Locals("organizationId")) cache.AddSocketClient(newSocketClient) // check that user session is not expired var userSession models.UserSession database.DB.Select("expires_at").First(&userSession, "session = ?", sessionId) if !userSession.ExpiresAt.IsZero() && time.Now().After(userSession.ExpiresAt) { newSocketClient.SendUnauthorizedCloseMessage() database.DB.Delete(&models.UserSession{}, "session = ?", sessionId) continue } // update session last used time database.DB.Model(&models.UserSession{}).Where("session = ?", sessionId).Updates(models.UserSession{ LastUsedAt: time.Now(), ExpiresAt: utils.GetSessionExpiresAtTime(), }) // socketclients.UpdateUserSessionsForUser(userId, sessionId) logger.AddSystemLog(rslogger.LogTypeInfo, "User %v has come online", userId) case data := <-broadcast: var receivedMessage structs.ReceivedMessage if err := json.Unmarshal(data.Msg, &receivedMessage); err != nil { log.Error().Msgf("Failed to unmarshal received msg, err: %s", err) continue } log.Debug().Msgf("Received message: %v %v", receivedMessage, receivedMessage.Cmd) switch receivedMessage.Cmd { case utils.ReceivedCmdSubscribeToTopic: cache.SubscribeSocketClientToTopic(receivedMessage.Body["browserTabSession"].(string), receivedMessage.Body["topic"].(string)) case 2: break default: log.Error().Msgf("Received unknown message: %v", receivedMessage) } case connection := <-unregister: cache.DeleteClientByConn(connection) if connection.Locals("userId") != nil && connection.Locals("sessionId") != nil { userId := connection.Locals("userId").(string) // sessionId := connection.Locals("sessionId").(string) database.DB.Model(&models.User{}).Where("id = ?", userId).Updates(models.User{ LastOnlineAt: time.Now(), }) // socketclients.UpdateUserSessionsForUser(userId, sessionId) logger.AddSystemLog(rslogger.LogTypeInfo, "User %s has gone offline", userId) } } } }