Skip to content

Commit

Permalink
Wrap client and server into a Service (#5)
Browse files Browse the repository at this point in the history
* Share SIP user agent.

* Move SIP client and server init to the service package.

* Refactor SIP service similar to ingress/egress.

* Use function pointers instead of interfaces.
  • Loading branch information
dennwc authored Dec 1, 2023
1 parent 074c99d commit 3fff27b
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 55 deletions.
21 changes: 8 additions & 13 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package main

import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
Expand Down Expand Up @@ -84,15 +83,16 @@ func runService(c *cli.Context) error {
killChan := make(chan os.Signal, 1)
signal.Notify(killChan, syscall.SIGINT)

sipCli := sip.NewClient(conf)
if err = sipCli.Start(); err != nil {
sipsrv, err := sip.NewService(conf)
if err != nil {
return err
}

svc := service.NewService(conf, sipCli, psrpcClient, bus)
svc := service.NewService(conf, sipsrv.InternalServerImpl(), psrpcClient, bus)
sipsrv.SetAuthHandler(svc.HandleTrunkAuthentication)
sipsrv.SetDispatchRuleHandlerFunc(svc.HandleDispatchRules)

sipSrv := sip.NewServer(conf, svc.HandleTrunkAuthentication, svc.HandleDispatchRules)
if err = sipSrv.Start(); err != nil {
if err = sipsrv.Start(); err != nil {
return err
}

Expand All @@ -101,17 +101,12 @@ func runService(c *cli.Context) error {
case sig := <-stopChan:
logger.Infow("exit requested, finishing all SIP then shutting down", "signal", sig)
svc.Stop(false)
sipsrv.Stop(false)

case sig := <-killChan:
logger.Infow("exit requested, stopping all SIP and shutting down", "signal", sig)
svc.Stop(true)
if err = sipCli.Stop(); err != nil {
log.Println(err)
}
if err = sipSrv.Stop(); err != nil {
log.Println(err)
}

sipsrv.Stop(true)
}
}()

Expand Down
19 changes: 11 additions & 8 deletions pkg/sip/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,18 @@ func NewClient(conf *config.Config) *Client {
return c
}

func (c *Client) Start() error {
ua, err := sipgo.NewUA(
sipgo.WithUserAgent(userAgent),
)
if err != nil {
return err
func (c *Client) Start(agent *sipgo.UserAgent) error {
if agent == nil {
ua, err := sipgo.NewUA(
sipgo.WithUserAgent(UserAgent),
)
if err != nil {
return err
}
agent = ua
}

c.sipCli, err = sipgo.NewClient(ua, sipgo.WithClientHostname(c.publicIp))
var err error
c.sipCli, err = sipgo.NewClient(agent, sipgo.WithClientHostname(c.publicIp))
if err != nil {
return err
}
Expand Down
9 changes: 4 additions & 5 deletions pkg/sip/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,30 @@ package sip
import (
"encoding/json"
"io"
"log"
"net/http"
)

func getPublicIP() string {
req, err := http.Get("http://ip-api.com/json/")
if err != nil {
log.Fatal(err)
panic(err)
}
defer req.Body.Close()

body, err := io.ReadAll(req.Body)
if err != nil {
log.Fatal(err)
panic(err)
}

ip := struct {
Query string
}{}
if err = json.Unmarshal(body, &ip); err != nil {
log.Fatal(err)
panic(err)
}

if ip.Query == "" {
log.Fatal("Query entry was not populated")
panic("Query entry was not populated")
}

return ip.Query
Expand Down
4 changes: 2 additions & 2 deletions pkg/sip/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (s *Server) handleInviteAuth(req *sip.Request, tx sip.ServerTransaction, fr
h := req.GetHeader("Proxy-Authorization")
if h == nil {
inviteState.challenge = digest.Challenge{
Realm: userAgent,
Realm: UserAgent,
Nonce: fmt.Sprintf("%d", time.Now().UnixMicro()),
Algorithm: "MD5",
}
Expand Down Expand Up @@ -111,7 +111,7 @@ func (s *Server) onInvite(req *sip.Request, tx sip.ServerTransaction) {
}
src := req.Source()

username, password, err := s.authenticationHandler(from.Address.User, to.Address.User, src)
username, password, err := s.authHandler(from.Address.User, to.Address.User, src)
if err != nil {
sipErrorResponse(tx, req)
return
Expand Down
56 changes: 32 additions & 24 deletions pkg/sip/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

const (
userAgent = "LiveKit"
UserAgent = "LiveKit"
digestLimit = 500
)

Expand All @@ -38,10 +38,9 @@ var (
)

type (
authenticationHandlerFunc func(from, to, srcAddress string) (username, password string, err error)
dispatchRuleHandlerFunc func(callingNumber, calledNumber, srcAddress, pin string, skipPin bool) (joinRoom, identity string, pinRequired, hangup bool)

Server struct {
AuthHandlerFunc func(from, to, srcAddress string) (username, password string, err error)
DispatchRuleHandlerFunc func(callingNumber, calledNumber, srcAddress string, pin string, noPin bool) (joinRoom, identity string, requestPin, rejectInvite bool)
Server struct {
sipSrv *sipgo.Server
publicIp string

Expand All @@ -50,9 +49,9 @@ type (
cmu sync.RWMutex
activeCalls map[string]*inboundCall

authenticationHandler authenticationHandlerFunc
dispatchRuleHandler dispatchRuleHandlerFunc
conf *config.Config
authHandler AuthHandlerFunc
dispatchRuleHandler DispatchRuleHandlerFunc
conf *config.Config

res mediaRes
}
Expand All @@ -63,19 +62,25 @@ type (
}
)

func NewServer(conf *config.Config, authenticationHandler authenticationHandlerFunc, dispatchRuleHandler dispatchRuleHandlerFunc) *Server {
func NewServer(conf *config.Config) *Server {
s := &Server{
conf: conf,
publicIp: getPublicIP(),
activeCalls: make(map[string]*inboundCall),
inProgressInvites: []*inProgressInvite{},
authenticationHandler: authenticationHandler,
dispatchRuleHandler: dispatchRuleHandler,
conf: conf,
publicIp: getPublicIP(),
activeCalls: make(map[string]*inboundCall),
inProgressInvites: []*inProgressInvite{},
}
s.initMediaRes()
return s
}

func (s *Server) SetAuthHandler(handler AuthHandlerFunc) {
s.authHandler = handler
}

func (s *Server) SetDispatchRuleHandlerFunc(handler DispatchRuleHandlerFunc) {
s.dispatchRuleHandler = handler
}

func getTagValue(req *sip.Request) (string, error) {
from, ok := req.From()
if !ok {
Expand Down Expand Up @@ -104,17 +109,20 @@ func logOnError(err error) {
}
}

func (s *Server) Start() error {
ua, err := sipgo.NewUA(
sipgo.WithUserAgent(userAgent),
)
if err != nil {
log.Fatal(err)
func (s *Server) Start(agent *sipgo.UserAgent) error {
if agent == nil {
ua, err := sipgo.NewUA(
sipgo.WithUserAgent(UserAgent),
)
if err != nil {
return err
}
agent = ua
}

s.sipSrv, err = sipgo.NewServer(ua)
var err error
s.sipSrv, err = sipgo.NewServer(agent)
if err != nil {
log.Fatal(err)
return err
}

s.sipSrv.OnInvite(s.onInvite)
Expand Down
81 changes: 81 additions & 0 deletions pkg/sip/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sip

import (
"log"

"github.com/emiago/sipgo"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"

"github.com/livekit/sip/pkg/config"
"github.com/livekit/sip/version"
)

type Service struct {
cli *Client
srv *Server
}

func NewService(conf *config.Config) (*Service, error) {
cli := NewClient(conf)
s := &Service{
cli: cli,
}
s.srv = NewServer(conf)
return s, nil
}

func (s *Service) Stop(kill bool) {
if kill {
if err := s.cli.Stop(); err != nil {
log.Println(err)
}
if err := s.srv.Stop(); err != nil {
log.Println(err)
}
}
}

func (s *Service) SetAuthHandler(handler AuthHandlerFunc) {
s.srv.SetAuthHandler(handler)
}

func (s *Service) SetDispatchRuleHandlerFunc(handler DispatchRuleHandlerFunc) {
s.srv.SetDispatchRuleHandlerFunc(handler)
}

func (s *Service) InternalServerImpl() rpc.SIPInternalServerImpl {
return s.cli
}

func (s *Service) Start() error {
logger.Debugw("starting sip service", "version", version.Version)
ua, err := sipgo.NewUA(
sipgo.WithUserAgent(UserAgent),
)
if err != nil {
return err
}
if err = s.cli.Start(ua); err != nil {
return err
}
if err = s.srv.Start(ua); err != nil {
return err
}
logger.Debugw("sip service ready")
return nil
}
5 changes: 2 additions & 3 deletions test/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package main
import (
"flag"
"fmt"
"log"
"math/rand"
"net"
"os"
Expand Down Expand Up @@ -236,12 +235,12 @@ func main() {
sipgo.WithUserAgent(*from),
)
if err != nil {
log.Fatal(err)
panic(err)
}

sipClient, err := sipgo.NewClient(ua, sipgo.WithClientHostname(getLocalIP()))
if err != nil {
log.Fatal(err)
panic(err)
}

var (
Expand Down

0 comments on commit 3fff27b

Please sign in to comment.