Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

addclosecallback可为conn添加任意个回调方法,并在conn.stop时按添加顺序执行(独立协程中) #301

Merged
merged 2 commits into from
Feb 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions examples/zinx_kcp/kcp_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/aceld/zinx/zpack"
"github.com/xtaci/kcp-go"
"io"
"time"
)

// 模拟客户端
Expand All @@ -19,8 +20,8 @@ func main() {
}

dp := zpack.Factory().NewPack(ziface.ZinxDataPack)
msg, _ := dp.Pack(zpack.NewMsgPackage(1, []byte("client test message")))
_, err = conn.Write(msg)
sendMsg, _ := dp.Pack(zpack.NewMsgPackage(1, []byte("client test message")))
_, err = conn.Write(sendMsg)
if err != nil {
fmt.Println("client write err: ", err)
return
Expand Down Expand Up @@ -55,6 +56,13 @@ func main() {
}

fmt.Printf("==> Client receive Msg: ID = %d, len = %d , data = %s\n", msg.ID, msg.DataLen, msg.Data)

time.Sleep(1 * time.Second)
_, err = conn.Write(sendMsg)
if err != nil {
fmt.Println("client write err: ", err)
return
}
}
}
}
27 changes: 16 additions & 11 deletions examples/zinx_kcp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ type TestRouter struct {
znet.BaseRouter
}

var dealTimes = 0

// PreHandle -
func (t *TestRouter) PreHandle(req ziface.IRequest) {
start := time.Now()
Expand All @@ -30,27 +32,24 @@ func (t *TestRouter) PreHandle(req ziface.IRequest) {
func (t *TestRouter) Handle(req ziface.IRequest) {
fmt.Println("--> Call Handle")

// Simulated scenario - In the event of an expected error such as incorrect permissions or incorrect information,
// subsequent function execution will be stopped, but this function will be fully executed.
// 模拟场景- 出现意料之中的错误 如权限不对或者信息错误 则停止后续函数执行,但是次函数会执行完毕
if err := Err(); err != nil {
req.Abort()
fmt.Println("Insufficient permission")
}

// Simulation scenario - In case of a certain situation, repeat the above operation.
// 模拟场景- 出现某种情况,重复上面的操作
/*
if err := Err(); err != nil {
req.Goto(znet.PRE_HANDLE)
fmt.Println("repeat")
}
*/
dealTimes++
req.GetConnection().AddCloseCallback(nil, nil, func() {
fmt.Println("run close callback")
})

if err := req.GetConnection().SendMsg(0, []byte("test2")); err != nil {
fmt.Println(err)
}

if dealTimes == 5 {
req.GetConnection().Stop()
}

time.Sleep(1 * time.Millisecond)
}

Expand Down Expand Up @@ -79,5 +78,11 @@ func main() {
LogFile: "test.log",
})
s.AddRouter(1, &TestRouter{})
s.SetOnConnStart(func(conn ziface.IConnection) {
fmt.Println("--> OnConnStart")
})
s.SetOnConnStop(func(conn ziface.IConnection) {
fmt.Println("--> OnConnStop")
})
s.Serve()
}
4 changes: 4 additions & 0 deletions ziface/iconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,8 @@ type IConnection interface {
RemoveProperty(key string) // Remove connection property
IsAlive() bool // Check if the current connection is alive(判断当前连接是否存活)
SetHeartBeat(checker IHeartbeatChecker) // Set the heartbeat detector (设置心跳检测器)

AddCloseCallback(handler, key interface{}, callback func()) // Add a close callback function (添加关闭回调函数)
RemoveCloseCallback(handler, key interface{}) // Remove a close callback function (删除关闭回调函数)
InvokeCloseCallbacks() // Trigger the close callback function (触发关闭回调函数,独立协程完成)
}
57 changes: 57 additions & 0 deletions znet/callbacks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package znet

type callbackCommon struct {
handler interface{}
key interface{}
call func()
next *callbackCommon
}

type callbacks struct {
first *callbackCommon
last *callbackCommon
}

func (t *callbacks) Add(handler, key interface{}, callback func()) {
if callback == nil {
return
}
newItem := &callbackCommon{handler, key, callback, nil}
if t.first == nil {
t.first = newItem
} else {
t.last.next = newItem
}
t.last = newItem
}

func (t *callbacks) Remove(handler, key interface{}) {
var prev *callbackCommon
for callback := t.first; callback != nil; prev, callback = callback, callback.next {
if callback.handler == handler && callback.key == key {
if t.first == callback {
t.first = callback.next
} else if prev != nil {
prev.next = callback.next
}
if t.last == callback {
t.last = prev
}
return
}
}
}

func (t *callbacks) Invoke() {
for callback := t.first; callback != nil; callback = callback.next {
callback.call()
}
}

func (t *callbacks) Count() int {
var count int
for callback := t.first; callback != nil; callback = callback.next {
count++
}
return count
}
29 changes: 29 additions & 0 deletions znet/callbacks_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package znet

import "testing"

func TestCallback(t *testing.T) {
cb := &callbacks{}
var count, expected int

cb.Add("handler", "a", func() {
count++
})
cb.Add("handler", "b", func() {
count++
})
cb.Invoke()

expected = 2
if count != expected {
t.Errorf("returned %d, expected %d", count, expected)
}

count = 0
expected = 1
cb.Remove("handler", "b")
cb.Invoke()
if count != expected {
t.Errorf("returned %d, expected %d", count, expected)
}
}
45 changes: 45 additions & 0 deletions znet/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ type Connection struct {
// Remote address of the current connection
// (当前链接的远程地址)
remoteAddr string

// Close callback
closeCallback callbacks

// Close callback mutex
closeCallbackMutex sync.RWMutex
}

// newServerConn :for Server, method to create a Server-side connection with Server-specific properties
Expand Down Expand Up @@ -487,6 +493,21 @@ func (c *Connection) finalizer() {
c.connManager.Remove(c)
}

// Close all channels associated with the connection
if c.msgBuffChan != nil {
close(c.msgBuffChan)
}

go func() {
defer func() {
if err := recover(); err != nil {
zlog.Ins().ErrorF("Conn finalizer panic: %v", err)
}
}()

c.InvokeCloseCallbacks()
}()

zlog.Ins().InfoF("Conn Stop()...ConnID = %d", c.connID)
}

Expand Down Expand Up @@ -549,3 +570,27 @@ func (c *Connection) setClose() bool {
func (c *Connection) setStartWriterFlag() bool {
return atomic.CompareAndSwapInt32(&c.startWriterFlag, 0, 1)
}

func (s *Connection) AddCloseCallback(handler, key interface{}, f func()) {
if s.isClosed() {
return
}
s.closeCallbackMutex.Lock()
defer s.closeCallbackMutex.Unlock()
s.closeCallback.Add(handler, key, f)
}

func (s *Connection) RemoveCloseCallback(handler, key interface{}) {
if s.isClosed() {
return
}
s.closeCallbackMutex.Lock()
defer s.closeCallbackMutex.Unlock()
s.closeCallback.Remove(handler, key)
}

func (s *Connection) InvokeCloseCallbacks() {
s.closeCallbackMutex.RLock()
defer s.closeCallbackMutex.RUnlock()
s.closeCallback.Invoke()
}
Loading
Loading