103 lines
3.2 KiB
Go
103 lines
3.2 KiB
Go
package socketserver
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
|
|
"time"
|
|
|
|
"git.ex.umbach.dev/Alex/roese-utils/rslogger"
|
|
"git.ex.umbach.dev/LMS/libcore/models"
|
|
"github.com/gofiber/websocket/v2"
|
|
"github.com/rs/zerolog/log"
|
|
"lms.de/backend/modules/cache"
|
|
"lms.de/backend/modules/database"
|
|
"lms.de/backend/modules/logger"
|
|
"lms.de/backend/modules/structs"
|
|
"lms.de/backend/modules/utils"
|
|
)
|
|
|
|
var register = make(chan *structs.SocketClient)
|
|
var broadcast = make(chan structs.SocketMessage)
|
|
var unregister = make(chan *websocket.Conn)
|
|
|
|
func RunHub() {
|
|
for {
|
|
select {
|
|
case newSocketClient := <-register:
|
|
userId := fmt.Sprintf("%v", newSocketClient.Conn.Locals("userId"))
|
|
browserTabSession := fmt.Sprintf("%v", newSocketClient.Conn.Locals("browserTabSession"))
|
|
sessionId := fmt.Sprintf("%v", newSocketClient.Conn.Locals("sessionId"))
|
|
|
|
// close connection instantly if sessionId is empty
|
|
if sessionId == "<nil>" {
|
|
newSocketClient.SendUnauthorizedCloseMessage()
|
|
continue
|
|
}
|
|
|
|
newSocketClient.SessionId = sessionId
|
|
newSocketClient.BrowserTabSession = browserTabSession
|
|
newSocketClient.UserId = userId
|
|
newSocketClient.OrganizationId = fmt.Sprintf("%v", newSocketClient.Conn.Locals("organizationId"))
|
|
|
|
cache.AddSocketClient(newSocketClient)
|
|
|
|
// check that user session is not expired
|
|
var userSession models.UserSession
|
|
|
|
database.DB.Select("expires_at").First(&userSession, "session = ?", sessionId)
|
|
|
|
if !userSession.ExpiresAt.IsZero() && time.Now().After(userSession.ExpiresAt) {
|
|
newSocketClient.SendUnauthorizedCloseMessage()
|
|
database.DB.Delete(&models.UserSession{}, "session = ?", sessionId)
|
|
continue
|
|
}
|
|
|
|
// update session last used time
|
|
database.DB.Model(&models.UserSession{}).Where("session = ?", sessionId).Updates(models.UserSession{
|
|
LastUsedAt: time.Now(),
|
|
ExpiresAt: utils.GetSessionExpiresAtTime(),
|
|
})
|
|
|
|
// socketclients.UpdateUserSessionsForUser(userId, sessionId)
|
|
|
|
logger.AddSystemLog(rslogger.LogTypeInfo, "User %v has come online", userId)
|
|
|
|
case data := <-broadcast:
|
|
var receivedMessage structs.ReceivedMessage
|
|
|
|
if err := json.Unmarshal(data.Msg, &receivedMessage); err != nil {
|
|
log.Error().Msgf("Failed to unmarshal received msg, err: %s", err)
|
|
continue
|
|
}
|
|
|
|
log.Debug().Msgf("Received message: %v %v", receivedMessage, receivedMessage.Cmd)
|
|
|
|
switch receivedMessage.Cmd {
|
|
case utils.ReceivedCmdSubscribeToTopic:
|
|
cache.SubscribeSocketClientToTopic(receivedMessage.Body["browserTabSession"].(string), receivedMessage.Body["topic"].(string))
|
|
case 2:
|
|
break
|
|
default:
|
|
log.Error().Msgf("Received unknown message: %v", receivedMessage)
|
|
}
|
|
|
|
case connection := <-unregister:
|
|
cache.DeleteClientByConn(connection)
|
|
|
|
if connection.Locals("userId") != nil && connection.Locals("sessionId") != nil {
|
|
userId := connection.Locals("userId").(string)
|
|
// sessionId := connection.Locals("sessionId").(string)
|
|
|
|
database.DB.Model(&models.User{}).Where("id = ?", userId).Updates(models.User{
|
|
LastOnlineAt: time.Now(),
|
|
})
|
|
|
|
// socketclients.UpdateUserSessionsForUser(userId, sessionId)
|
|
|
|
logger.AddSystemLog(rslogger.LogTypeInfo, "User %s has gone offline", userId)
|
|
}
|
|
}
|
|
}
|
|
}
|