Skip to content

Commit

Permalink
Ignore repeated TransferSIPParticipant RPC request with same paramete…
Browse files Browse the repository at this point in the history
…rs (#195)
  • Loading branch information
biglittlebigben authored Oct 11, 2024
1 parent 0c423aa commit 711a46e
Showing 1 changed file with 67 additions and 16 deletions.
83 changes: 67 additions & 16 deletions pkg/sip/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package sip

import (
"context"
"sync"
"sync/atomic"
"time"

"github.com/emiago/sipgo"
Expand All @@ -36,18 +38,27 @@ type Service struct {
mon *stats.Monitor
cli *Client
srv *Server

mu sync.Mutex
pendingTransfers map[transferKey]chan struct{}
}

type transferKey struct {
SipCallId string
TransferTo string
}

func NewService(conf *config.Config, mon *stats.Monitor, log logger.Logger) *Service {
if log == nil {
log = logger.GetLogger()
}
s := &Service{
conf: conf,
log: log,
mon: mon,
cli: NewClient(conf, log, mon),
srv: NewServer(conf, log, mon),
conf: conf,
log: log,
mon: mon,
cli: NewClient(conf, log, mon),
srv: NewServer(conf, log, mon),
pendingTransfers: make(map[transferKey]chan struct{}),
}
return s
}
Expand Down Expand Up @@ -124,35 +135,75 @@ func (s *Service) CreateSIPParticipantAffinity(ctx context.Context, req *rpc.Int
func (s *Service) TransferSIPParticipant(ctx context.Context, req *rpc.InternalTransferSIPParticipantRequest) (*emptypb.Empty, error) {
s.log.Infow("transfering SIP call", "callID", req.SipCallId, "transferTo", req.TransferTo)

ctx, done := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second)
defer done()
var transfetResult atomic.Pointer[error]

s.mu.Lock()
k := transferKey{
SipCallId: req.SipCallId,
TransferTo: req.TransferTo,
}
done, ok := s.pendingTransfers[k]
if !ok {
done = make(chan struct{})
s.pendingTransfers[k] = done

go func() {
ctx, cdone := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second)
defer cdone()

err := s.processParticipantTransfer(ctx, req.SipCallId, req.TransferTo)
transfetResult.Store(&err)
close(done)

s.mu.Lock()
delete(s.pendingTransfers, k)
s.mu.Unlock()
}()
} else {
s.log.Debugw("repeated request for call transfer", "callID", req.SipCallId, "transferTo", req.TransferTo)
}
s.mu.Unlock()

select {
case <-done:
var err error
errPtr := transfetResult.Load()
if errPtr != nil {
err = *errPtr
}
return &emptypb.Empty{}, err
case <-ctx.Done():
return &emptypb.Empty{}, psrpc.NewError(psrpc.Canceled, ctx.Err())
}
}

func (s *Service) processParticipantTransfer(ctx context.Context, callID string, transferTo string) error {
// Look for call both in client (outbound) and server (inbound)
s.cli.cmu.Lock()
out := s.cli.activeCalls[LocalTag(req.SipCallId)]
out := s.cli.activeCalls[LocalTag(callID)]
s.cli.cmu.Unlock()

if out != nil {
err := out.transferCall(ctx, req.TransferTo)
err := out.transferCall(ctx, transferTo)
if err != nil {
return nil, err
return err
}

return &emptypb.Empty{}, nil
return nil
}

s.srv.cmu.Lock()
in := s.srv.byLocal[LocalTag(req.SipCallId)]
in := s.srv.byLocal[LocalTag(callID)]
s.srv.cmu.Unlock()

if in != nil {
err := in.transferCall(ctx, req.TransferTo)
err := in.transferCall(ctx, transferTo)
if err != nil {
return nil, err
return err
}

return &emptypb.Empty{}, nil
return nil
}

return nil, psrpc.NewErrorf(psrpc.NotFound, "unknown call")
return psrpc.NewErrorf(psrpc.NotFound, "unknown call")
}

0 comments on commit 711a46e

Please sign in to comment.