Skip to content

Commit

Permalink
Move logic to logger, add logging for kube
Browse files Browse the repository at this point in the history
  • Loading branch information
matoval committed Sep 25, 2024
1 parent 373668d commit 669a77a
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 30 deletions.
31 changes: 14 additions & 17 deletions pkg/controlsvc/controlsvc.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,15 @@ func (s *SockControl) ReadFromConn(message string, out io.Writer, io Copier) err
return err
}
payloadDebug, _ := strconv.Atoi(os.Getenv("RECEPTOR_PAYLOAD_TRACE_LEVEL"))
switch {
case payloadDebug > 2:
var data string

if payloadDebug != 0 {
var connectionType string
var payload string
if s.conn.LocalAddr().Network() == "unix" {
connectionType = "unix socket"
} else {
connectionType = "network connection"

Check warning on line 135 in pkg/controlsvc/controlsvc.go

View check run for this annotation

Codecov / codecov/patch

pkg/controlsvc/controlsvc.go#L130-L135

Added lines #L130 - L135 were not covered by tests
}
reader := bufio.NewReader(s.conn)

Check warning on line 137 in pkg/controlsvc/controlsvc.go

View check run for this annotation

Codecov / codecov/patch

pkg/controlsvc/controlsvc.go#L137

Added line #L137 was not covered by tests

for {
Expand All @@ -139,23 +145,14 @@ func (s *SockControl) ReadFromConn(message string, out io.Writer, io Copier) err

break

Check warning on line 146 in pkg/controlsvc/controlsvc.go

View check run for this annotation

Codecov / codecov/patch

pkg/controlsvc/controlsvc.go#L146

Added line #L146 was not covered by tests
}
data += response
MainInstance.nc.GetLogger().Debug("Response reading from conn: %v", response)
}
if _, err := out.Write([]byte(data)); err != nil {
return err
payload += response

Check warning on line 148 in pkg/controlsvc/controlsvc.go

View check run for this annotation

Codecov / codecov/patch

pkg/controlsvc/controlsvc.go#L148

Added line #L148 was not covered by tests
}

fallthrough
case payloadDebug > 0:
var connectType string
if s.conn.LocalAddr().Network() == "unix" {
connectType = "unix socket"
} else {
connectType = "network connection"
MainInstance.nc.GetLogger().DebugPayload(payloadDebug, payload, "", connectionType)
if _, err := out.Write([]byte(payload)); err != nil {
return err

Check warning on line 153 in pkg/controlsvc/controlsvc.go

View check run for this annotation

Codecov / codecov/patch

pkg/controlsvc/controlsvc.go#L151-L153

Added lines #L151 - L153 were not covered by tests
}
MainInstance.nc.GetLogger().Debug("Reading from %v", connectType)
default:
} else {
if _, err := io.Copy(out, s.conn); err != nil {
return err
}
Expand Down
25 changes: 25 additions & 0 deletions pkg/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,31 @@ func (rl *ReceptorLogger) Debug(format string, v ...interface{}) {
rl.Log(DebugLevel, format, v...)
}

// Debug payload data.
func (rl *ReceptorLogger) DebugPayload(payloadDebug int, payload string, workUnitID string, connectionType string) {
switch payloadDebug {
case 3:
if workUnitID != "" {
rl.Debug("Work unit %v stdin: %v", workUnitID, payload)
} else {
rl.Debug("Response reading from conn: %v", payload)

Check warning on line 158 in pkg/logger/logger.go

View check run for this annotation

Codecov / codecov/patch

pkg/logger/logger.go#L152-L158

Added lines #L152 - L158 were not covered by tests
}

fallthrough
case 2:
if payloadDebug == 2 && workUnitID != "" {
rl.Debug("Work unit %v received command\n", workUnitID)

Check warning on line 164 in pkg/logger/logger.go

View check run for this annotation

Codecov / codecov/patch

pkg/logger/logger.go#L161-L164

Added lines #L161 - L164 were not covered by tests
}

fallthrough
case 1:
if connectionType != "" {
rl.Debug("Reading from %v", connectionType)

Check warning on line 170 in pkg/logger/logger.go

View check run for this annotation

Codecov / codecov/patch

pkg/logger/logger.go#L167-L170

Added lines #L167 - L170 were not covered by tests
}
default:

Check warning on line 172 in pkg/logger/logger.go

View check run for this annotation

Codecov / codecov/patch

pkg/logger/logger.go#L172

Added line #L172 was not covered by tests
}
}

// SanitizedDebug contains extra information helpful to developers.
func (rl *ReceptorLogger) SanitizedDebug(format string, v ...interface{}) {
rl.SanitizedLog(DebugLevel, format, v...)
Expand Down
28 changes: 15 additions & 13 deletions pkg/workceptor/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,16 +116,20 @@ func commandRunner(command string, params string, unitdir string) error {
return err
}
payloadDebug, _ := strconv.Atoi(os.Getenv("RECEPTOR_PAYLOAD_TRACE_LEVEL"))

Check warning on line 118 in pkg/workceptor/command.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/command.go#L118

Added line #L118 was not covered by tests
splitUnitDir := strings.Split(unitdir, "/")
workUnitID := splitUnitDir[len(splitUnitDir)-1]
switch payloadDebug {
case 3:
var data string
reader := bufio.NewReader(stdin)

if payloadDebug != 0 {
splitUnitDir := strings.Split(unitdir, "/")
workUnitID := splitUnitDir[len(splitUnitDir)-1]
stdinStream, err := cmd.StdinPipe()
if err != nil {
return err

Check warning on line 125 in pkg/workceptor/command.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/command.go#L120-L125

Added lines #L120 - L125 were not covered by tests
}
var payload string
reader := bufio.NewReader(stdin)
if err != nil {
return err

Check warning on line 130 in pkg/workceptor/command.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/command.go#L127-L130

Added lines #L127 - L130 were not covered by tests
}

for {
response, err := reader.ReadString('\n')
if err != nil {
Expand All @@ -135,14 +139,12 @@ func commandRunner(command string, params string, unitdir string) error {

break

Check warning on line 140 in pkg/workceptor/command.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/command.go#L140

Added line #L140 was not covered by tests
}
data += response
MainInstance.nc.GetLogger().Debug("Work unit %v stdin: %v", workUnitID, response)
payload += response

Check warning on line 142 in pkg/workceptor/command.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/command.go#L142

Added line #L142 was not covered by tests
}
io.WriteString(stdinStream, data)
case 2:
MainInstance.nc.GetLogger().Debug("Work unit %v received command\n", workUnitID)
cmd.Stdin = stdin
default:

MainInstance.nc.GetLogger().DebugPayload(payloadDebug, payload, workUnitID, "")
io.WriteString(stdinStream, payload)
} else {
cmd.Stdin = stdin

Check warning on line 148 in pkg/workceptor/command.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/command.go#L145-L148

Added lines #L145 - L148 were not covered by tests
}
stdout, err := os.OpenFile(path.Join(unitdir, "stdout"), os.O_CREATE+os.O_WRONLY+os.O_SYNC, 0o600)
Expand Down
23 changes: 23 additions & 0 deletions pkg/workceptor/stdio_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"io"
"os"
"path"
"strconv"
"strings"
"sync"
)

Expand Down Expand Up @@ -111,6 +113,7 @@ func (sw *STDoutWriter) SetWriter(writer FileWriteCloser) {
// STDinReader reads from a stdin file and provides a Done function.
type STDinReader struct {
reader FileReadCloser
workUnit string
lasterr error
doneChan chan struct{}
doneOnce sync.Once
Expand All @@ -120,6 +123,8 @@ var errFileSizeZero = errors.New("file is empty")

// NewStdinReader allocates a new stdinReader, which reads from a stdin file and provides a Done function.
func NewStdinReader(fs FileSystemer, unitdir string) (*STDinReader, error) {
splitUnitDir := strings.Split(unitdir, "/")
workUnitID := splitUnitDir[len(splitUnitDir)-1]
stdinpath := path.Join(unitdir, "stdin")
stat, err := fs.Stat(stdinpath)
if err != nil {
Expand All @@ -135,6 +140,7 @@ func NewStdinReader(fs FileSystemer, unitdir string) (*STDinReader, error) {

return &STDinReader{
reader: reader,
workUnit: workUnitID,
lasterr: nil,
doneChan: make(chan struct{}),
doneOnce: sync.Once{},
Expand All @@ -143,6 +149,23 @@ func NewStdinReader(fs FileSystemer, unitdir string) (*STDinReader, error) {

// Read reads data from the stdout file, implementing io.Reader.
func (sr *STDinReader) Read(p []byte) (n int, err error) {
payloadDebug, _ := strconv.Atoi(os.Getenv("RECEPTOR_PAYLOAD_TRACE_LEVEL"))

if payloadDebug != 0 {
isNotEmpty := func() bool {
for _, v := range p {
if v != 0 {
return true

Check warning on line 158 in pkg/workceptor/stdio_utils.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/stdio_utils.go#L155-L158

Added lines #L155 - L158 were not covered by tests
}
}

return false

Check warning on line 162 in pkg/workceptor/stdio_utils.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/stdio_utils.go#L162

Added line #L162 was not covered by tests
}()
if isNotEmpty {
payload := string(p)
MainInstance.nc.GetLogger().DebugPayload(payloadDebug, payload, sr.workUnit, "")

Check warning on line 166 in pkg/workceptor/stdio_utils.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/stdio_utils.go#L164-L166

Added lines #L164 - L166 were not covered by tests
}
}
n, err = sr.reader.Read(p)
if err != nil {
sr.lasterr = err
Expand Down

0 comments on commit 669a77a

Please sign in to comment.