Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactored and fixed the websocket #2

Merged
merged 3 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 43 additions & 28 deletions api/api/controller/websocket.go
Original file line number Diff line number Diff line change
@@ -1,37 +1,52 @@
package controller

import (
"encoding/json"

"github.com/gofiber/contrib/websocket"
"github.com/gofiber/fiber/v2"

"github.com/TheNestDevs/SnTx/api/internal/domain"
)

var (
Server domain.Server
)

func WebsocketRoute(conn *websocket.Conn) {
id := conn.Params("id")
if id == "" {
_ = conn.Close()
}

topic := conn.Query("topic")
if topic == "" {
_ = conn.Close()
}

client := domain.Client{
Id: id,
Topic: topic,
Conn: conn,
}

for {
_, payLoad, _ := conn.ReadMessage()

// sending data to go routine
domain.Cli <- client
domain.PayLoad <- payLoad
}
func WebsocketRoute(hub *domain.Hub) fiber.Handler {
return websocket.New(func(conn *websocket.Conn) {
var msg domain.Message

id := conn.Params("id")
if id == "" {
_ = conn.Close()
}

topic := conn.Query("topic")
if topic == "" {
_ = conn.Close()
}

client := domain.Client{
Id: id,
Topic: topic,
Conn: conn,
}

hub.NewClient <- &client

defer func() {
hub.RemoveClient(&client)
_ = conn.Close()
}()

for {
messageType, payLoad, _ := conn.ReadMessage()

_ = json.Unmarshal(payLoad, &msg)

// broadcasting message to all other clients in the same room
if messageType == 1 { // checking if the message is a text type message
hub.Broadcast <- &msg
} else {
break
}
}
})
}
9 changes: 6 additions & 3 deletions api/api/routes/routes.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package routes

import (
"github.com/gofiber/contrib/websocket"
"github.com/gofiber/fiber/v2"

"github.com/TheNestDevs/SnTx/api/api/controller"
"github.com/TheNestDevs/SnTx/api/internal/domain"
)

func SetupRoute(app *fiber.App) {
ws := app.Group("/ws")
go controller.Server.ProcessMessage()
ws.Get("/:id", websocket.New(controller.WebsocketRoute))

hub := domain.NewHub()

go hub.Run()
ws.Get("/:id", controller.WebsocketRoute(hub))
}
10 changes: 10 additions & 0 deletions api/internal/domain/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package domain

import "github.com/gofiber/contrib/websocket"

// Client holds the structure of a single client instance
type Client struct {
Id string `json:"id"` // client id fetched from query or if not passed then generated automatically
Topic string `json:"topic"` // topic of the conversation
Conn *websocket.Conn // websocket connection for each client
}
56 changes: 56 additions & 0 deletions api/internal/domain/hub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package domain

type Room struct {
Topic string
Clients map[string]*Client
}

type Hub struct {
Rooms map[string]*Room // Map of rooms. Key is the Topic and value is the Room
Broadcast chan *Message
NewClient chan *Client
}

func NewHub() *Hub {
return &Hub{
Rooms: make(map[string]*Room),
Broadcast: make(chan *Message),
NewClient: make(chan *Client),
}
}

func (h *Hub) Run() {
for {
select {
case message := <-h.Broadcast:
h.SendToRoom(message.Topic, message)
case newClient := <-h.NewClient:
if _, ok := h.Rooms[newClient.Topic]; !ok {
h.Rooms[newClient.Topic] = &Room{
Topic: newClient.Topic,
Clients: make(map[string]*Client),
}
h.Rooms[newClient.Topic].Clients[newClient.Id] = newClient
} else {
h.Rooms[newClient.Topic].Clients[newClient.Id] = newClient
}
}
}
}

// SendToRoom sends a message to all clients in a room accept the sender client
func (h *Hub) SendToRoom(room string, message *Message) {
for _, client := range h.Rooms[room].Clients {
if client.Id != message.ClientID {
client.Conn.WriteJSON(message)
}
}
}

func (h *Hub) RemoveClient(client *Client) {
delete(h.Rooms[client.Topic].Clients, client.Id)

if len(h.Rooms[client.Topic].Clients) == 0 {
delete(h.Rooms, client.Topic)
}
}
67 changes: 0 additions & 67 deletions api/internal/domain/message.go
Original file line number Diff line number Diff line change
@@ -1,75 +1,8 @@
package domain

import (
"context"
"encoding/json"
"log"

"github.com/gofiber/contrib/websocket"
)

var (
Cli = make(chan Client) // Cli is client channel for communicating with go routine
PayLoad = make(chan []byte) // PayLoad channel contains the payload sent from client
Svr = make(chan []*Client)
ctx = context.Background() // ctx is used as context passed to redis
)

// Client holds the structure of a single client instance
type Client struct {
Id string `json:"id"` // client id fetched from query or if not passed then generated automatically
Topic string `json:"topic"` // topic of the conversation
Conn *websocket.Conn // websocket connection for each client
}

type Server struct {
server []*Client
}

// Message holds the structure of JSON message send via websocket.
type Message struct {
ClientID string `json:"client_id,omitempty"` // client id of the client
Topic string `json:"topic,omitempty"` // topic of the message sent
Msg string `json:"message,omitempty"` // message string that is sent
}

func (s *Server) Send(client *Client, msg Message) {
jsonData, err := json.Marshal(msg)
if err != nil {
return
}

err = client.Conn.WriteMessage(websocket.TextMessage, jsonData)
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Fatalf("websocket error: %v", err)
}
}
}

func (s *Server) ProcessMessage() {
for {
select {
case client := <-Cli:
s.LoadServer(&client)
case payload := <-PayLoad:
var msg Message
err := json.Unmarshal(payload, &msg)
if err != nil {
log.Fatalf("error unmarshalling payload: %v", err)
}

for _, client := range s.server {
if client.Id == msg.ClientID {
continue
} else if client.Topic == msg.Topic {
s.Send(client, msg)
}
}
}
}
}

func (s *Server) LoadServer(client *Client) {
s.server = append(s.server, client)
}
Loading