Skip to content

Commit

Permalink
Protect pktiohandler maps with mutex (#537)
Browse files Browse the repository at this point in the history
* Protect pktiohandler maps with mutex

* fix double lock
  • Loading branch information
DanG100 authored Jan 30, 2025
1 parent 8489a77 commit f5c4cf8
Showing 1 changed file with 18 additions and 2 deletions.
20 changes: 18 additions & 2 deletions dataplane/standalone/pkthandler/pktiohandler/pktiohandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"io"
"os"
"sync"
"time"

"google.golang.org/genproto/googleapis/rpc/status"
Expand Down Expand Up @@ -52,6 +53,7 @@ func New(portFile string) (*PacketIOMgr, error) {

// PacketIOMgr creates and delete ports and reads and writes to them.
type PacketIOMgr struct {
mu sync.Mutex
hostifs map[uint64]*port
dplanePortIfIndex map[uint64]int // For tap devices, maps the dataport port id to hostif if index.
sendQueue *queue.Queue
Expand Down Expand Up @@ -104,7 +106,9 @@ func (m *PacketIOMgr) StreamPackets(c pktiopb.PacketIO_CPUPacketStreamClient) er
log.Warningf("received err from server: %v", err)
continue
}
m.mu.Lock()
port, ok := m.hostifs[out.GetPacket().GetHostPort()]
m.mu.Unlock()
if !ok {
log.Warningf("skipping unknown port id: %v", out.GetPacket().GetHostPort())
continue
Expand All @@ -119,6 +123,8 @@ func (m *PacketIOMgr) StreamPackets(c pktiopb.PacketIO_CPUPacketStreamClient) er
}

func (m *PacketIOMgr) metadataFromPacket(p *pktiopb.Packet) *kernel.PacketMetadata {
m.mu.Lock()
defer m.mu.Unlock()
md := &kernel.PacketMetadata{
SrcIfIndex: int16(m.dplanePortIfIndex[p.GetInputPort()]),
DstIfIndex: int16(m.dplanePortIfIndex[p.GetOutputPort()]),
Expand All @@ -128,6 +134,9 @@ func (m *PacketIOMgr) metadataFromPacket(p *pktiopb.Packet) *kernel.PacketMetada
}

func (m *PacketIOMgr) writePorts() error {
m.mu.Lock()
defer m.mu.Unlock()

if m.portFile == "" {
return nil
}
Expand All @@ -138,7 +147,6 @@ func (m *PacketIOMgr) writePorts() error {
Hostifs: make(map[uint64]*pktiopb.HostPortControlMessage),
PortIds: m.dplanePortIfIndex,
}

for id, h := range m.hostifs {
msg.Hostifs[id] = h.msg
}
Expand Down Expand Up @@ -182,7 +190,9 @@ func (m *PacketIOMgr) ManagePorts(c pktiopb.PacketIO_HostPortControlClient) erro
log.Warningf("failed to write file: %v", err)
}
case pktiopb.PortOperation_PORT_OPERATION_DELETE:
m.mu.Lock()
p, ok := m.hostifs[resp.GetPortId()]
m.mu.Unlock()
if !ok {
sendErr := c.Send(&pktiopb.HostPortControlRequest{Msg: &pktiopb.HostPortControlRequest_Status{
Status: &status.Status{
Expand All @@ -195,7 +205,7 @@ func (m *PacketIOMgr) ManagePorts(c pktiopb.PacketIO_HostPortControlClient) erro
}
continue
}
m.hostifs[resp.GetPortId()].cancelFn()
p.cancelFn()
if err := p.Delete(); err != nil {
sendErr := c.Send(&pktiopb.HostPortControlRequest{Msg: &pktiopb.HostPortControlRequest_Status{
Status: &status.Status{
Expand All @@ -207,7 +217,9 @@ func (m *PacketIOMgr) ManagePorts(c pktiopb.PacketIO_HostPortControlClient) erro
return sendErr
}
}
m.mu.Lock()
delete(m.hostifs, resp.GetPortId())
m.mu.Unlock()
sendErr := c.Send(&pktiopb.HostPortControlRequest{Msg: &pktiopb.HostPortControlRequest_Status{
Status: &status.Status{
Code: int32(codes.OK),
Expand All @@ -220,7 +232,9 @@ func (m *PacketIOMgr) ManagePorts(c pktiopb.PacketIO_HostPortControlClient) erro
log.Warningf("failed to write file: %v", err)
}
case pktiopb.PortOperation_PORT_OPERATION_SET_UP, pktiopb.PortOperation_PORT_OPERATION_SET_DOWN:
m.mu.Lock()
p, ok := m.hostifs[resp.GetPortId()]
m.mu.Unlock()
if !ok {
sendErr := c.Send(&pktiopb.HostPortControlRequest{Msg: &pktiopb.HostPortControlRequest_Status{
Status: &status.Status{
Expand Down Expand Up @@ -262,6 +276,8 @@ func Register(t pktiopb.PortType, b func(*pktiopb.HostPortControlMessage) (PortI
var linkByName = netlink.LinkByName

func (m *PacketIOMgr) createPort(msg *pktiopb.HostPortControlMessage) error {
m.mu.Lock()
defer m.mu.Unlock()
var p PortIO
switch msg.GetPort().(type) {
case *pktiopb.HostPortControlMessage_Genetlink:
Expand Down

0 comments on commit f5c4cf8

Please sign in to comment.