From aa02a460c225d35bdbd48fa32442c423e7df8b21 Mon Sep 17 00:00:00 2001 From: alex Date: Sat, 10 Dec 2022 23:42:34 +0100 Subject: [PATCH] rabbitmq --- config.example.yml | 6 +- go.mod | 16 +++- go.sum | 85 ++++++++++++++++++ main.go | 30 +++++++ modules/cache/cache.go | 5 ++ modules/config/config.go | 9 +- modules/rabbitmq/rabbitmq.go | 149 +++++++++++++++++++++++++++++++ modules/structs/SocketClient.go | 7 ++ modules/structs/SocketMessage.go | 8 ++ socketserver/hub.go | 28 ++++++ socketserver/server.go | 38 ++++++++ 11 files changed, 378 insertions(+), 3 deletions(-) create mode 100644 modules/cache/cache.go create mode 100644 modules/rabbitmq/rabbitmq.go create mode 100644 modules/structs/SocketClient.go create mode 100644 modules/structs/SocketMessage.go create mode 100644 socketserver/hub.go create mode 100644 socketserver/server.go diff --git a/config.example.yml b/config.example.yml index a7dc7b9..8762bb4 100644 --- a/config.example.yml +++ b/config.example.yml @@ -1 +1,5 @@ -debug: false \ No newline at end of file +debug: false +rabbitmq: + host: "127.0.0.1" + username: "guest" + password: "guest" \ No newline at end of file diff --git a/go.mod b/go.mod index 40464f9..6f9ad2e 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,21 @@ module clickandjoin.app/websocketserver go 1.19 require ( + github.com/andybalholm/brotli v1.0.4 // indirect + github.com/fasthttp/websocket v1.5.0 // indirect + github.com/gofiber/fiber/v2 v2.40.1 // indirect + github.com/gofiber/websocket/v2 v2.1.2 // indirect + github.com/klauspost/compress v1.15.12 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.16 // indirect + github.com/mattn/go-runewidth v0.0.14 // indirect + github.com/rabbitmq/amqp091-go v1.5.0 // indirect + github.com/rivo/uniseg v0.4.3 // indirect + github.com/savsgio/gotils v0.0.0-20220530130905-52f3993e8d6d // indirect github.com/sirupsen/logrus v1.9.0 // indirect - golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect + github.com/valyala/bytebufferpool v1.0.0 // indirect + github.com/valyala/fasthttp v1.43.0 // indirect + github.com/valyala/tcplisten v1.0.0 // indirect + golang.org/x/sys v0.3.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index c2100e4..468ff71 100644 --- a/go.sum +++ b/go.sum @@ -1,13 +1,98 @@ +github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY= +github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fasthttp/websocket v1.5.0 h1:B4zbe3xXyvIdnqjOZrafVFklCUq5ZLo/TqCt5JA1wLE= +github.com/fasthttp/websocket v1.5.0/go.mod h1:n0BlOQvJdPbTuBkZT0O5+jk/sp/1/VCzquR1BehI2F4= +github.com/gofiber/fiber/v2 v2.40.1 h1:pc7n9VVpGIqNsvg9IPLQhyFEMJL8gCs1kneH5D1pIl4= +github.com/gofiber/fiber/v2 v2.40.1/go.mod h1:Gko04sLksnHbzLSRBFWPFdzM9Ws9pRxvvIaohJK1dsk= +github.com/gofiber/websocket/v2 v2.1.2 h1:EulKyLB/fJgui5+6c8irwEnYQ9FRsrLZfkrq9OfTDGc= +github.com/gofiber/websocket/v2 v2.1.2/go.mod h1:S+sKWo0xeC7Wnz5h4/8f6D/NxsrLFIdWDYB3SyVO9pE= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/klauspost/compress v1.14.1/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= +github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= +github.com/klauspost/compress v1.15.12 h1:YClS/PImqYbn+UILDnqxQCZ3RehC9N318SU3kElDUEM= +github.com/klauspost/compress v1.15.12/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16 h1:bq3VjFmv/sOjHtdEhmkEV4x1AJtvUvOJ2PFAZ5+peKQ= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-runewidth v0.0.14 h1:+xnbZSEeDbOIg5/mE6JF0w6n9duR1l3/WmbinWVwUuU= +github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rabbitmq/amqp091-go v1.5.0 h1:VouyHPBu1CrKyJVfteGknGOGCzmOz0zcv/tONLkb7rg= +github.com/rabbitmq/amqp091-go v1.5.0/go.mod h1:JsV0ofX5f1nwOGafb8L5rBItt9GyhfQfcJj+oyz0dGg= +github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rivo/uniseg v0.4.3 h1:utMvzDsuh3suAEnhH0RdHmoPbU648o6CvXxTx4SBMOw= +github.com/rivo/uniseg v0.4.3/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +github.com/savsgio/gotils v0.0.0-20211223103454-d0aaa54c5899/go.mod h1:oejLrk1Y/5zOF+c/aHtXqn3TFlzzbAgPWg8zBiAHDas= +github.com/savsgio/gotils v0.0.0-20220530130905-52f3993e8d6d h1:Q+gqLBOPkFGHyCJxXMRqtUgUbTjI8/Ze8vu8GGyNFwo= +github.com/savsgio/gotils v0.0.0-20220530130905-52f3993e8d6d/go.mod h1:Gy+0tqhJvgGlqnTF8CVGP0AaGRjwBtXs/a5PA0Y3+A4= github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0= github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/valyala/fasthttp v1.33.0/go.mod h1:KJRK/MXx0J+yd0c5hlR+s1tIHD72sniU8ZJjl97LIw4= +github.com/valyala/fasthttp v1.41.0 h1:zeR0Z1my1wDHTRiamBCXVglQdbUwgb9uWG3k1HQz6jY= +github.com/valyala/fasthttp v1.41.0/go.mod h1:f6VbjjoI3z1NDOZOv17o6RvtRSWxC77seBFc2uWtgiY= +github.com/valyala/fasthttp v1.43.0 h1:Gy4sb32C98fbzVWZlTM1oTMdLWGyvxR03VhM6cBIU4g= +github.com/valyala/fasthttp v1.43.0/go.mod h1:f6VbjjoI3z1NDOZOv17o6RvtRSWxC77seBFc2uWtgiY= +github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8= +github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= +github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= +golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20220111093109-d55c255bac03/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20220906165146-f3363e06e74c/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220111092808-5a964db01320/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab h1:2QkjZIsXupsJbJIdSjjUOgWK3aEtzyuh2mPt3l/CkeU= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ= +golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main.go b/main.go index 60444c8..003a670 100644 --- a/main.go +++ b/main.go @@ -1,7 +1,15 @@ package main import ( + "log" + "os" + "clickandjoin.app/websocketserver/modules/config" + "clickandjoin.app/websocketserver/modules/rabbitmq" + "clickandjoin.app/websocketserver/socketserver" + + "github.com/gofiber/fiber/v2" + "github.com/gofiber/websocket/v2" "github.com/sirupsen/logrus" ) @@ -13,8 +21,30 @@ func init() { } logrus.Println("Debug:", config.Cfg.Debug) + + go rabbitmq.Init() } func main() { + app := fiber.New() + app.Use("/", func(c *fiber.Ctx) error { + // IsWebSocketUpgrade returns true if the client + // requested upgrade to the WebSocket protocol. + if websocket.IsWebSocketUpgrade(c) { + c.Locals("allowed", true) + return c.Next() + } + return fiber.ErrUpgradeRequired + }) + + go socketserver.RunHub() + + socketserver.WebSocketServer(app) + + if len(os.Args) < 2 { + log.Fatalln("Please specify port") + } + + app.Listen("127.0.0.1:" + os.Args[1]) } diff --git a/modules/cache/cache.go b/modules/cache/cache.go new file mode 100644 index 0000000..b6431f0 --- /dev/null +++ b/modules/cache/cache.go @@ -0,0 +1,5 @@ +package cache + +import "clickandjoin.app/websocketserver/modules/structs" + +var WebSocketClients = make(map[string]*structs.SocketClient) diff --git a/modules/config/config.go b/modules/config/config.go index 467c1d1..acc6981 100644 --- a/modules/config/config.go +++ b/modules/config/config.go @@ -10,7 +10,14 @@ import ( var Cfg Config type Config struct { - Debug bool + Debug bool + RabbitMq RabbitMq +} + +type RabbitMq struct { + Host string + Username string + Password string } func LoadConfig() { diff --git a/modules/rabbitmq/rabbitmq.go b/modules/rabbitmq/rabbitmq.go new file mode 100644 index 0000000..264088b --- /dev/null +++ b/modules/rabbitmq/rabbitmq.go @@ -0,0 +1,149 @@ +package rabbitmq + +import ( + "fmt" + "log" + + "clickandjoin.app/websocketserver/modules/config" + amqp "github.com/rabbitmq/amqp091-go" + "github.com/sirupsen/logrus" +) + +var Conn *amqp.Connection +var Channel *amqp.Channel + +var MessagesQueue amqp.Queue + +func getConnectionString() string { + cfg := &config.Cfg.RabbitMq + + return fmt.Sprintf("amqp://%s:%s@%s/", cfg.Username, cfg.Password, cfg.Host) +} + +func Init() { + conn, err := amqp.Dial(getConnectionString()) + + if err != nil { + logrus.Fatalln("RabbitMQ connection failed, err:", err) + } + + ch, err := conn.Channel() + + if err != nil { + logrus.Fatalln(err) + } + + Channel = ch + + //declareQueue(ch, "messages", &MessagesQueue) + + err = ch.ExchangeDeclare( + "messages", // name + "fanout", // type + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments + ) + + if err != nil { + logrus.Fatalln("Failed to declare exchange, err:", err) + } + + q, err := ch.QueueDeclare( + "", // name + false, // durable + false, // delete when unused + true, // exclusive + false, // no-wait + nil, // arguments + ) + + if err != nil { + logrus.Fatalln("Failed to declare queue, err:", err) + } + + err = ch.QueueBind( + q.Name, // queue name + "", // routing key + "messages", // exchange + false, + nil, + ) + + if err != nil { + logrus.Fatalln("Failed to bind queue, err:", err) + } + + msgs, err := ch.Consume( + q.Name, // queue + "", // consumer + true, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + + if err != nil { + logrus.Fatalln("Failed to register consumer, err:", err) + } + + var forever chan struct{} + + go func() { + for d := range msgs { + log.Printf(" [x] %s", d.Body) + } + }() + + log.Printf(" [*] Waiting for messages") + <-forever + + /* + + msgs, err := ch.Consume( + MessagesQueue.Name, // queue + "", // consumer + true, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + + if err != nil { + logrus.Fatalln("Failed to consume msgs, err:", err) + } + + var forever chan struct{} + + go func() { + for d := range msgs { + logrus.Println("Received msg", d.Body, string(d.Body)) + } + }() + + logrus.Println("Waiting for messages") + <-forever */ +} + +/* +func declareQueue(channel *amqp.Channel, name string, queue *amqp.Queue) { + q, err := channel.QueueDeclare( + name, // name + true, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + + if err != nil { + log.Fatalln("Failed to declare a queue", err) + } + + *queue = q +} +*/ diff --git a/modules/structs/SocketClient.go b/modules/structs/SocketClient.go new file mode 100644 index 0000000..1a8e512 --- /dev/null +++ b/modules/structs/SocketClient.go @@ -0,0 +1,7 @@ +package structs + +import "github.com/gofiber/websocket/v2" + +type SocketClient struct { + Conn *websocket.Conn +} diff --git a/modules/structs/SocketMessage.go b/modules/structs/SocketMessage.go new file mode 100644 index 0000000..f4a664c --- /dev/null +++ b/modules/structs/SocketMessage.go @@ -0,0 +1,8 @@ +package structs + +import "github.com/gofiber/websocket/v2" + +type SocketMessage struct { + Conn *websocket.Conn + Msg []byte +} diff --git a/socketserver/hub.go b/socketserver/hub.go new file mode 100644 index 0000000..41a28fb --- /dev/null +++ b/socketserver/hub.go @@ -0,0 +1,28 @@ +package socketserver + +import ( + "clickandjoin.app/websocketserver/modules/structs" + "github.com/gofiber/websocket/v2" + "github.com/sirupsen/logrus" +) + +var register = make(chan *structs.SocketClient) +var broadcast = make(chan structs.SocketMessage) +var unregister = make(chan *websocket.Conn) + +func RunHub() { + for { + select { + case newSocketClient := <-register: + logrus.Println("register", newSocketClient) + + newSocketClient.Conn.WriteMessage(websocket.TextMessage, []byte("Good afternoon")) + + case data := <-broadcast: + logrus.Println("data", data) + + case connection := <-unregister: + logrus.Println("unregister", connection) + } + } +} diff --git a/socketserver/server.go b/socketserver/server.go new file mode 100644 index 0000000..30d9405 --- /dev/null +++ b/socketserver/server.go @@ -0,0 +1,38 @@ +package socketserver + +import ( + "log" + + "clickandjoin.app/websocketserver/modules/structs" + "github.com/gofiber/fiber/v2" + "github.com/gofiber/websocket/v2" +) + +func WebSocketServer(app *fiber.App) { + app.Get("/", websocket.New(func(c *websocket.Conn) { + defer func() { + unregister <- c + c.Close() + }() + + register <- &structs.SocketClient{Conn: c} + + for { + messageType, msg, err := c.ReadMessage() + + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + log.Println("read error:", err) + } + + return + } + + if messageType == websocket.BinaryMessage { + broadcast <- structs.SocketMessage{Conn: c, Msg: msg} + } else { + log.Println("websocket message received of type", messageType) + } + } + })) +}