diff --git a/dao/Factory.go b/dao/Factory.go index 1518e42..ef9a27f 100644 --- a/dao/Factory.go +++ b/dao/Factory.go @@ -9,6 +9,7 @@ import ( var userDAO IUserDAO var channelDAO IChannelDAO var sessionDAO ISessionDAO +var messageDAO IMessageDAO func GetUserDAO() (IUserDAO, *util.ChatError) { if userDAO == nil { @@ -36,9 +37,22 @@ func GetChannelDAO() (IChannelDAO, *util.ChatError) { return channelDAO, nil } +func GetMessageDAO() (IMessageDAO, *util.ChatError) { + if messageDAO == nil { + dao, err := postgres.MakeMessageDAO() + if err != nil { + return messageDAO, err + } + + messageDAO = dao + } + + return messageDAO, nil +} + func GetSessionDAO() (ISessionDAO, *util.ChatError) { if sessionDAO == nil { - dao, err := valkey.MakeUserDAO() + dao, err := valkey.MakeSessionDAO() if err != nil { return sessionDAO, err } diff --git a/dao/IChannelDAD.go b/dao/IChannelDAD.go index c9f192f..c5ebc8c 100644 --- a/dao/IChannelDAD.go +++ b/dao/IChannelDAD.go @@ -11,5 +11,5 @@ type IChannelDAO interface { List() ([]model.Channel, *util.ChatError) ListAvailableChannels(userID int) ([]model.Channel, *util.ChatError) Update(channel model.Channel) *util.ChatError - Delete(channel model.Channel) *util.ChatError + Delete(id int) *util.ChatError } diff --git a/dao/IMessageDAD.go b/dao/IMessageDAD.go new file mode 100644 index 0000000..639c3e1 --- /dev/null +++ b/dao/IMessageDAD.go @@ -0,0 +1,14 @@ +package dao + +import ( + "git.tek.govt.hu/dowerx/chat/server/model" + "git.tek.govt.hu/dowerx/chat/server/util" +) + +type IMessageDAO interface { + Create(message model.Message) *util.ChatError + Read(id int) (model.Message, *util.ChatError) + List(channel model.Channel) ([]model.Message, *util.ChatError) + Update(message model.Message) *util.ChatError + Delete(id int) *util.ChatError +} diff --git a/dao/INotificationDAO.go b/dao/INotificationDAO.go new file mode 100644 index 0000000..8f78313 --- /dev/null +++ b/dao/INotificationDAO.go @@ -0,0 +1,13 @@ +package dao + +import ( + "context" + + "git.tek.govt.hu/dowerx/chat/server/model" + "git.tek.govt.hu/dowerx/chat/server/util" +) + +type INotification interface { + SendMessage(ctx context.Context, message model.Message) *util.ChatError + SubscribeToChannel(ctx context.Context, id int) (<-chan model.Message, func(), *util.ChatError) +} diff --git a/dao/postgres/ChannelDAO.go b/dao/postgres/ChannelDAO.go index 7a0795b..dc3b397 100644 --- a/dao/postgres/ChannelDAO.go +++ b/dao/postgres/ChannelDAO.go @@ -60,7 +60,7 @@ func (d ChannelDAOPG) List() ([]model.Channel, *util.ChatError) { // ListAvailableChannels returns channels that the given user has access to func (d ChannelDAOPG) ListAvailableChannels(userID int) ([]model.Channel, *util.ChatError) { - rows, err := d.db.Queryx(`select "channel_id" as "id", "channel_name" as "name", "channel_description" as "description" from "user_rigths_per_channel" where "user_id" = $1`, userID) + rows, err := d.db.Queryx(`select "channel_id" as "id", "channel_name" as "name", "channel_description" as "description" from "user_rigths_per_channel" where "user_id" = $1 order by "id"`, userID) if err != nil { return nil, util.MakeError(err, util.DATABASE_QUERY_FAULT) } @@ -89,9 +89,9 @@ func (d ChannelDAOPG) Update(channel model.Channel) *util.ChatError { } // Delete removes a channel by ID -func (d ChannelDAOPG) Delete(channel model.Channel) *util.ChatError { +func (d ChannelDAOPG) Delete(id int) *util.ChatError { var err error - _, err = d.db.NamedExec(`delete from "channel" where "id" = :id`, &channel) + _, err = d.db.NamedExec(`delete from "channel" where "id" = $1`, id) return util.MakeError(err, util.DATABASE_QUERY_FAULT) } diff --git a/dao/postgres/MessageDAO.go b/dao/postgres/MessageDAO.go new file mode 100644 index 0000000..a951eed --- /dev/null +++ b/dao/postgres/MessageDAO.go @@ -0,0 +1,92 @@ +package postgres + +import ( + "git.tek.govt.hu/dowerx/chat/server/model" + "git.tek.govt.hu/dowerx/chat/server/util" + "github.com/jmoiron/sqlx" +) + +type MessageDAO struct { + pgDAO +} + +// Create a new message +func (d MessageDAO) Create(message model.Message) *util.ChatError { + _, err := d.db.NamedExec(`insert into "message" ("sender_id", "channel_id", "time", "content") values (:sender_id, :channel_id, :time, :content)`, &message) + return util.MakeError(err, util.DATABASE_QUERY_FAULT) +} + +// Read returns a message by ID +func (d MessageDAO) Read(id int) (model.Message, *util.ChatError) { + message := model.Message{} + + var rows *sqlx.Rows + var err error + rows, err = d.db.NamedQuery(`select * from "message" where "id" = $1`, id) + if err != nil { + return message, util.MakeError(err, util.DATABASE_QUERY_FAULT) + } + defer rows.Close() + + if !rows.Next() { + return message, &util.ChatError{Message: "", Code: util.NOT_FOUND} + } + + err = rows.StructScan(&message) + return message, util.MakeError(err, util.DATABASE_QUERY_FAULT) +} + +// List all messages in channel by ID or name +func (d MessageDAO) List(channel model.Channel) ([]model.Message, *util.ChatError) { + var rows *sqlx.Rows + var err error + if channel.ID != 0 { + rows, err = d.db.Queryx(`select * from "message" where "id" = :id order by "time"`, &channel) + } else { + rows, err = d.db.Queryx(`select * from "message" where "name" = :name order by "time"`, &channel) + } + + if err != nil { + return nil, util.MakeError(err, util.DATABASE_QUERY_FAULT) + } + defer rows.Close() + + messages := make([]model.Message, 0) + + for rows.Next() { + message := model.Message{} + + err = rows.StructScan(&channel) + if err != nil { + break + } + + messages = append(messages, message) + } + + return messages, util.MakeError(err, util.DATABASE_QUERY_FAULT) +} + +// Update the contents and time of a Message with the given ID +func (d MessageDAO) Update(message model.Message) *util.ChatError { + _, err := d.db.NamedExec(`update "message" set "content" = :content, "time" = :time where "id" = :id`, &message) + return util.MakeError(err, util.DATABASE_QUERY_FAULT) +} + +// Delete a message by ID +func (d MessageDAO) Delete(id int) *util.ChatError { + var err error + _, err = d.db.NamedExec(`delete from "message" where "id" = $1`, id) + return util.MakeError(err, util.DATABASE_QUERY_FAULT) +} + +func MakeMessageDAO() (MessageDAO, *util.ChatError) { + dao := MessageDAO{} + conn, err := getDatabase() + if err != nil { + return dao, err + } + + dao.db = conn + return dao, nil +} diff --git a/dao/valkey/NotificationDAO.go b/dao/valkey/NotificationDAO.go new file mode 100644 index 0000000..e5ceaef --- /dev/null +++ b/dao/valkey/NotificationDAO.go @@ -0,0 +1,60 @@ +package valkey + +import ( + "context" + "encoding/json" + "strconv" + + "git.tek.govt.hu/dowerx/chat/server/model" + "git.tek.govt.hu/dowerx/chat/server/util" + "github.com/valkey-io/valkey-go" +) + +const CHANNEL_PREFIX string = "channel:" + +type NotificationDAOVK struct { + vkDAO +} + +func (d NotificationDAOVK) SendMessage(message model.Message) *util.ChatError { + data, err := json.Marshal(message) + if err != nil { + return util.MakeError(err, util.GENERAL_ERROR) + } + + cmd := (*d.vk).B().Publish().Channel(CHANNEL_PREFIX + strconv.Itoa(message.Channel)).Message(string(data)).Build() + + return util.MakeError((*d.vk).Do(context.Background(), cmd).Error(), util.DATABASE_QUERY_FAULT) +} + +func (d NotificationDAOVK) SubscribeToChannel(ctx context.Context, id int) (<-chan model.Message, *util.ChatError) { + cmd := (*d.vk).B().Subscribe().Channel(CHANNEL_PREFIX + strconv.Itoa(id)).Build() + + var messages chan model.Message = make(chan model.Message) + + err := (*d.vk).Receive(ctx, cmd, func(msg valkey.PubSubMessage) { + go func() { + message := model.Message{} + err := json.Unmarshal([]byte(msg.Message), &message) + if err != nil { + return + } + + messages <- message + }() + }) + defer close(messages) + + return messages, util.MakeError(err, util.GENERAL_ERROR) +} + +func MakeNotificationDAO() (NotificationDAOVK, *util.ChatError) { + dao := NotificationDAOVK{} + conn, err := getClient() + if err != nil { + return dao, err + } + + dao.vk = conn + return dao, nil +} diff --git a/dao/valkey/SessionDAO.go b/dao/valkey/SessionDAO.go index 60f7abc..9616039 100644 --- a/dao/valkey/SessionDAO.go +++ b/dao/valkey/SessionDAO.go @@ -89,7 +89,7 @@ func (d SessionDAOVK) Bump(token string, time int) *util.ChatError { return util.MakeError((*d.vk).Do(context.Background(), cmd).Error(), util.DATABASE_QUERY_FAULT) } -func MakeUserDAO() (SessionDAOVK, *util.ChatError) { +func MakeSessionDAO() (SessionDAOVK, *util.ChatError) { dao := SessionDAOVK{} conn, err := getClient() if err != nil { diff --git a/model/Message.go b/model/Message.go index 3ad888d..0ec27ee 100644 --- a/model/Message.go +++ b/model/Message.go @@ -3,9 +3,10 @@ package model import "time" type Message struct { - ID int - Sender string // username - Channel Channel // channel.id - Time time.Time - Content string + ID int `db:"id" json:"id"` + SenderID string `db:"sender_id" json:"-"` + SenderName string `db:"sender_name" json:"sender_name"` + Channel int `db:"channel_id" json:"channel_id"` + Time time.Time `db:"time" json:"time"` + Content string `db:"content" json:"content"` }