Skip to content

Commit

Permalink
[refactor] 所有client类型session都实现IClientSessionLifecycle接口中的Start函数
Browse files Browse the repository at this point in the history
  • Loading branch information
q191201771 committed Jun 26, 2024
1 parent 15ed8d1 commit 323d639
Show file tree
Hide file tree
Showing 21 changed files with 66 additions and 36 deletions.
2 changes: 1 addition & 1 deletion app/demo/benchrtmpconnect/benchrtmpconnect.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func main() {
option.HandshakeComplexFlag = false
})
b := time.Now()
err := pullSession.Pull(u)
err := pullSession.Start(u)
e := time.Now()
cost := e.Sub(b).Milliseconds()
// 耗时不够1毫秒,我们将值取整到1毫秒,并打印更精确的实际耗时
Expand Down
4 changes: 2 additions & 2 deletions app/demo/calcrtmpdelay/calcrtmpdelay.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func main() {
pushSession := rtmp.NewPushSession(func(option *rtmp.PushSessionOption) {
option.PushTimeoutMs = 10000
})
err := pushSession.Push(pushUrl)
err := pushSession.Start(pushUrl)
if err != nil {
nazalog.Fatalf("push rtmp failed. url=%s, err=%+v", pushUrl, err)
}
Expand Down Expand Up @@ -105,7 +105,7 @@ func main() {
rtmpPullSession = rtmp.NewPullSession().WithOnReadRtmpAvMsg(func(msg base.RtmpMsg) {
handleReadPayloadFn(msg.Payload)
})
err = rtmpPullSession.Pull(pullUrl)
err = rtmpPullSession.Start(pullUrl)
if err != nil {
nazalog.Fatalf("pull rtmp failed. err=%+v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion app/demo/pullrtmp/pullrtmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func pull(url string, filename string) {
}
})

err = session.Pull(url)
err = session.Start(url)
if err != nil {
nazalog.Errorf("pull failed. err=%+v", err)
return
Expand Down
2 changes: 1 addition & 1 deletion app/demo/pullrtmp2hls/pullrtmp2hls.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func main() {
option.PullTimeoutMs = 10000
option.ReadAvTimeoutMs = 10000
}).WithOnReadRtmpAvMsg(rtmp2Mpegts.FeedRtmpMessage)
err = pullSession.Pull(url)
err = pullSession.Start(url)

if err != nil {
nazalog.Fatalf("pull rtmp failed. err=%+v", err)
Expand Down
2 changes: 1 addition & 1 deletion app/demo/pullrtmp2pushrtmp/stream_exist.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func StreamExist(url string) error {
defer s.Dispose()

go func() {
err := s.Pull(url)
err := s.Start(url)
if err != nil {
errChan <- err
}
Expand Down
4 changes: 2 additions & 2 deletions app/demo/pullrtmp2pushrtmp/tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (t *Tunnel) Start() (ret ErrorCode) {
})
nazalog.Infof("[%s] start push. [%s] url=%s", t.uk, pushSession.UniqueKey(), outUrl)

err := pushSession.Push(outUrl)
err := pushSession.Start(outUrl)
// 只有有一个失败就直接退出
if err != nil {
nazalog.Errorf("[%s] push error. [%s] err=%+v", t.uk, pushSession.UniqueKey(), err)
Expand All @@ -223,7 +223,7 @@ func (t *Tunnel) Start() (ret ErrorCode) {
})
nazalog.Infof("[%s] start pull. [%s] url=%s", t.uk, t.pullSession.UniqueKey(), t.inUrl)

err := t.pullSession.Pull(t.inUrl)
err := t.pullSession.Start(t.inUrl)
// pull失败就直接退出
if err != nil {
nazalog.Errorf("[%s] pull error. [%s] err=%+v", t.uk, t.pullSession.UniqueKey(), err)
Expand Down
2 changes: 1 addition & 1 deletion app/demo/pullrtmp2pushrtsp/pullrtmp2pushrtsp.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func main() {
pullSession := rtmp.NewPullSession().WithOnReadRtmpAvMsg(remuxer.FeedRtmpMsg)

nazalog.Info("start pull.")
err := pullSession.Pull(inRtmpUrl)
err := pullSession.Start(inRtmpUrl)
nazalog.Assert(nil, err)
nazalog.Info("pull succ.")

Expand Down
2 changes: 1 addition & 1 deletion app/demo/pullrtsp/pullrtsp.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func main() {
option.OverTcp = overTcp != 0
})

err = pullSession.Pull(inUrl)
err = pullSession.Start(inUrl)
nazalog.Assert(nil, err)

go func() {
Expand Down
4 changes: 2 additions & 2 deletions app/demo/pullrtsp2pushrtmp/pullrtsp2pushrtmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func main() {
option.WriteAvTimeoutMs = 10000
})

err := pushSession.Push(outUrl)
err := pushSession.Start(outUrl)
nazalog.Assert(nil, err)
defer pushSession.Dispose()

Expand All @@ -49,7 +49,7 @@ func main() {
option.OverTcp = overTcp != 0
})

err = pullSession.Pull(inUrl)
err = pullSession.Start(inUrl)
nazalog.Assert(nil, err)
defer pullSession.Dispose()

Expand Down
2 changes: 1 addition & 1 deletion app/demo/pullrtsp2pushrtsp/pullrtsp2pushrtsp.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (r *RtspTunnel) Start() error {
option.PullTimeoutMs = 10000
option.OverTcp = r.pullOverTcp
})
if err := r.pullSession.Pull(r.pullUrl); err != nil {
if err := r.pullSession.Start(r.pullUrl); err != nil {
nazalog.Errorf("[%s] start pull failed. err=%+v, url=%s", r.uniqueKey, err, r.pullUrl)
return err
}
Expand Down
2 changes: 1 addition & 1 deletion app/demo/pushrtmp/pushrtmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func push(tags []httpflv.Tag, url string, isRecursive bool) {
option.WriteChanSize = 0
})

if err := ps.Push(url); err != nil {
if err := ps.Start(url); err != nil {
nazalog.Errorf("push failed. err=%v", err)
return
}
Expand Down
23 changes: 19 additions & 4 deletions pkg/base/t_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,17 +95,32 @@ type IServerSession interface {

// ---------------------------------------------------------------------------------------------------------------------

// IClientSessionLifecycle
//
// 常规正确调用流程:
//
// New -> WithXXX -> Start -> Read/Write -> Dispose
//
// Start之前,不调用Read, Write, WaitChan函数
// Start, Read, Write返回失败或者WaitChan返回错误时,直接调用Dispose,之后尽量不再使用这个session
type IClientSessionLifecycle interface {
// Dispose 主动关闭session时调用
Start(rawUrl string) error

// Dispose
//
// 注意,只有Start(具体session的Start类型函数一般命令为Push和Pull)成功后的session才能调用,否则行为未定义
// 关闭session,主要是在主动关闭时调用。
//
// Dispose可在任意协程内调用
// - 可以在任意协程内调用。
// - 可以调用多次。
//
// 注意,目前Dispose允许调用多次,但是未来可能不对该表现做保证
// - 理论上可以在Start之前调用,但请尽量避免。
// - 不能和Start并发调用。
// - Start失败后可以不调用Dispose。
//
// Dispose后,调用Write函数将返回错误
//
// - TODO 如果Read或Write函数返回错误,可以不调用Dispose。
//
// @return 可以通过返回值判断调用Dispose前,session是否已经被关闭了 TODO(chef) 这个返回值没有太大意义,后面可能会删掉
//
Dispose() error
Expand Down
6 changes: 3 additions & 3 deletions pkg/innertest/innertest.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func entry() {
assert.Equal(t, nil, err)
rtmpPullTagCount.Increment()
})
err := rtmpPullSession.Pull(rtmpPullUrl)
err := rtmpPullSession.Start(rtmpPullUrl)
Log.Assert(nil, err)
err = <-rtmpPullSession.WaitChan()
Log.Debug(err)
Expand Down Expand Up @@ -262,7 +262,7 @@ func entry() {
rtspPullSession = rtsp.NewPullSession(&rtspPullObserver, func(option *rtsp.PullSessionOption) {
option.PullTimeoutMs = 10000
})
err := rtspPullSession.Pull(rtspPullUrl)
err := rtspPullSession.Start(rtspPullUrl)
assert.Equal(t, nil, err)
entryWaitGroup.Done()
}()
Expand All @@ -273,7 +273,7 @@ func entry() {
option.WriteBufSize = 4096
//option.WriteChanSize = 1024
})
err = pushSession.Push(pushUrl)
err = pushSession.Start(pushUrl)
assert.Equal(t, nil, err)

for _, tag := range tags {
Expand Down
4 changes: 2 additions & 2 deletions pkg/logic/group__relay_pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func (group *Group) pullIfNeeded() (string, error) {
go func(rtPullUrl string, rtIsPullByRtmp bool, rtRtmpSession *rtmp.PullSession, rtRtspSession *rtsp.PullSession) {
if rtIsPullByRtmp {
// TODO(chef): 处理数据回调,是否应该等待Add成功之后。避免竞态条件中途加入了其他in session
err := rtRtmpSession.Pull(rtPullUrl)
err := rtRtmpSession.Start(rtPullUrl)
if err != nil {
Log.Errorf("[%s] relay pull fail. err=%v", rtRtmpSession.UniqueKey(), err)
group.DelRtmpPullSession(rtRtmpSession)
Expand All @@ -262,7 +262,7 @@ func (group *Group) pullIfNeeded() (string, error) {
return
}

err := rtRtspSession.Pull(rtPullUrl)
err := rtRtspSession.Start(rtPullUrl)
if err != nil {
Log.Errorf("[%s] relay pull fail. err=%v", rtRtspSession.UniqueKey(), err)
group.DelRtspPullSession(rtRtspSession)
Expand Down
2 changes: 1 addition & 1 deletion pkg/logic/group__relay_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (group *Group) startPushIfNeeded() {
option.PushTimeoutMs = RelayPushTimeoutMs
option.WriteAvTimeoutMs = RelayPushWriteAvTimeoutMs
})
err := pushSession.Push(u2)
err := pushSession.Start(u2)
if err != nil {
Log.Errorf("[%s] relay push done. err=%v", pushSession.UniqueKey(), err)
group.DelRtmpPushSession(u, pushSession)
Expand Down
11 changes: 8 additions & 3 deletions pkg/rtmp/client_pull_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func NewPullSession(modOptions ...ModPullSessionOption) *PullSession {

// WithOnPullSucc Pull成功
//
// 如果你想保证绝对时序,在 WithOnReadRtmpAvMsg 回调音视频数据前,做一些操作,那么使用这个回调替代 Pull 返回成功
// 如果你想保证绝对时序,在 WithOnReadRtmpAvMsg 回调音视频数据前,做一些操作,那么使用这个回调替代 Start 返回成功
func (s *PullSession) WithOnPullSucc(onPullResult func()) *PullSession {
s.core.onDoResult = onPullResult
return s
Expand All @@ -96,9 +96,14 @@ func (s *PullSession) WithOnReadRtmpAvMsg(onReadRtmpAvMsg OnReadRtmpAvMsg) *Pull
return s
}

// Pull 阻塞直到和对端完成拉流前的所有准备工作(也即收到RTMP Play response),或者发生错误
// Start 阻塞直到和对端完成拉流前的所有准备工作(也即收到RTMP Play response),或者发生错误
func (s *PullSession) Start(rawUrl string) error {
return s.core.Start(rawUrl)
}

// Pull deprecated. use Start instead.
func (s *PullSession) Pull(rawUrl string) error {
return s.core.Do(rawUrl)
return s.Start(rawUrl)
}

// ---------------------------------------------------------------------------------------------------------------------
Expand Down
9 changes: 7 additions & 2 deletions pkg/rtmp/client_push_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,14 @@ func NewPushSession(modOptions ...ModPushSessionOption) *PushSession {
}
}

// Push 阻塞直到和对端完成推流前,握手部分的工作(也即收到RTMP Publish response),或者发生错误
// Start 阻塞直到和对端完成推流前,握手部分的工作(也即收到RTMP Publish response),或者发生错误
func (s *PushSession) Start(rawUrl string) error {
return s.core.Start(rawUrl)
}

// Push deprecated. use Start instead.
func (s *PushSession) Push(rawUrl string) error {
return s.core.Do(rawUrl)
return s.Start(rawUrl)
}

// Write 发送数据
Expand Down
4 changes: 2 additions & 2 deletions pkg/rtmp/client_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ func NewClientSession(sessionType base.SessionType, modOptions ...ModClientSessi
return s
}

// Do 阻塞直到收到服务端返回的 publish / play 对应结果的信令或者发生错误
func (s *ClientSession) Do(rawUrl string) error {
// Start 阻塞直到收到服务端返回的 publish / play 对应结果的信令或者发生错误
func (s *ClientSession) Start(rawUrl string) error {
Log.Debugf("[%s] Do. url=%s", s.UniqueKey(), rawUrl)

var (
Expand Down
2 changes: 1 addition & 1 deletion pkg/rtsp/client_command_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (session *ClientCommandSession) InitWithSdp(sdpCtx sdp.LogicContext) {
session.sdpCtx = sdpCtx
}

func (session *ClientCommandSession) Do(rawUrl string) error {
func (session *ClientCommandSession) Start(rawUrl string) error {
var (
ctx context.Context
cancel context.CancelFunc
Expand Down
11 changes: 8 additions & 3 deletions pkg/rtsp/client_pull_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,10 @@ func (session *PullSession) WithOnDescribeResponse(onDescribeResponse func()) *P
return session
}

// Pull 阻塞直到和对端完成拉流前,握手部分的工作(也即收到RTSP Play response),或者发生错误
func (session *PullSession) Pull(rawUrl string) error {
// Start 阻塞直到和对端完成拉流前,握手部分的工作(也即收到RTSP Play response),或者发生错误
func (session *PullSession) Start(rawUrl string) error {
Log.Debugf("[%s] pull. url=%s", session.UniqueKey(), rawUrl)
if err := session.cmdSession.Do(rawUrl); err != nil {
if err := session.cmdSession.Start(rawUrl); err != nil {
_ = session.dispose(err)
return err
}
Expand Down Expand Up @@ -128,6 +128,11 @@ func (session *PullSession) Pull(rawUrl string) error {
return nil
}

// Pull deprecated. use Start instead.
func (session *PullSession) Pull(rawUrl string) error {
return session.Start(rawUrl)
}

func (session *PullSession) GetSdp() sdp.LogicContext {
return session.baseInSession.GetSdp()
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/rtsp/client_push_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (session *PushSession) push(rawUrl string) error {
Log.Debugf("[%s] push. url=%s", session.UniqueKey(), rawUrl)
session.cmdSession.InitWithSdp(*session.sdpCtx)
session.baseOutSession.InitWithSdp(*session.sdpCtx)
if err := session.cmdSession.Do(rawUrl); err != nil {
if err := session.cmdSession.Start(rawUrl); err != nil {
_ = session.dispose(err)
return err
}
Expand Down

0 comments on commit 323d639

Please sign in to comment.