Skip to content

Commit

Permalink
fix #685, the Processor print "incoming chan is full",and exit. (#686)
Browse files Browse the repository at this point in the history
Signed-off-by: CFC4N <[email protected]>
  • Loading branch information
cfc4n authored Dec 10, 2024
1 parent b4881a1 commit 9cf64b4
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 11 deletions.
1 change: 0 additions & 1 deletion cli/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,6 @@ func runModule(modName string, modConfig config.IConfig) {

reload:
// 初始化
logger.Warn().Msg("========== module starting. ==========")
mod := modFunc()
ctx, cancelFun := context.WithCancel(context.TODO())
err = mod.Init(ctx, &logger, modConfig, ecw)
Expand Down
13 changes: 9 additions & 4 deletions pkg/event_processor/iworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,16 @@ type IWorker interface {
}

const (
MaxTickerCount = 10 // 1 Sencond/(eventWorker.ticker.C) = 10
MaxChanLen = 16 // 包队列长度
MaxTickerCount = 10 // 1 Sencond/(eventWorker.ticker.C) = 10
MaxChanLen = 1024 // 包队列长度
//MAX_EVENT_LEN = 16 // 事件数组长度
)

var (
ErrEventWorkerIncomingFull = errors.New("eventWorker Write failed, incoming chan is full")
ErrEventWorkerOutcomingFull = errors.New("eventWorker Write failed, outComing chan is full")
)

type eventWorker struct {
incoming chan event.IEventStruct
//events []user.IEventStruct
Expand Down Expand Up @@ -88,7 +93,7 @@ func (ew *eventWorker) Write(e event.IEventStruct) error {
select {
case ew.incoming <- e:
default:
err = errors.New("eventWorker Write failed, incoming chan is full")
err = ErrEventWorkerIncomingFull
}
return err
}
Expand All @@ -98,7 +103,7 @@ func (ew *eventWorker) writeToChan(s string) error {
select {
case ew.outComing <- s:
default:
err = errors.New("eventWorker Write failed, outComing chan is full")
err = ErrEventWorkerOutcomingFull
}
return err
}
Expand Down
14 changes: 11 additions & 3 deletions pkg/event_processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type EventProcessor struct {
logger io.Writer

closeChan chan bool
errChan chan error

// output model
isHex bool
Expand All @@ -52,6 +53,7 @@ func (ep *EventProcessor) init() {
ep.incoming = make(chan event.IEventStruct, MaxIncomingChanLen)
ep.outComing = make(chan string, MaxIncomingChanLen)
ep.closeChan = make(chan bool)
ep.errChan = make(chan error, 16)
ep.workerQueue = make(map[string]IWorker, MaxParserQueueLen)
}

Expand All @@ -63,9 +65,12 @@ func (ep *EventProcessor) Serve() error {
case eventStruct := <-ep.incoming:
err = ep.dispatch(eventStruct)
if err != nil {
//err1 := ep.Close()
//return errors.Join(err, err1)
return err
// 不返回error是合理的做法,因为个别事件处理失败不应该影响整个处理器的关闭。
// 但是,需要将这个错误抛给的调用着,让调用者决定是否关闭处理器
select {
case ep.errChan <- err:
default:
}
}
case s := <-ep.outComing:
_, _ = ep.GetLogger().Write([]byte(s))
Expand Down Expand Up @@ -153,6 +158,9 @@ func (ep *EventProcessor) Close() error {
return nil
}

func (ep *EventProcessor) ErrorChan() chan error {
return ep.errChan
}
func NewEventProcessor(logger io.Writer, isHex bool) *EventProcessor {
var ep *EventProcessor
ep = &EventProcessor{}
Expand Down
14 changes: 13 additions & 1 deletion user/module/imodule.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,23 @@ func (m *Module) Init(ctx context.Context, logger *zerolog.Logger, conf config.I
m.isClosed.Store(false)
m.ctx = ctx
m.logger = logger
m.errChan = make(chan error)
m.errChan = make(chan error, 16)
m.isKernelLess5_2 = false //set false default
m.eventCollector = eventCollector
//var epl = epLogger{logger: logger}
m.processor = event_processor.NewEventProcessor(eventCollector, conf.GetHex())

go func() {
// 读取错误信息
for {
select {
case err := <-m.processor.ErrorChan():
m.logger.Warn().AnErr("Processor error", err).Send()
case <-m.ctx.Done():
return
}
}
}()
kv, err := kernel.HostVersion()
if err != nil {
m.logger.Warn().Err(err).Msg("Unable to detect kernel version due to an error:%v.used non-Less5_2 bytecode.")
Expand Down
3 changes: 1 addition & 2 deletions user/module/probe_openssl.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,7 @@ func (m *MOpenSSLProbe) getSslBpfFile(soPath, sslVersion string) error {
// 未找到版本号, try libcrypto.so.x
if strings.Contains(soPath, "libssl.so.3") {
m.logger.Warn().Err(err).Str("soPath", soPath).Msg("OpenSSL/BoringSSL version not found.")
m.logger.Warn().Msg("Try to detect libcrypto.so.3. If you have doubts")
m.logger.Warn().Msg("See https://github.com/gojue/ecapture/discussions/675 for more information.")
m.logger.Warn().Msg("Try to detect libcrypto.so.3. If you have doubts, See https://github.com/gojue/ecapture/discussions/675 for more information.")

// 从 libssl.so.3 中获取 libcrypto.so.3 的路径
var libcryptoName = "libcrypto.so.3"
Expand Down

0 comments on commit 9cf64b4

Please sign in to comment.