server/dao/valkey/NotificationDAO.go

66 lines
1.5 KiB
Go

package valkey
import (
	"context"
	"encoding/json"
	"strconv"
	"sync"

	"git.tek.govt.hu/dowerx/chat/server/model"
	"git.tek.govt.hu/dowerx/chat/server/util"
	"github.com/valkey-io/valkey-go"
)
// Pub/sub naming.
const (
	// CHANNEL_PREFIX is prepended to the decimal chat channel id to form the
	// name of the pub/sub channel used for publishing and subscribing.
	CHANNEL_PREFIX string = "channel:"
)
// NotificationDAOVK publishes chat messages to, and subscribes to, pub/sub
// channels. It embeds vkDAO, whose vk client handle the methods of this type
// use to build and execute commands.
type NotificationDAOVK struct {
vkDAO
}
// SendMessage serializes message as JSON and publishes it on the pub/sub
// channel named CHANNEL_PREFIX followed by the decimal message.Channel.
//
// If marshalling fails, the error is wrapped with util.GENERAL_ERROR and
// returned. Otherwise the error reported by the publish command (which may be
// nil) is passed through util.MakeError with util.DATABASE_QUERY_FAULT and
// that result is returned.
func (d NotificationDAOVK) SendMessage(message model.Message) *util.ChatError {
	payload, marshalErr := json.Marshal(message)
	if marshalErr != nil {
		return util.MakeError(marshalErr, util.GENERAL_ERROR)
	}

	topic := CHANNEL_PREFIX + strconv.Itoa(message.Channel)
	publish := (*d.vk).B().Publish().Channel(topic).Message(string(payload)).Build()

	publishErr := (*d.vk).Do(context.Background(), publish).Error()
	return util.MakeError(publishErr, util.DATABASE_QUERY_FAULT)
}
// SubscribeToChannel subscribes to the pub/sub channel named CHANNEL_PREFIX
// followed by the decimal id and streams every successfully decoded message
// to the caller.
//
// It returns two channels:
//   - the message channel, which is closed once the subscription has ended
//     and every pending delivery has either completed or been abandoned;
//   - the error channel, which receives exactly one value, the result of
//     util.MakeError(err, util.DATABASE_QUERY_FAULT) for the error returned by
//     Receive, and is never closed.
//
// The subscription runs in a background goroutine until Receive returns
// (presumably when ctx is done or the subscription is otherwise terminated).
// Payloads that cannot be decoded as a model.Message are skipped. Delivery
// order between messages is not guaranteed, because each message is handed
// over from its own goroutine so that the receive callback never blocks.
func (d NotificationDAOVK) SubscribeToChannel(ctx context.Context, id int) (<-chan model.Message, <-chan *util.ChatError) {
	cmd := (*d.vk).B().Subscribe().Channel(CHANNEL_PREFIX + strconv.Itoa(id)).Build()

	messages := make(chan model.Message, 1)
	errChan := make(chan *util.ChatError, 1)

	go func() {
		// senders tracks the per-message delivery goroutines; done tells them
		// to give up once the subscription is over. Together they guarantee
		// that nobody sends on messages after it has been closed and that no
		// delivery goroutine is left blocked forever on an abandoned channel.
		var senders sync.WaitGroup
		done := make(chan struct{})

		// NOTE(review): this relies on the callback being invoked from within
		// Receive (i.e. not after it has returned), so every senders.Add
		// happens before senders.Wait below — confirm against valkey-go.
		err := (*d.vk).Receive(ctx, cmd, func(msg valkey.PubSubMessage) {
			senders.Add(1)
			go func() {
				defer senders.Done()

				var message model.Message
				if err := json.Unmarshal([]byte(msg.Message), &message); err != nil {
					// Malformed payload: skip it rather than end the subscription.
					return
				}

				select {
				case messages <- message:
				case <-done:
					// Subscription ended before the consumer took the message.
				}
			}()
		})

		close(done)
		senders.Wait()

		// errChan has capacity 1 and receives a single value, so this send
		// never blocks even if the caller is not listening.
		errChan <- util.MakeError(err, util.DATABASE_QUERY_FAULT)
		close(messages)
	}()

	return messages, errChan
}
// MakeNotificationDAO builds a NotificationDAOVK whose vk handle is the
// client returned by getClient.
//
// If getClient reports an error, that error is returned unchanged together
// with a zero-value NotificationDAOVK.
func MakeNotificationDAO() (NotificationDAOVK, *util.ChatError) {
	client, clientErr := getClient()
	if clientErr != nil {
		return NotificationDAOVK{}, clientErr
	}

	var dao NotificationDAOVK
	dao.vk = client
	return dao, nil
}