Skip to content

Commit

Permalink
Revert UA change. Pass BYE from server to client. (#69)
Browse files Browse the repository at this point in the history
  • Loading branch information
dennwc authored Feb 28, 2024
1 parent e243f14 commit 73b66e8
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 32 deletions.
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type Config struct {
ClusterID string `yaml:"cluster_id"` // cluster this instance belongs to

UseExternalIP bool `yaml:"use_external_ip"`
LocalNet string `yaml:"local_net"` // local IP net to use, e.g. 192.168.0.0/24
NAT1To1IP string `yaml:"nat_1_to_1_ip"`

// internal
Expand Down
45 changes: 28 additions & 17 deletions pkg/sip/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ type Client struct {
conf *config.Config
mon *stats.Monitor

sipCli *sipgo.Client
sipServer *sipgo.Server
signalingIp string
sipCli *sipgo.Client
signalingIp string
signalingIpLocal string

cmu sync.Mutex
activeCalls map[*outboundCall]struct{}
Expand All @@ -50,39 +50,41 @@ func NewClient(conf *config.Config, mon *stats.Monitor) *Client {
return c
}

func (c *Client) Start() error {
func (c *Client) Start(agent *sipgo.UserAgent) error {
var err error
if c.conf.UseExternalIP {
if c.signalingIp, err = getPublicIP(); err != nil {
return err
}
if c.signalingIpLocal, err = getLocalIP(c.conf.LocalNet); err != nil {
return err
}
} else if c.conf.NAT1To1IP != "" {
c.signalingIp = c.conf.NAT1To1IP
c.signalingIpLocal = c.signalingIp
} else {
if c.signalingIp, err = getLocalIP(); err != nil {
if c.signalingIp, err = getLocalIP(c.conf.LocalNet); err != nil {
return err
}
c.signalingIpLocal = c.signalingIp
}
logger.Infow("client starting", "local", c.signalingIpLocal, "external", c.signalingIp)

agent, err := sipgo.NewUA(
sipgo.WithUserAgent(UserAgent),
)
if err != nil {
return err
if agent == nil {
ua, err := sipgo.NewUA(
sipgo.WithUserAgent(UserAgent),
)
if err != nil {
return err
}
agent = ua
}

c.sipCli, err = sipgo.NewClient(agent, sipgo.WithClientHostname(c.signalingIp))
if err != nil {
return err
}

c.sipServer, err = sipgo.NewServer(agent)
if err != nil {
return err
}

c.sipServer.OnBye(c.onBye)

return nil
}

Expand Down Expand Up @@ -147,7 +149,16 @@ func (c *Client) CreateSIPParticipant(ctx context.Context, req *rpc.InternalCrea
return &rpc.InternalCreateSIPParticipantResponse{ParticipantId: p.ID, ParticipantIdentity: p.Identity}, nil
}

func (c *Client) OnRequest(req *sip.Request, tx sip.ServerTransaction) {
switch req.Method {
case "BYE":
c.onBye(req, tx)
}
}

func (c *Client) onBye(req *sip.Request, tx sip.ServerTransaction) {
tag, _ := getTagValue(req)
logger.Infow("BYE", "tag", tag)
c.cmu.Lock()
defer c.cmu.Unlock()

Expand Down
15 changes: 14 additions & 1 deletion pkg/sip/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,18 @@ func getPublicIP() (string, error) {
return ip.Query, nil
}

func getLocalIP() (string, error) {
func getLocalIP(localNet string) (string, error) {
ifaces, err := net.Interfaces()
if err != nil {
return "", err
}
var netw *net.IPNet
if localNet != "" {
_, netw, err = net.ParseCIDR(localNet)
if err != nil {
return "", err
}
}
for _, i := range ifaces {
addrs, err := i.Addrs()
if err != nil {
Expand All @@ -72,10 +79,16 @@ func getLocalIP() (string, error) {
for _, a := range addrs {
switch v := a.(type) {
case *net.IPAddr:
if netw != nil && !netw.Contains(v.IP) {
continue
}
if !v.IP.IsLoopback() && v.IP.To4() != nil {
return v.IP.String(), nil
}
case *net.IPNet:
if netw != nil && !netw.Contains(v.IP) {
continue
}
if !v.IP.IsLoopback() && v.IP.To4() != nil {
return v.IP.String(), nil
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/sip/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ func (s *Server) onBye(req *sip.Request, tx sip.ServerTransaction) {
s.cmu.RUnlock()
if c != nil {
c.Close()
} else if s.sipUnhandled != nil {
s.sipUnhandled(req, tx)
}
_ = tx.Respond(sip.NewResponseFromRequest(req, 200, "OK", nil))
}
Expand Down
28 changes: 28 additions & 0 deletions pkg/sip/outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,34 @@ func (c *outboundCall) sipAttemptInvite(offer []byte, conf sipOutboundConfig, au
req.AppendHeader(&sip.ToHeader{Address: *to})
req.AppendHeader(fromHeader)
req.AppendHeader(&sip.ContactHeader{Address: *from})
// TODO: This issue came out when we tried creating separate UA for the server and the client.
// So we might need to set Via explicitly. Anyway, UA must be shared for other reasons,
// and the calls work without explicit Via. So keep it simple, and let SIPGO set Via for now.
// Code will remain here for the future reference/refactoring.
if false {
if c.c.signalingIp != c.c.signalingIpLocal {
// SIPGO will use Via/From headers to figure out which interface to listen on, which will obviously fail
// in case we specify our external IP in there. So we must explicitly add a Via with our local IP.
params := sip.NewParams()
params["branch"] = sip.GenerateBranch()
req.AppendHeader(&sip.ViaHeader{
ProtocolName: "SIP",
ProtocolVersion: "2.0",
Transport: req.Transport(),
Host: c.c.signalingIpLocal,
Params: params,
})
}
params := sip.NewParams()
params["branch"] = sip.GenerateBranch()
req.AppendHeader(&sip.ViaHeader{
ProtocolName: "SIP",
ProtocolVersion: "2.0",
Transport: req.Transport(),
Host: c.c.signalingIp,
Params: params,
})
}
req.AppendHeader(sip.NewHeader("Content-Type", "application/sdp"))
req.AppendHeader(sip.NewHeader("Allow", "INVITE, ACK, CANCEL, BYE, NOTIFY, REFER, MESSAGE, OPTIONS, INFO, SUBSCRIBE"))

Expand Down
35 changes: 24 additions & 11 deletions pkg/sip/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/emiago/sipgo"
"github.com/emiago/sipgo/sip"
"github.com/icholy/digest"
"github.com/livekit/protocol/logger"
"golang.org/x/exp/maps"

"github.com/livekit/sip/pkg/config"
Expand Down Expand Up @@ -70,10 +71,12 @@ type Handler interface {
}

type Server struct {
mon *stats.Monitor
sipSrv *sipgo.Server
sipConn *net.UDPConn
signalingIp string
mon *stats.Monitor
sipSrv *sipgo.Server
sipConn *net.UDPConn
sipUnhandled sipgo.RequestHandler
signalingIp string
signalingIpLocal string

inProgressInvites []*inProgressInvite

Expand Down Expand Up @@ -124,25 +127,34 @@ func sipErrorResponse(tx sip.ServerTransaction, req *sip.Request) {
_ = tx.Respond(sip.NewResponseFromRequest(req, 400, "", nil))
}

func (s *Server) Start() error {
func (s *Server) Start(agent *sipgo.UserAgent, unhandled sipgo.RequestHandler) error {
var err error
if s.conf.UseExternalIP {
if s.signalingIp, err = getPublicIP(); err != nil {
return err
}
if s.signalingIpLocal, err = getLocalIP(s.conf.LocalNet); err != nil {
return err
}
} else if s.conf.NAT1To1IP != "" {
s.signalingIp = s.conf.NAT1To1IP
s.signalingIpLocal = s.signalingIp
} else {
if s.signalingIp, err = getLocalIP(); err != nil {
if s.signalingIp, err = getLocalIP(s.conf.LocalNet); err != nil {
return err
}
s.signalingIpLocal = s.signalingIp
}
logger.Infow("server starting", "local", s.signalingIpLocal, "external", s.signalingIp)

agent, err := sipgo.NewUA(
sipgo.WithUserAgent(UserAgent),
)
if err != nil {
return err
if agent == nil {
ua, err := sipgo.NewUA(
sipgo.WithUserAgent(UserAgent),
)
if err != nil {
return err
}
agent = ua
}

s.sipSrv, err = sipgo.NewServer(agent)
Expand All @@ -152,6 +164,7 @@ func (s *Server) Start() error {

s.sipSrv.OnInvite(s.onInvite)
s.sipSrv.OnBye(s.onBye)
s.sipUnhandled = unhandled

// Ignore ACKs
s.sipSrv.OnAck(func(req *sip.Request, tx sip.ServerTransaction) {})
Expand Down
19 changes: 16 additions & 3 deletions pkg/sip/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package sip

import (
"github.com/emiago/sipgo"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
"golang.org/x/exp/maps"
Expand Down Expand Up @@ -75,11 +76,23 @@ func (s *Service) Start() error {
if err := s.mon.Start(s.conf); err != nil {
return err
}

if err := s.cli.Start(); err != nil {
// The UA must be shared between the client and the server.
// Otherwise, the client will have to listen on a random port, which must then be forwarded.
//
// Routers are smart, they usually keep the UDP "session" open for a few moments, and may allow INVITE handshake
// to pass even without forwarding rules on the firewall. ut it will inevitably fail later on follow-up requests like BYE.
ua, err := sipgo.NewUA(
sipgo.WithUserAgent(UserAgent),
)
if err != nil {
return err
}
if err := s.cli.Start(ua); err != nil {
return err
}
if err := s.srv.Start(); err != nil {
// Server is responsible for answering all transactions. However, the client may also receive some (e.g. BYE).
// Thus, all unhandled transactions will be checked by the client.
if err := s.srv.Start(ua, s.cli.OnRequest); err != nil {
return err
}
logger.Debugw("sip service ready")
Expand Down

0 comments on commit 73b66e8

Please sign in to comment.