Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

log: switch to slog #103

Merged
merged 12 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 27 additions & 27 deletions dv/dv/advert_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,37 +10,37 @@ import (
"github.com/named-data/ndnd/std/utils"
)

func (dv *Router) advertGenerateNew() {
dv.mutex.Lock()
defer dv.mutex.Unlock()
func (a *advertModule) generate() {
a.dv.mutex.Lock()
defer a.dv.mutex.Unlock()

// Increment sequence number
dv.advertSyncSeq++
a.seq++

// Produce the advertisement
name, err := dv.client.Produce(object.ProduceArgs{
Name: dv.config.AdvertisementDataPrefix().Append(
enc.NewTimestampComponent(dv.advertBootTime),
name, err := a.dv.client.Produce(object.ProduceArgs{
Name: a.dv.config.AdvertisementDataPrefix().Append(
enc.NewTimestampComponent(a.bootTime),
),
Content: dv.rib.Advert().Encode(),
Version: utils.IdPtr(dv.advertSyncSeq),
Content: a.dv.rib.Advert().Encode(),
Version: utils.IdPtr(a.seq),
FreshnessPeriod: 10 * time.Second,
})
if err != nil {
log.Errorf("advert-data: failed to produce advertisement: %+v", err)
log.Error(a, "Failed to produce advertisement", "err", err)
}
dv.advertDir.Push(name)
dv.advertDir.Evict(dv.client)
a.objDir.Push(name)
a.objDir.Evict(a.dv.client)

// Notify neighbors with sync for new advertisement
go dv.advertSyncSendInterest()
go a.sendSyncInterest()
}

func (dv *Router) advertDataFetch(nName enc.Name, bootTime uint64, seqNo uint64) {
func (a *advertModule) dataFetch(nName enc.Name, bootTime uint64, seqNo uint64) {
// debounce; wait before fetching, then check if this is still the latest
// sequence number known for this neighbor
time.Sleep(10 * time.Millisecond)
if ns := dv.neighbors.Get(nName); ns == nil || ns.AdvertBoot != bootTime || ns.AdvertSeq != seqNo {
if ns := a.dv.neighbors.Get(nName); ns == nil || ns.AdvertBoot != bootTime || ns.AdvertSeq != seqNo {
return
}

Expand All @@ -52,53 +52,53 @@ func (dv *Router) advertDataFetch(nName enc.Name, bootTime uint64, seqNo uint64)
enc.NewVersionComponent(seqNo),
)...)

dv.client.Consume(advName, func(state *object.ConsumeState) bool {
a.dv.client.Consume(advName, func(state *object.ConsumeState) bool {
if !state.IsComplete() {
return true
}

go func() {
fetchErr := state.Error()
if fetchErr != nil {
log.Warnf("advert-data: failed to fetch advertisement %s: %+v", state.Name(), fetchErr)
log.Warn(a, "Failed to fetch advertisement", "name", state.Name(), "err", fetchErr)
time.Sleep(1 * time.Second) // wait on error
dv.advertDataFetch(nName, bootTime, seqNo)
a.dataFetch(nName, bootTime, seqNo)
return
}

// Process the advertisement
dv.advertDataHandler(nName, seqNo, state.Content())
a.dataHandler(nName, seqNo, state.Content())
}()

return true
})
}

// Received advertisement Data
func (dv *Router) advertDataHandler(nName enc.Name, seqNo uint64, data []byte) {
func (a *advertModule) dataHandler(nName enc.Name, seqNo uint64, data []byte) {
// Lock DV state
dv.mutex.Lock()
defer dv.mutex.Unlock()
a.dv.mutex.Lock()
defer a.dv.mutex.Unlock()

// Check if this is the latest advertisement
ns := dv.neighbors.Get(nName)
ns := a.dv.neighbors.Get(nName)
if ns == nil {
log.Warnf("advert-data: unknown advertisement %s", nName)
log.Warn(a, "Unknown advertisement", "name", nName)
return
}
if ns.AdvertSeq != seqNo {
log.Debugf("advert-data: old advertisement for %s (%d != %d)", nName, ns.AdvertSeq, seqNo)
log.Debug(a, "Old advertisement", "name", nName, "want", ns.AdvertSeq, "have", seqNo)
return
}

// Parse the advertisement
advert, err := tlv.ParseAdvertisement(enc.NewBufferReader(data), false)
if err != nil {
log.Errorf("advert-data: failed to parse advertisement: %+v", err)
log.Error(a, "Failed to parse advertisement", "err", err)
return
}

// Update the local advertisement list
ns.Advert = advert
go dv.ribUpdate(ns)
go a.dv.ribUpdate(ns)
}
74 changes: 45 additions & 29 deletions dv/dv/advert_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,38 +9,54 @@ import (
"github.com/named-data/ndnd/std/ndn"
spec "github.com/named-data/ndnd/std/ndn/spec_2022"
spec_svs "github.com/named-data/ndnd/std/ndn/svs/v3"
"github.com/named-data/ndnd/std/object"
sec "github.com/named-data/ndnd/std/security"
"github.com/named-data/ndnd/std/utils"
)

func (dv *Router) advertSyncSendInterest() (err error) {
type advertModule struct {
// parent router
dv *Router
// advertisement boot time for self
bootTime uint64
// advertisement sequence number for self
seq uint64
// object directory for advertisement data
objDir *object.MemoryFifoDir
}

func (a *advertModule) String() string {
return "dv-advert"
}

func (a *advertModule) sendSyncInterest() (err error) {
// Sync Interests for our outgoing connections
err = dv.advertSyncSendInterestImpl(dv.config.AdvertisementSyncActivePrefix())
err = a.sendSyncInterestImpl(a.dv.config.AdvertisementSyncActivePrefix())
if err != nil {
log.Warnf("advert-sync: failed to send active sync interest: %+v", err)
log.Error(a, "Failed to send active sync interest", "err", err)
}

// Sync Interests for incoming connections
err = dv.advertSyncSendInterestImpl(dv.config.AdvertisementSyncPassivePrefix())
err = a.sendSyncInterestImpl(a.dv.config.AdvertisementSyncPassivePrefix())
if err != nil {
log.Warnf("advert-sync: failed to send passive sync interest: %+v", err)
log.Error(a, "Failed to send passive sync interest", "err", err)
}

return err
}

func (dv *Router) advertSyncSendInterestImpl(prefix enc.Name) (err error) {
func (a *advertModule) sendSyncInterestImpl(prefix enc.Name) (err error) {
// SVS v3 Sync Data
syncName := prefix.Append(enc.NewVersionComponent(3))

// State Vector for our group
sv := &spec_svs.SvsData{
StateVector: &spec_svs.StateVector{
Entries: []*spec_svs.StateVectorEntry{{
Name: dv.config.RouterName(),
Name: a.dv.config.RouterName(),
SeqNoEntries: []*spec_svs.SeqNoEntry{{
BootstrapTime: dv.advertBootTime,
SeqNo: dv.advertSyncSeq,
BootstrapTime: a.bootTime,
SeqNo: a.seq,
}},
}},
},
Expand All @@ -52,53 +68,53 @@ func (dv *Router) advertSyncSendInterestImpl(prefix enc.Name) (err error) {
dataCfg := &ndn.DataConfig{
ContentType: utils.IdPtr(ndn.ContentTypeBlob),
}
data, err := dv.engine.Spec().MakeData(syncName, dataCfg, sv.Encode(), signer)
data, err := a.dv.engine.Spec().MakeData(syncName, dataCfg, sv.Encode(), signer)
if err != nil {
log.Errorf("advert-sync: sendSyncInterest failed make data: %+v", err)
log.Error(nil, "Failed make data", "err", err)
return
}

// Make SVS Sync Interest
intCfg := &ndn.InterestConfig{
Lifetime: utils.IdPtr(1 * time.Second),
Nonce: utils.ConvertNonce(dv.engine.Timer().Nonce()),
Nonce: utils.ConvertNonce(a.dv.engine.Timer().Nonce()),
HopLimit: utils.IdPtr(uint(2)), // use localhop w/ this
}
interest, err := dv.engine.Spec().MakeInterest(syncName, intCfg, data.Wire, nil)
interest, err := a.dv.engine.Spec().MakeInterest(syncName, intCfg, data.Wire, nil)
if err != nil {
return err
}

// Sync Interest has no reply
err = dv.engine.Express(interest, nil)
err = a.dv.engine.Express(interest, nil)
if err != nil {
return err
}

return nil
}

func (dv *Router) advertSyncOnInterest(args ndn.InterestHandlerArgs, active bool) {
func (a *advertModule) OnSyncInterest(args ndn.InterestHandlerArgs, active bool) {
// If there is no incoming face ID, we can't use this
if args.IncomingFaceId == nil {
log.Warn("advert-sync: received Sync Interest with no incoming face ID, ignoring")
log.Warn(a, "Received Sync Interest with no incoming face ID, ignoring")
return
}

// Check if app param is present
if args.Interest.AppParam() == nil {
log.Warn("advert-sync: received Sync Interest with no AppParam, ignoring")
log.Warn(a, "Received Sync Interest with no AppParam, ignoring")
return
}

// Decode Sync Data
pkt, _, err := spec.ReadPacket(enc.NewWireReader(args.Interest.AppParam()))
if err != nil {
log.Warnf("advert-sync: failed to parse Sync Data: %+v", err)
log.Warn(a, "Failed to parse Sync Data", "err", err)
return
}
if pkt.Data == nil {
log.Warnf("advert-sync: no Sync Data, ignoring")
log.Warn(a, "No Sync Data, ignoring")
return
}

Expand All @@ -108,40 +124,40 @@ func (dv *Router) advertSyncOnInterest(args ndn.InterestHandlerArgs, active bool
svWire := pkt.Data.Content()
params, err := spec_svs.ParseSvsData(enc.NewWireReader(svWire), false)
if err != nil || params.StateVector == nil {
log.Warnf("advert-sync: failed to parse StateVec: %+v", err)
log.Warn(a, "Failed to parse StateVec", "err", err)
return
}

// Process each entry in the state vector
dv.mutex.Lock()
defer dv.mutex.Unlock()
a.dv.mutex.Lock()
defer a.dv.mutex.Unlock()

// FIB needs update if face changes for any neighbor
fibDirty := false
markRecvPing := func(ns *table.NeighborState) {
err, faceDirty := ns.RecvPing(*args.IncomingFaceId, active)
if err != nil {
log.Warnf("advert-sync: failed to update neighbor: %+v", err)
log.Warn(a, "Failed to update neighbor", "err", err)
}
fibDirty = fibDirty || faceDirty
}

// There should only be one entry in the StateVector, but check all anyway
for _, node := range params.StateVector.Entries {
if len(node.SeqNoEntries) != 1 {
log.Warnf("advert-sync: unexpected %d SeqNoEntries for %s, ignoring", len(node.SeqNoEntries), node.Name)
log.Warn(a, "Unexpected SeqNoEntries count", "count", len(node.SeqNoEntries), "router", node.Name)
return
}
entry := node.SeqNoEntries[0]

// Parse name from entry
if node.Name == nil {
log.Warnf("advert-sync: failed to parse neighbor name: %+v", err)
log.Warn(a, "Failed to parse neighbor name", "err", err)
continue
}

// Check if the entry is newer than what we know
ns := dv.neighbors.Get(node.Name)
ns := a.dv.neighbors.Get(node.Name)
if ns != nil {
if ns.AdvertBoot >= entry.BootstrapTime && ns.AdvertSeq >= entry.SeqNo {
// Nothing has changed, skip
Expand All @@ -152,18 +168,18 @@ func (dv *Router) advertSyncOnInterest(args ndn.InterestHandlerArgs, active bool
// Create new neighbor entry cause none found
// This is the ONLY place where neighbors are created
// In all other places, quit if not found
ns = dv.neighbors.Add(node.Name)
ns = a.dv.neighbors.Add(node.Name)
}

markRecvPing(ns)
ns.AdvertBoot = entry.BootstrapTime
ns.AdvertSeq = entry.SeqNo

go dv.advertDataFetch(node.Name, entry.BootstrapTime, entry.SeqNo)
go a.dataFetch(node.Name, entry.BootstrapTime, entry.SeqNo)
}

// Update FIB if needed
if fibDirty {
go dv.fibUpdate()
go a.dv.fibUpdate()
}
}
4 changes: 2 additions & 2 deletions dv/dv/prefix_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (dv *Router) prefixDataFetch(nName enc.Name) {
router.Fetching = true

// Fetch the prefix data object
log.Debugf("prefix-table: fetching object for %s [%d => %d]", nName, router.Known, router.Latest)
log.Debug(dv.pfx, "Fetching prefix data", "router", nName, "known", router.Known, "latest", router.Latest)

name := router.GetNextDataName()
dv.client.Consume(name, func(state *object.ConsumeState) bool {
Expand All @@ -68,7 +68,7 @@ func (dv *Router) prefixDataFetch(nName enc.Name) {
go func() {
fetchErr := state.Error()
if fetchErr != nil {
log.Warnf("prefix-table: failed to fetch object %s: %+v", state.Name(), fetchErr)
log.Warn(dv.pfx, "Failed to fetch prefix data", "name", state.Name(), "err", fetchErr)
time.Sleep(1 * time.Second) // wait on error
}

Expand Down
12 changes: 6 additions & 6 deletions dv/dv/readvertise.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (dv *Router) readvertiseOnInterest(args ndn.InterestHandlerArgs) {
res.Encode(),
signer)
if err != nil {
log.Warnf("readvertise: failed to make response Data: %+v", err)
log.Warn(dv, "Failed to make readvertise response Data", "err", err)
return
}
args.Reply(data.Wire)
Expand All @@ -42,23 +42,23 @@ func (dv *Router) readvertiseOnInterest(args ndn.InterestHandlerArgs) {
// readvertise: /localhost/nlsr/rib/unregister/h%0C%07%07%08%05cathyo%01A/params-sha256=026dd595c75032c5101b321fbc11eeb96277661c66bc0564ac7ea1a281ae8210
iname := args.Interest.Name()
if len(iname) != 6 {
log.Warnf("readvertise: invalid interest %s", iname)
log.Warn(dv, "Invalid readvertise Interest", "name", iname)
return
}

module, cmd, advC := iname[2], iname[3], iname[4]
if module.String() != "rib" {
log.Warnf("readvertise: unknown module %s", iname)
log.Warn(dv, "Unknown readvertise module", "name", iname)
return
}

params, err := mgmt.ParseControlParameters(enc.NewBufferReader(advC.Val), false)
if err != nil || params.Val == nil || params.Val.Name == nil {
log.Warnf("readvertise: failed to parse advertised name (%s)", err)
log.Warn(dv, "Failed to parse readvertised name", "err", err)
return
}

log.Debugf("readvertise: %s %s", cmd, params.Val.Name)
log.Debug(dv, "Received readvertise request", "cmd", cmd, "name", params.Val.Name)
dv.mutex.Lock()
defer dv.mutex.Unlock()

Expand All @@ -68,7 +68,7 @@ func (dv *Router) readvertiseOnInterest(args ndn.InterestHandlerArgs) {
case "unregister":
dv.pfx.Withdraw(params.Val.Name)
default:
log.Warnf("readvertise: unknown cmd %s", cmd)
log.Warn(dv, "Unknown readvertise cmd", "cmd", cmd)
return
}

Expand Down
Loading
Loading