commit edd379cfd1124f0a15634f3ea5bc03c554626534 Author: alex Date: Wed Dec 21 20:12:12 2022 +0100 sse server diff --git a/config.example.yml b/config.example.yml new file mode 100644 index 0000000..8762bb4 --- /dev/null +++ b/config.example.yml @@ -0,0 +1,5 @@ +debug: false +rabbitmq: + host: "127.0.0.1" + username: "guest" + password: "guest" \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..d59c33d --- /dev/null +++ b/go.mod @@ -0,0 +1,22 @@ +module clickandjoin.app/serversenteventsserver + +go 1.19 + +require ( + github.com/andybalholm/brotli v1.0.4 // indirect + github.com/gofiber/adaptor/v2 v2.1.30 // indirect + github.com/gofiber/fiber/v2 v2.40.1 // indirect + github.com/google/uuid v1.3.0 // indirect + github.com/klauspost/compress v1.15.9 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.16 // indirect + github.com/mattn/go-runewidth v0.0.14 // indirect + github.com/rabbitmq/amqp091-go v1.5.0 // indirect + github.com/rivo/uniseg v0.2.0 // indirect + github.com/sirupsen/logrus v1.9.0 // indirect + github.com/valyala/bytebufferpool v1.0.0 // indirect + github.com/valyala/fasthttp v1.41.0 // indirect + github.com/valyala/tcplisten v1.0.0 // indirect + golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab // indirect + gopkg.in/yaml.v2 v2.4.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..b96580d --- /dev/null +++ b/go.sum @@ -0,0 +1,81 @@ +github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY= +github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gofiber/adaptor/v2 v2.1.30 h1:ak5sCY8HbnafdMjvxUhS8wz5JWghwfxhnvig3/ACjVo= +github.com/gofiber/adaptor/v2 v2.1.30/go.mod h1:20KxTjpAea1f+9mcMwManJWKvWl37YxW1yuNGoNoGS4= +github.com/gofiber/fiber/v2 v2.40.1 h1:pc7n9VVpGIqNsvg9IPLQhyFEMJL8gCs1kneH5D1pIl4= +github.com/gofiber/fiber/v2 v2.40.1/go.mod h1:Gko04sLksnHbzLSRBFWPFdzM9Ws9pRxvvIaohJK1dsk= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= +github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16 h1:bq3VjFmv/sOjHtdEhmkEV4x1AJtvUvOJ2PFAZ5+peKQ= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-runewidth v0.0.14 h1:+xnbZSEeDbOIg5/mE6JF0w6n9duR1l3/WmbinWVwUuU= +github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rabbitmq/amqp091-go v1.5.0 h1:VouyHPBu1CrKyJVfteGknGOGCzmOz0zcv/tONLkb7rg= +github.com/rabbitmq/amqp091-go v1.5.0/go.mod h1:JsV0ofX5f1nwOGafb8L5rBItt9GyhfQfcJj+oyz0dGg= +github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0= +github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/valyala/fasthttp v1.41.0 h1:zeR0Z1my1wDHTRiamBCXVglQdbUwgb9uWG3k1HQz6jY= +github.com/valyala/fasthttp v1.41.0/go.mod h1:f6VbjjoI3z1NDOZOv17o6RvtRSWxC77seBFc2uWtgiY= +github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8= +github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= +github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= +golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20220906165146-f3363e06e74c/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab h1:2QkjZIsXupsJbJIdSjjUOgWK3aEtzyuh2mPt3l/CkeU= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main.go b/main.go new file mode 100644 index 0000000..3b6b05f --- /dev/null +++ b/main.go @@ -0,0 +1,94 @@ +package main + +import ( + "bufio" + "fmt" + "os" + "time" + + "clickandjoin.app/serversenteventsserver/modules/cache" + "clickandjoin.app/serversenteventsserver/modules/config" + "clickandjoin.app/serversenteventsserver/modules/rabbitmq" + "clickandjoin.app/serversenteventsserver/modules/structs" + "github.com/gofiber/fiber/v2" + "github.com/gofiber/fiber/v2/middleware/cors" + "github.com/google/uuid" + "github.com/sirupsen/logrus" + "github.com/valyala/fasthttp" +) + +func init() { + config.LoadConfig() + + if config.Cfg.Debug { + logrus.SetLevel(logrus.DebugLevel) + } + + logrus.Println("Debug:", config.Cfg.Debug) + + go rabbitmq.Init() +} + +func main() { + app := fiber.New() + + // wait so that rabbitmq can connect + // TODO: better way to handle this + time.Sleep(500 * time.Millisecond) + + if len(os.Args) < 2 { + logrus.Fatalln("Please specify port") + } + + // TODO: origin + // CORS for external resources + app.Use(cors.New(cors.Config{ + AllowOrigins: "*", + AllowHeaders: "Cache-Control", + AllowCredentials: true, + })) + + app.Get("/sse", func(c *fiber.Ctx) error { + c.Set("Content-Type", "text/event-stream") + c.Set("Cache-Control", "no-cache") + c.Set("Connection", "keep-alive") + c.Set("Transfer-Encoding", "chunked") + + c.Context().SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) { + var sseclient structs.SSEClient + + sseclient.MessageChan = make(chan structs.SSEClientChanMessage) + + clientId := uuid.New() + + cache.SSEClients[clientId] = sseclient + + logrus.Debugln("NEW CLIENT ID:", clientId) + logrus.Debugln(len(cache.SSEClients), "CLIENTS CONNECTED") + + for message := range sseclient.MessageChan { + fmt.Fprintf(w, "data: %s\n\n", message.Message) + + err := w.Flush() + + if err != nil { + // Refreshing page in web browser will establish a new + // SSE connection, but only (the last) one is alive, so + // dead connections must be closed here. + + for id, sseClient := range cache.SSEClients { + if id == message.ClientId { + close(sseClient.MessageChan) + delete(cache.SSEClients, id) + logrus.Debugln("DELETE clientId:", id) + } + } + } + } + })) + + return nil + }) + + app.Listen("127.0.0.1:" + os.Args[1]) +} diff --git a/modules/cache/cache.go b/modules/cache/cache.go new file mode 100644 index 0000000..7cd03d6 --- /dev/null +++ b/modules/cache/cache.go @@ -0,0 +1,8 @@ +package cache + +import ( + "clickandjoin.app/serversenteventsserver/modules/structs" + "github.com/google/uuid" +) + +var SSEClients = make(map[uuid.UUID]structs.SSEClient) diff --git a/modules/config/config.go b/modules/config/config.go new file mode 100644 index 0000000..acc6981 --- /dev/null +++ b/modules/config/config.go @@ -0,0 +1,33 @@ +package config + +import ( + "os" + + "github.com/sirupsen/logrus" + "gopkg.in/yaml.v2" +) + +var Cfg Config + +type Config struct { + Debug bool + RabbitMq RabbitMq +} + +type RabbitMq struct { + Host string + Username string + Password string +} + +func LoadConfig() { + data, err := os.ReadFile("config.yml") + + if err != nil { + logrus.Fatalln("Failed to read config file, err:", err) + } + + if err := yaml.Unmarshal(data, &Cfg); err != nil { + logrus.Fatalln("Failed to unmarshal config file, err:", err) + } +} diff --git a/modules/rabbitmq/rabbitmq.go b/modules/rabbitmq/rabbitmq.go new file mode 100644 index 0000000..d921b0d --- /dev/null +++ b/modules/rabbitmq/rabbitmq.go @@ -0,0 +1,101 @@ +package rabbitmq + +import ( + "fmt" + + "clickandjoin.app/serversenteventsserver/modules/cache" + "clickandjoin.app/serversenteventsserver/modules/config" + "clickandjoin.app/serversenteventsserver/modules/structs" + "clickandjoin.app/serversenteventsserver/modules/utils" + amqp "github.com/rabbitmq/amqp091-go" + "github.com/sirupsen/logrus" +) + +var Conn *amqp.Connection +var Channel *amqp.Channel + +const exchangeBroadcastMessages = "cnj.api.broadcast.messages" + +func getConnectionString() string { + cfg := &config.Cfg.RabbitMq + + return fmt.Sprintf("amqp://%s:%s@%s/", cfg.Username, cfg.Password, cfg.Host) +} + +func Init() { + conn, err := amqp.Dial(getConnectionString()) + + if err != nil { + logrus.Fatalln("RabbitMQ connection failed, err:", err) + } + + ch, err := conn.Channel() + + if err != nil { + logrus.Fatalln(err) + } + + Channel = ch + + /* + * api broadcast messages + */ + + q, err := ch.QueueDeclare( + "", // name + false, // durable + false, // delete when unused + true, // exclusive + false, // no-wait + nil, // arguments + ) + + if err != nil { + logrus.Fatalln("Failed to declare queue, err:", err) + } + + err = ch.QueueBind( + q.Name, // queue name + "", // routing key + exchangeBroadcastMessages, // exchange + false, + nil, + ) + + if err != nil { + logrus.Fatalln("Failed to declare queue, err:", err) + } + + msgs, err := ch.Consume( + q.Name, // queue + "", // consumer + true, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + + if err != nil { + logrus.Fatalln("Failed to register consumer, err:", err) + } + + go func() { + for d := range msgs { + var receivedMessage structs.ReceivedMessage + + err = utils.UnmarshalReceivedMessage(d.Body, &receivedMessage) + + if err != nil { + logrus.Errorln("Failed to unmarshal received msg, err:", err) + } + + logrus.Debugln("RABBITMQ RECEIVED BROADCAST MESSAGE:", receivedMessage) + logrus.Debugln("SENDING MSG TO", len(cache.SSEClients), "CLIENTS") + + for id, sseClient := range cache.SSEClients { + sseClient.MessageChan <- structs.SSEClientChanMessage{ClientId: id, Message: string(d.Body)} + } + } + }() +} diff --git a/modules/structs/message.go b/modules/structs/message.go new file mode 100644 index 0000000..38f578f --- /dev/null +++ b/modules/structs/message.go @@ -0,0 +1,7 @@ +package structs + +type ReceivedMessage struct { + Cmd int + Rec string // represent receiver user id + Body any +} diff --git a/modules/structs/sseclient.go b/modules/structs/sseclient.go new file mode 100644 index 0000000..398983a --- /dev/null +++ b/modules/structs/sseclient.go @@ -0,0 +1,12 @@ +package structs + +import "github.com/google/uuid" + +type SSEClient struct { + MessageChan chan SSEClientChanMessage +} + +type SSEClientChanMessage struct { + ClientId uuid.UUID + Message string +} diff --git a/modules/utils/utils.go b/modules/utils/utils.go new file mode 100644 index 0000000..3c28159 --- /dev/null +++ b/modules/utils/utils.go @@ -0,0 +1,30 @@ +package utils + +import ( + "encoding/json" + + "clickandjoin.app/serversenteventsserver/modules/structs" + "github.com/sirupsen/logrus" +) + +func MarshalMessage(message any) (marshaledMessage []byte, err error) { + marshaledMessage, err = json.Marshal(message) + + if err != nil { + logrus.Errorln("Failed to marshal send message, err:", err) + return nil, err + } + + return marshaledMessage, nil +} + +func UnmarshalReceivedMessage(body []byte, receivedMessage *structs.ReceivedMessage) error { + err := json.Unmarshal(body, &receivedMessage) + + if err != nil { + logrus.Errorln("Failed to unmarshal received message, err:", err) + return err + } + + return nil +}