Skip to content

Commit

Permalink
Allow requesting a different serializer from agent handshake
Browse files Browse the repository at this point in the history
  • Loading branch information
rsafonseca committed Aug 6, 2024
1 parent c548b18 commit 6714974
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 152 deletions.
192 changes: 57 additions & 135 deletions go.work.sum

Large diffs are not rendered by default.

59 changes: 46 additions & 13 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
)

var (
// These global variables only apply to the default serializer configured on the server. If clients negotiate a different serializer, these will be encoded again on the first connection.
// hbd contains the heartbeat packet data
hbd []byte
// hrd contains the handshake response data
Expand Down Expand Up @@ -83,6 +84,9 @@ type (
metricsReporters []metrics.Reporter
serializer serialize.Serializer // message serializer
state int32 // current agent state
hbd []byte // heartbeat packet data
hrd []byte // heartbeat response data
herd []byte // handshake error response data
}

pendingMessage struct {
Expand Down Expand Up @@ -118,6 +122,8 @@ type (
SendHandshakeErrorResponse() error
SendRequest(ctx context.Context, serverID, route string, v interface{}) (*protos.Response, error)
AnswerWithError(ctx context.Context, mid uint, err error)
GetSerializer() serialize.Serializer
SetSerializer(serializer serialize.Serializer)
}

// AgentFactory factory for creating Agent instances
Expand Down Expand Up @@ -184,11 +190,6 @@ func newAgent(
// initialize heartbeat and handshake data on first user connection
serializerName := serializer.GetName()

once.Do(func() {
hbdEncode(heartbeatTime, packetEncoder, messageEncoder.IsCompressionEnabled(), serializerName)
herdEncode(heartbeatTime, packetEncoder, messageEncoder.IsCompressionEnabled(), serializerName)
})

a := &agentImpl{
appDieChan: dieChan,
chDie: make(chan struct{}),
Expand All @@ -208,6 +209,14 @@ func newAgent(
sessionPool: sessionPool,
}

once.Do(func() {
a.hbdEncode(heartbeatTime, packetEncoder, messageEncoder.IsCompressionEnabled(), serializerName, true)
a.herdEncode(heartbeatTime, packetEncoder, messageEncoder.IsCompressionEnabled(), serializerName, true)
})
a.hbd = hbd
a.hrd = hrd
a.herd = herd

// binding session
s := sessionPool.NewSession(a, true)
metrics.ReportNumberOfConnectedClients(metricsReporters, sessionPool.GetSessionCount())
Expand Down Expand Up @@ -449,7 +458,7 @@ func (a *agentImpl) heartbeat() {

// chSend is never closed so we need this to don't block if agent is already closed
select {
case a.chSend <- pendingWrite{data: hbd}:
case a.chSend <- pendingWrite{data: a.hbd}:
case <-a.chDie:
return
case <-a.chStopHeartbeat:
Expand Down Expand Up @@ -481,13 +490,13 @@ func (a *agentImpl) onSessionClosed(s session.Session) {

// SendHandshakeResponse sends a handshake response
func (a *agentImpl) SendHandshakeResponse() error {
_, err := a.conn.Write(hrd)
_, err := a.conn.Write(a.hrd)

return err
}

func (a *agentImpl) SendHandshakeErrorResponse() error {
_, err := a.conn.Write(herd)
_, err := a.conn.Write(a.herd)

return err
}
Expand Down Expand Up @@ -591,7 +600,7 @@ func (a *agentImpl) AnswerWithError(ctx context.Context, mid uint, err error) {
}
}

func hbdEncode(heartbeatTimeout time.Duration, packetEncoder codec.PacketEncoder, dataCompression bool, serializerName string) {
func (a *agentImpl) hbdEncode(heartbeatTimeout time.Duration, packetEncoder codec.PacketEncoder, dataCompression bool, serializerName string, defaultSerializer bool) {
hData := map[string]interface{}{
"code": 200,
"sys": map[string]interface{}{
Expand All @@ -606,18 +615,25 @@ func hbdEncode(heartbeatTimeout time.Duration, packetEncoder codec.PacketEncoder
panic(err)
}

hrd, err = packetEncoder.Encode(packet.Handshake, data)
hrdData, err := packetEncoder.Encode(packet.Handshake, data)
if err != nil {
panic(err)
}

hbd, err = packetEncoder.Encode(packet.Heartbeat, nil)
hbdData, err := packetEncoder.Encode(packet.Heartbeat, nil)
if err != nil {
panic(err)
}
if defaultSerializer {
hrd = hrdData
hbd = hbdData
} else {
a.hrd = hrdData
a.hbd = hbdData
}
}

func herdEncode(heartbeatTimeout time.Duration, packetEncoder codec.PacketEncoder, dataCompression bool, serializerName string) {
func (a *agentImpl) herdEncode(heartbeatTimeout time.Duration, packetEncoder codec.PacketEncoder, dataCompression bool, serializerName string, defaultSerializer bool) {
hErrData := map[string]interface{}{
"code": 400,
"sys": map[string]interface{}{
Expand All @@ -632,10 +648,15 @@ func herdEncode(heartbeatTimeout time.Duration, packetEncoder codec.PacketEncode
panic(err)
}

herd, err = packetEncoder.Encode(packet.Handshake, errData)
herdData, err := packetEncoder.Encode(packet.Handshake, errData)
if err != nil {
panic(err)
}
if defaultSerializer {
herd = herdData
} else {
a.herd = herdData
}
}

func encodeAndCompress(data interface{}, dataCompression bool) ([]byte, error) {
Expand Down Expand Up @@ -668,3 +689,15 @@ func (a *agentImpl) reportChannelSize() {
}
}
}

// GetSerializer configured for this agent
func (a *agentImpl) GetSerializer() serialize.Serializer {
return a.serializer
}

// SetSerializer to use for this agent
func (a *agentImpl) SetSerializer(serializer serialize.Serializer) {
a.serializer = serializer
a.hbdEncode(a.heartbeatTimeout, a.encoder, a.messageEncoder.IsCompressionEnabled(), serializer.GetName(), false)
a.herdEncode(a.heartbeatTimeout, a.encoder, a.messageEncoder.IsCompressionEnabled(), serializer.GetName(), false)
}
27 changes: 27 additions & 0 deletions pkg/agent/mocks/agent.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 20 additions & 1 deletion pkg/service/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,25 @@ func (h *HandlerService) processPacket(a agent.Agent, p *packet.Packet) error {
logger.Log.Warnf("failed to save ip version on session: %q\n", err)
}

if len(handshakeData.Sys.Serializer) > 0 {
var requestedSerializer serialize.Serializer
err = nil
switch handshakeData.Sys.Serializer {
case "json":
requestedSerializer, err = serialize.NewSerializer(serialize.JSON)
case "protobuf":
requestedSerializer, err = serialize.NewSerializer(serialize.PROTOBUF)
default:
requestedSerializer = h.serializer
}
if err != nil {
logger.Log.Errorf("Error setting serializer %s for agent", handshakeData.Sys.Serializer)
} else {
logger.Log.Debugf("Overriding serializer for agent %s", a.RemoteAddr())
a.SetSerializer(requestedSerializer)
}
}

logger.Log.Debug("Successfully saved handshake data")

case packet.HandshakeAck:
Expand Down Expand Up @@ -342,7 +361,7 @@ func (h *HandlerService) localProcess(ctx context.Context, a agent.Agent, route
mid = 0
}

ret, err := h.handlerPool.ProcessHandlerMessage(ctx, route, h.serializer, h.handlerHooks, a.GetSession(), msg.Data, msg.Type, false)
ret, err := h.handlerPool.ProcessHandlerMessage(ctx, route, a.GetSerializer(), h.handlerHooks, a.GetSession(), msg.Data, msg.Type, false)
if msg.Type != message.Notify {
if err != nil {
logger.Log.Errorf("Failed to process handler message: %s", err.Error())
Expand Down
3 changes: 2 additions & 1 deletion pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ import (
"sync/atomic"
"time"

"google.golang.org/protobuf/proto"
nats "github.com/nats-io/nats.go"
"github.com/topfreegames/pitaya/v3/pkg/constants"
"github.com/topfreegames/pitaya/v3/pkg/logger"
"github.com/topfreegames/pitaya/v3/pkg/networkentity"
"github.com/topfreegames/pitaya/v3/pkg/protos"
"google.golang.org/protobuf/proto"
)

type sessionPoolImpl struct {
Expand Down Expand Up @@ -72,6 +72,7 @@ type HandshakeClientData struct {
LibVersion string `json:"libVersion"`
BuildNumber string `json:"clientBuildNumber"`
Version string `json:"clientVersion"`
Serializer string `json:"serializer"`
}

// HandshakeData represents information about the handshake sent by the client.
Expand Down
5 changes: 3 additions & 2 deletions repl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,17 @@ var (
handshake *session.HandshakeData
)

func Start(docs, filename string, prettyJSON bool) {
func Start(docs, filename string, prettyPrint bool) {
docsString = docs
fileName = filename
prettyJSON = prettyJSON
prettyJSON = prettyPrint
handshake = &session.HandshakeData{
Sys: session.HandshakeClientData{
Platform: "repl",
LibVersion: "0.3.5-release",
BuildNumber: "20",
Version: "1.0.0",
Serializer: "json",
},
User: map[string]interface{}{
"client": "repl",
Expand Down

0 comments on commit 6714974

Please sign in to comment.