message and notification dao
This commit is contained in:
parent
d5ae19a0a4
commit
02098bd2cc
@ -9,6 +9,7 @@ import (
|
|||||||
var userDAO IUserDAO
|
var userDAO IUserDAO
|
||||||
var channelDAO IChannelDAO
|
var channelDAO IChannelDAO
|
||||||
var sessionDAO ISessionDAO
|
var sessionDAO ISessionDAO
|
||||||
|
var messageDAO IMessageDAO
|
||||||
|
|
||||||
func GetUserDAO() (IUserDAO, *util.ChatError) {
|
func GetUserDAO() (IUserDAO, *util.ChatError) {
|
||||||
if userDAO == nil {
|
if userDAO == nil {
|
||||||
@ -36,9 +37,22 @@ func GetChannelDAO() (IChannelDAO, *util.ChatError) {
|
|||||||
return channelDAO, nil
|
return channelDAO, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func GetMessageDAO() (IMessageDAO, *util.ChatError) {
|
||||||
|
if messageDAO == nil {
|
||||||
|
dao, err := postgres.MakeMessageDAO()
|
||||||
|
if err != nil {
|
||||||
|
return messageDAO, err
|
||||||
|
}
|
||||||
|
|
||||||
|
messageDAO = dao
|
||||||
|
}
|
||||||
|
|
||||||
|
return messageDAO, nil
|
||||||
|
}
|
||||||
|
|
||||||
func GetSessionDAO() (ISessionDAO, *util.ChatError) {
|
func GetSessionDAO() (ISessionDAO, *util.ChatError) {
|
||||||
if sessionDAO == nil {
|
if sessionDAO == nil {
|
||||||
dao, err := valkey.MakeUserDAO()
|
dao, err := valkey.MakeSessionDAO()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return sessionDAO, err
|
return sessionDAO, err
|
||||||
}
|
}
|
||||||
|
@ -11,5 +11,5 @@ type IChannelDAO interface {
|
|||||||
List() ([]model.Channel, *util.ChatError)
|
List() ([]model.Channel, *util.ChatError)
|
||||||
ListAvailableChannels(userID int) ([]model.Channel, *util.ChatError)
|
ListAvailableChannels(userID int) ([]model.Channel, *util.ChatError)
|
||||||
Update(channel model.Channel) *util.ChatError
|
Update(channel model.Channel) *util.ChatError
|
||||||
Delete(channel model.Channel) *util.ChatError
|
Delete(id int) *util.ChatError
|
||||||
}
|
}
|
||||||
|
14
dao/IMessageDAD.go
Normal file
14
dao/IMessageDAD.go
Normal file
@ -0,0 +1,14 @@
|
|||||||
|
package dao
|
||||||
|
|
||||||
|
import (
|
||||||
|
"git.tek.govt.hu/dowerx/chat/server/model"
|
||||||
|
"git.tek.govt.hu/dowerx/chat/server/util"
|
||||||
|
)
|
||||||
|
|
||||||
|
type IMessageDAO interface {
|
||||||
|
Create(message model.Message) *util.ChatError
|
||||||
|
Read(id int) (model.Message, *util.ChatError)
|
||||||
|
List(channel model.Channel) ([]model.Message, *util.ChatError)
|
||||||
|
Update(message model.Message) *util.ChatError
|
||||||
|
Delete(id int) *util.ChatError
|
||||||
|
}
|
13
dao/INotificationDAO.go
Normal file
13
dao/INotificationDAO.go
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
package dao
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"git.tek.govt.hu/dowerx/chat/server/model"
|
||||||
|
"git.tek.govt.hu/dowerx/chat/server/util"
|
||||||
|
)
|
||||||
|
|
||||||
|
type INotification interface {
|
||||||
|
SendMessage(ctx context.Context, message model.Message) *util.ChatError
|
||||||
|
SubscribeToChannel(ctx context.Context, id int) (<-chan model.Message, func(), *util.ChatError)
|
||||||
|
}
|
@ -60,7 +60,7 @@ func (d ChannelDAOPG) List() ([]model.Channel, *util.ChatError) {
|
|||||||
|
|
||||||
// ListAvailableChannels returns channels that the given user has access to
|
// ListAvailableChannels returns channels that the given user has access to
|
||||||
func (d ChannelDAOPG) ListAvailableChannels(userID int) ([]model.Channel, *util.ChatError) {
|
func (d ChannelDAOPG) ListAvailableChannels(userID int) ([]model.Channel, *util.ChatError) {
|
||||||
rows, err := d.db.Queryx(`select "channel_id" as "id", "channel_name" as "name", "channel_description" as "description" from "user_rigths_per_channel" where "user_id" = $1`, userID)
|
rows, err := d.db.Queryx(`select "channel_id" as "id", "channel_name" as "name", "channel_description" as "description" from "user_rigths_per_channel" where "user_id" = $1 order by "id"`, userID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, util.MakeError(err, util.DATABASE_QUERY_FAULT)
|
return nil, util.MakeError(err, util.DATABASE_QUERY_FAULT)
|
||||||
}
|
}
|
||||||
@ -89,9 +89,9 @@ func (d ChannelDAOPG) Update(channel model.Channel) *util.ChatError {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Delete removes a channel by ID
|
// Delete removes a channel by ID
|
||||||
func (d ChannelDAOPG) Delete(channel model.Channel) *util.ChatError {
|
func (d ChannelDAOPG) Delete(id int) *util.ChatError {
|
||||||
var err error
|
var err error
|
||||||
_, err = d.db.NamedExec(`delete from "channel" where "id" = :id`, &channel)
|
_, err = d.db.NamedExec(`delete from "channel" where "id" = $1`, id)
|
||||||
return util.MakeError(err, util.DATABASE_QUERY_FAULT)
|
return util.MakeError(err, util.DATABASE_QUERY_FAULT)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
92
dao/postgres/MessageDAO.go
Normal file
92
dao/postgres/MessageDAO.go
Normal file
@ -0,0 +1,92 @@
|
|||||||
|
package postgres
|
||||||
|
|
||||||
|
import (
|
||||||
|
"git.tek.govt.hu/dowerx/chat/server/model"
|
||||||
|
"git.tek.govt.hu/dowerx/chat/server/util"
|
||||||
|
"github.com/jmoiron/sqlx"
|
||||||
|
)
|
||||||
|
|
||||||
|
type MessageDAO struct {
|
||||||
|
pgDAO
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a new message
|
||||||
|
func (d MessageDAO) Create(message model.Message) *util.ChatError {
|
||||||
|
_, err := d.db.NamedExec(`insert into "message" ("sender_id", "channel_id", "time", "content") values (:sender_id, :channel_id, :time, :content)`, &message)
|
||||||
|
return util.MakeError(err, util.DATABASE_QUERY_FAULT)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read returns a message by ID
|
||||||
|
func (d MessageDAO) Read(id int) (model.Message, *util.ChatError) {
|
||||||
|
message := model.Message{}
|
||||||
|
|
||||||
|
var rows *sqlx.Rows
|
||||||
|
var err error
|
||||||
|
rows, err = d.db.NamedQuery(`select * from "message" where "id" = $1`, id)
|
||||||
|
if err != nil {
|
||||||
|
return message, util.MakeError(err, util.DATABASE_QUERY_FAULT)
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
if !rows.Next() {
|
||||||
|
return message, &util.ChatError{Message: "", Code: util.NOT_FOUND}
|
||||||
|
}
|
||||||
|
|
||||||
|
err = rows.StructScan(&message)
|
||||||
|
return message, util.MakeError(err, util.DATABASE_QUERY_FAULT)
|
||||||
|
}
|
||||||
|
|
||||||
|
// List all messages in channel by ID or name
|
||||||
|
func (d MessageDAO) List(channel model.Channel) ([]model.Message, *util.ChatError) {
|
||||||
|
var rows *sqlx.Rows
|
||||||
|
var err error
|
||||||
|
if channel.ID != 0 {
|
||||||
|
rows, err = d.db.Queryx(`select * from "message" where "id" = :id order by "time"`, &channel)
|
||||||
|
} else {
|
||||||
|
rows, err = d.db.Queryx(`select * from "message" where "name" = :name order by "time"`, &channel)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, util.MakeError(err, util.DATABASE_QUERY_FAULT)
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
messages := make([]model.Message, 0)
|
||||||
|
|
||||||
|
for rows.Next() {
|
||||||
|
message := model.Message{}
|
||||||
|
|
||||||
|
err = rows.StructScan(&channel)
|
||||||
|
if err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
messages = append(messages, message)
|
||||||
|
}
|
||||||
|
|
||||||
|
return messages, util.MakeError(err, util.DATABASE_QUERY_FAULT)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update the contents and time of a Message with the given ID
|
||||||
|
func (d MessageDAO) Update(message model.Message) *util.ChatError {
|
||||||
|
_, err := d.db.NamedExec(`update "message" set "content" = :content, "time" = :time where "id" = :id`, &message)
|
||||||
|
return util.MakeError(err, util.DATABASE_QUERY_FAULT)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete a message by ID
|
||||||
|
func (d MessageDAO) Delete(id int) *util.ChatError {
|
||||||
|
var err error
|
||||||
|
_, err = d.db.NamedExec(`delete from "message" where "id" = $1`, id)
|
||||||
|
return util.MakeError(err, util.DATABASE_QUERY_FAULT)
|
||||||
|
}
|
||||||
|
|
||||||
|
func MakeMessageDAO() (MessageDAO, *util.ChatError) {
|
||||||
|
dao := MessageDAO{}
|
||||||
|
conn, err := getDatabase()
|
||||||
|
if err != nil {
|
||||||
|
return dao, err
|
||||||
|
}
|
||||||
|
|
||||||
|
dao.db = conn
|
||||||
|
return dao, nil
|
||||||
|
}
|
60
dao/valkey/NotificationDAO.go
Normal file
60
dao/valkey/NotificationDAO.go
Normal file
@ -0,0 +1,60 @@
|
|||||||
|
package valkey
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
|
"git.tek.govt.hu/dowerx/chat/server/model"
|
||||||
|
"git.tek.govt.hu/dowerx/chat/server/util"
|
||||||
|
"github.com/valkey-io/valkey-go"
|
||||||
|
)
|
||||||
|
|
||||||
|
const CHANNEL_PREFIX string = "channel:"
|
||||||
|
|
||||||
|
type NotificationDAOVK struct {
|
||||||
|
vkDAO
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d NotificationDAOVK) SendMessage(message model.Message) *util.ChatError {
|
||||||
|
data, err := json.Marshal(message)
|
||||||
|
if err != nil {
|
||||||
|
return util.MakeError(err, util.GENERAL_ERROR)
|
||||||
|
}
|
||||||
|
|
||||||
|
cmd := (*d.vk).B().Publish().Channel(CHANNEL_PREFIX + strconv.Itoa(message.Channel)).Message(string(data)).Build()
|
||||||
|
|
||||||
|
return util.MakeError((*d.vk).Do(context.Background(), cmd).Error(), util.DATABASE_QUERY_FAULT)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d NotificationDAOVK) SubscribeToChannel(ctx context.Context, id int) (<-chan model.Message, *util.ChatError) {
|
||||||
|
cmd := (*d.vk).B().Subscribe().Channel(CHANNEL_PREFIX + strconv.Itoa(id)).Build()
|
||||||
|
|
||||||
|
var messages chan model.Message = make(chan model.Message)
|
||||||
|
|
||||||
|
err := (*d.vk).Receive(ctx, cmd, func(msg valkey.PubSubMessage) {
|
||||||
|
go func() {
|
||||||
|
message := model.Message{}
|
||||||
|
err := json.Unmarshal([]byte(msg.Message), &message)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
messages <- message
|
||||||
|
}()
|
||||||
|
})
|
||||||
|
defer close(messages)
|
||||||
|
|
||||||
|
return messages, util.MakeError(err, util.GENERAL_ERROR)
|
||||||
|
}
|
||||||
|
|
||||||
|
func MakeNotificationDAO() (NotificationDAOVK, *util.ChatError) {
|
||||||
|
dao := NotificationDAOVK{}
|
||||||
|
conn, err := getClient()
|
||||||
|
if err != nil {
|
||||||
|
return dao, err
|
||||||
|
}
|
||||||
|
|
||||||
|
dao.vk = conn
|
||||||
|
return dao, nil
|
||||||
|
}
|
@ -89,7 +89,7 @@ func (d SessionDAOVK) Bump(token string, time int) *util.ChatError {
|
|||||||
return util.MakeError((*d.vk).Do(context.Background(), cmd).Error(), util.DATABASE_QUERY_FAULT)
|
return util.MakeError((*d.vk).Do(context.Background(), cmd).Error(), util.DATABASE_QUERY_FAULT)
|
||||||
}
|
}
|
||||||
|
|
||||||
func MakeUserDAO() (SessionDAOVK, *util.ChatError) {
|
func MakeSessionDAO() (SessionDAOVK, *util.ChatError) {
|
||||||
dao := SessionDAOVK{}
|
dao := SessionDAOVK{}
|
||||||
conn, err := getClient()
|
conn, err := getClient()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -3,9 +3,10 @@ package model
|
|||||||
import "time"
|
import "time"
|
||||||
|
|
||||||
type Message struct {
|
type Message struct {
|
||||||
ID int
|
ID int `db:"id" json:"id"`
|
||||||
Sender string // username
|
SenderID string `db:"sender_id" json:"-"`
|
||||||
Channel Channel // channel.id
|
SenderName string `db:"sender_name" json:"sender_name"`
|
||||||
Time time.Time
|
Channel int `db:"channel_id" json:"channel_id"`
|
||||||
Content string
|
Time time.Time `db:"time" json:"time"`
|
||||||
|
Content string `db:"content" json:"content"`
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user