package rabbitmq import ( "encoding/json" "fmt" "git.umbach.dev/picture-storage-handler/modules/config" "git.umbach.dev/picture-storage-handler/modules/picture" "git.umbach.dev/picture-storage-handler/modules/structs" log "github.com/sirupsen/logrus" "github.com/streadway/amqp" ) var Conn *amqp.Connection var Channel *amqp.Channel var PictureQueue amqp.Queue func getConnectionString() string { cfg := &config.Cfg.RabbitMq return fmt.Sprintf("amqp://%s:%s@%s/", cfg.Username, cfg.Password, cfg.Host) } func Init() { conn, err := amqp.Dial(getConnectionString()) if err != nil { log.Fatalln("Failed to connect to RabbitMQ", err) } Conn = conn ch, err := conn.Channel() if err != nil { log.Fatalln("Failed to open a channel", err) } //defer ch.Close() Channel = ch err = ch.Qos( 1, // prefetch count 0, // prefetch size false, // global ) if err != nil { log.Fatalln("Failed to set QoS", err) } msgs, err := ch.Consume( "pictures", // queue "", // consumer false, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) if err != nil { log.Fatalln("Consume err", err) } forever := make(chan []byte) go func() { for d := range msgs { log.Infoln("Received Message:", len(d.Body)) pictureMessage := structs.RabbitmqPictureMessage{} if err := json.Unmarshal(d.Body, &pictureMessage); err != nil { log.Fatal(err) } if err := picture.Save(pictureMessage); err == nil { d.Ack(false) } } }() fmt.Println("Successfully Connected to our RabbitMQ Instance") fmt.Println(" [*] - Waiting for messages") <-forever }