Skip to content

Commit

Permalink
Use ringing timeout and max call duration. (#198)
Browse files Browse the repository at this point in the history
  • Loading branch information
dennwc authored Oct 11, 2024
1 parent 711a46e commit 2fa5c7a
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 43 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/jfreymuth/oggvorbis v1.0.5
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598
github.com/livekit/protocol v1.23.1-0.20241003220239-75af842a1264
github.com/livekit/protocol v1.24.1-0.20241010185750-19b686d31289
github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9
github.com/livekit/server-sdk-go/v2 v2.2.2-0.20241003085414-b42e5a1da639
github.com/mjibson/go-dsp v0.0.0-20180508042940-11479a337f12
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 h1:yLlkHk2feSLHstD9n4VKg7YEBR4rLODTI4WE8gNBEnQ=
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE=
github.com/livekit/protocol v1.23.1-0.20241003220239-75af842a1264 h1:jj0lLMRFhk1Y7X1Ugi8wd47wNtgIoju36qic6mSjGPE=
github.com/livekit/protocol v1.23.1-0.20241003220239-75af842a1264/go.mod h1:nxRzmQBKSYK64gqr7ABWwt78hvrgiO2wYuCojRYb7Gs=
github.com/livekit/protocol v1.24.1-0.20241010185750-19b686d31289 h1:Uj1kuYVzE2F7MhjpGhfefLuXdXTWKh/DMGv0vtFQY7k=
github.com/livekit/protocol v1.24.1-0.20241010185750-19b686d31289/go.mod h1:nxRzmQBKSYK64gqr7ABWwt78hvrgiO2wYuCojRYb7Gs=
github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9 h1:33oBjGpVD9tYkDXQU42tnHl8eCX9G6PVUToBVuCUyOs=
github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/livekit/server-sdk-go/v2 v2.2.2-0.20241003085414-b42e5a1da639 h1:5+iT4OaIukZ4TJwbOXAN+uh/wuAGArUoJyk5vmfQMY0=
Expand Down
4 changes: 4 additions & 0 deletions pkg/service/psrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ func DispatchCall(ctx context.Context, psrpcClient rpc.IOInfoClient, log logger.
Headers: resp.Headers,
HeadersToAttributes: resp.HeadersToAttributes,
EnabledFeatures: resp.EnabledFeatures,
RingingTimeout: resp.RingingTimeout.AsDuration(),
MaxCallDuration: resp.MaxCallDuration.AsDuration(),
}
case rpc.SIPDispatchResult_ACCEPT:
return sip.CallDispatch{
Expand All @@ -123,6 +125,8 @@ func DispatchCall(ctx context.Context, psrpcClient rpc.IOInfoClient, log logger.
Headers: resp.Headers,
HeadersToAttributes: resp.HeadersToAttributes,
EnabledFeatures: resp.EnabledFeatures,
RingingTimeout: resp.RingingTimeout.AsDuration(),
MaxCallDuration: resp.MaxCallDuration.AsDuration(),
}
case rpc.SIPDispatchResult_REQUEST_PIN:
return sip.CallDispatch{
Expand Down
24 changes: 13 additions & 11 deletions pkg/sip/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,17 +179,19 @@ func (c *Client) createSIPParticipant(ctx context.Context, req *rpc.InternalCrea
},
}
sipConf := sipOutboundConfig{
address: req.Address,
transport: req.Transport,
host: req.Hostname,
from: req.Number,
to: req.CallTo,
user: req.Username,
pass: req.Password,
dtmf: req.Dtmf,
ringtone: req.PlayRingtone,
headers: req.Headers,
headersToAttrs: req.HeadersToAttributes,
address: req.Address,
transport: req.Transport,
host: req.Hostname,
from: req.Number,
to: req.CallTo,
user: req.Username,
pass: req.Password,
dtmf: req.Dtmf,
ringtone: req.PlayRingtone,
headers: req.Headers,
headersToAttrs: req.HeadersToAttributes,
ringingTimeout: req.RingingTimeout.AsDuration(),
maxCallDuration: req.MaxCallDuration.AsDuration(),
}
log.Infow("Creating SIP participant")
call, err := c.newCall(ctx, c.conf, log, LocalTag(req.SipCallId), roomConf, sipConf)
Expand Down
26 changes: 15 additions & 11 deletions pkg/sip/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,6 @@ const (
// audioBridgeMaxDelay delays sending audio for certain time, unless RTP packet is received.
// This is done because of audio cutoff at the beginning of calls observed in the wild.
audioBridgeMaxDelay = 1 * time.Second

// callSubscribeTimeout is a maximal duration which SIP participant will wait for other participant tracks.
// If no participant tracks are published by this time, the call will disconnect.
callSubscribeTimeout = 3 * time.Minute
)

func (s *Server) handleInviteAuth(log logger.Logger, req *sip.Request, tx sip.ServerTransaction, from, username, password string) (ok bool) {
Expand Down Expand Up @@ -397,6 +393,14 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI
}
}
}
if disp.MaxCallDuration <= 0 || disp.MaxCallDuration > maxCallDuration {
disp.MaxCallDuration = maxCallDuration
}
if disp.RingingTimeout <= 0 {
disp.RingingTimeout = defaultRingingTimeout
}
ctx, cancel := context.WithTimeout(ctx, disp.MaxCallDuration)
defer cancel()
if !c.joinRoom(ctx, disp.Room) {
return // already sent a response
}
Expand All @@ -411,7 +415,7 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI
c.log.Infow("Waiting for track subscription(s)")
// For dispatches without pin, we first wait for LK participant to become available,
// and also for at least one track subscription. In the meantime we keep ringing.
if !c.waitSubscribe(ctx) {
if !c.waitSubscribe(ctx, disp.RingingTimeout) {
return // already sent a response
}
if !acceptCall() {
Expand Down Expand Up @@ -502,11 +506,11 @@ func (c *inboundCall) waitMedia(ctx context.Context) bool {
return true
}

func (c *inboundCall) waitSubscribe(ctx context.Context) bool {
func (c *inboundCall) waitSubscribe(ctx context.Context, timeout time.Duration) bool {
ctx, span := tracer.Start(ctx, "inboundCall.waitSubscribe")
defer span.End()
timeout := time.NewTimer(callSubscribeTimeout)
defer timeout.Stop()
timer := time.NewTimer(timeout)
defer timer.Stop()
select {
case <-c.cc.Cancelled():
c.closeWithCancelled()
Expand All @@ -520,7 +524,7 @@ func (c *inboundCall) waitSubscribe(ctx context.Context) bool {
case <-c.media.Timeout():
c.closeWithTimeout()
return false
case <-timeout.C:
case <-timer.C:
c.close(false, callDropped, "cannot-subscribe")
return false
case <-c.lkRoom.Subscribed():
Expand Down Expand Up @@ -665,7 +669,7 @@ func (c *inboundCall) setStatus(v CallStatus) {
}

r.LocalParticipant.SetAttributes(map[string]string{
AttrSIPCallStatus: attr,
livekit.AttrSIPCallStatus: attr,
})
}

Expand All @@ -679,7 +683,7 @@ func (c *inboundCall) createLiveKitParticipant(ctx context.Context, rconf RoomCo
for k, v := range c.extraAttrs {
partConf.Attributes[k] = v
}
partConf.Attributes[AttrSIPCallStatus] = CallActive.Attribute()
partConf.Attributes[livekit.AttrSIPCallStatus] = CallActive.Attribute()
c.forwardDTMF.Store(true)
select {
case <-ctx.Done():
Expand Down
42 changes: 29 additions & 13 deletions pkg/sip/outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,19 @@ import (
)

type sipOutboundConfig struct {
address string
transport livekit.SIPTransport
host string
from string
to string
user string
pass string
dtmf string
ringtone bool
headers map[string]string
headersToAttrs map[string]string
address string
transport livekit.SIPTransport
host string
from string
to string
user string
pass string
dtmf string
ringtone bool
headers map[string]string
headersToAttrs map[string]string
ringingTimeout time.Duration
maxCallDuration time.Duration
}

type outboundCall struct {
Expand All @@ -76,6 +78,12 @@ func (c *Client) newCall(ctx context.Context, conf *config.Config, log logger.Lo
if sipConf.host == "" {
sipConf.host = c.signalingIp.String()
}
if sipConf.maxCallDuration <= 0 || sipConf.maxCallDuration > maxCallDuration {
sipConf.maxCallDuration = maxCallDuration
}
if sipConf.ringingTimeout <= 0 {
sipConf.ringingTimeout = defaultRingingTimeout
}
call := &outboundCall{
c: c,
log: log,
Expand Down Expand Up @@ -112,6 +120,8 @@ func (c *Client) newCall(ctx context.Context, conf *config.Config, log logger.Lo

func (c *outboundCall) Start(ctx context.Context) {
ctx = context.WithoutCancel(ctx)
ctx, cancel := context.WithTimeout(ctx, c.sipConf.maxCallDuration)
defer cancel()
c.mon.CallStart()
defer c.mon.CallEnd()
err := c.ConnectSIP(ctx)
Expand Down Expand Up @@ -231,7 +241,7 @@ func (c *outboundCall) connectToRoom(ctx context.Context, lkNew RoomConfig) erro
c.c.RegisterTransferSIPParticipant(sipCallID, c)
}

attrs[AttrSIPCallStatus] = CallDialing.Attribute()
attrs[livekit.AttrSIPCallStatus] = CallDialing.Attribute()
lkNew.Participant.Attributes = attrs
r := NewRoom(c.log)
if err := r.Connect(c.c.conf, lkNew); err != nil {
Expand Down Expand Up @@ -330,14 +340,20 @@ func (c *outboundCall) setStatus(v CallStatus) {
return
}
r.LocalParticipant.SetAttributes(map[string]string{
AttrSIPCallStatus: attr,
livekit.AttrSIPCallStatus: attr,
})
}

func (c *outboundCall) sipSignal(ctx context.Context) error {
ctx, span := tracer.Start(ctx, "outboundCall.sipSignal")
defer span.End()

if c.sipConf.ringingTimeout > 0 {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, c.sipConf.ringingTimeout)
defer cancel()
}

ctx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
Expand Down
21 changes: 16 additions & 5 deletions pkg/sip/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,22 @@

package sip

import "github.com/livekit/protocol/livekit"
import (
"time"

"github.com/livekit/protocol/livekit"
)

const (
// maxCallDuration sets a global max call duration.
maxCallDuration = 24 * time.Hour
// defaultRingingTimeout is a maximal duration which SIP participant will wait to connect.
//
// For inbound, the participant will wait this duration for other participant tracks.
//
// For outbound, this sets a timeout for the other end to pick up the call.
defaultRingingTimeout = 3 * time.Minute
)

var headerToLog = map[string]string{
"X-Twilio-AccountSid": "twilioAccSID",
Expand All @@ -27,10 +42,6 @@ var headerToAttr = map[string]string{
"X-Lk-Test-Id": "lktest.id",
}

const (
AttrSIPCallStatus = livekit.AttrSIPPrefix + "callStatus"
)

type CallStatus int

func (v CallStatus) Attribute() string {
Expand Down
3 changes: 3 additions & 0 deletions pkg/sip/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"net"
"net/netip"
"sync"
"time"

"github.com/emiago/sipgo"
"github.com/emiago/sipgo/sip"
Expand Down Expand Up @@ -93,6 +94,8 @@ type CallDispatch struct {
Headers map[string]string
HeadersToAttributes map[string]string
EnabledFeatures []rpc.SIPFeature
RingingTimeout time.Duration
MaxCallDuration time.Duration
}

type Handler interface {
Expand Down

0 comments on commit 2fa5c7a

Please sign in to comment.