Skip to content

Commit

Permalink
Merge branch 'devel' into refactor-workceptor
Browse files Browse the repository at this point in the history
  • Loading branch information
resoluteCoder authored Jan 30, 2024
2 parents cfb4fa8 + dab86f4 commit 541a963
Show file tree
Hide file tree
Showing 9 changed files with 96 additions and 148 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build_binary_from_ref.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
with:
go-version: "1.20"

- uses: actions/cache@v3
- uses: actions/cache@v4
with:
path: |
~/.cache/go-build
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/coverage_reporting.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
with:
go-version: "1.20"

- uses: actions/cache@v3
- uses: actions/cache@v4
with:
path: |
~/.cache/go-build
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ jobs:
with:
go-version: ${{ matrix.go-version }}

- uses: actions/cache@v3
- uses: actions/cache@v4
with:
path: |
~/.cache/go-build
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,11 @@ else
TESTCMD = -run $(RUNTEST)
endif

BLOCKLIST='/tests/'
BLOCKLIST='/tests/|mock_|example'
COVERAGE_FILE='coverage.txt'

coverage: build-all
PATH="${PWD}:${PATH}" go test $$(go list ./... | grep -v $(BLOCKLIST)) \
PATH="${PWD}:${PATH}" go test $$(go list ./... | grep -vE $(BLOCKLIST)) \
$(TESTCMD) \
-count=1 \
-cover \
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ require (
github.com/vishvananda/netlink v1.1.0
golang.org/x/net v0.20.0
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.29.0
k8s.io/api v0.29.1
k8s.io/apimachinery v0.29.1
k8s.io/client-go v0.29.0
k8s.io/client-go v0.29.1
)

require (
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -202,12 +202,12 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
k8s.io/api v0.29.0 h1:NiCdQMY1QOp1H8lfRyeEf8eOwV6+0xA6XEE44ohDX2A=
k8s.io/api v0.29.0/go.mod h1:sdVmXoz2Bo/cb77Pxi71IPTSErEW32xa4aXwKH7gfBA=
k8s.io/api v0.29.1 h1:DAjwWX/9YT7NQD4INu49ROJuZAAAP/Ijki48GUPzxqw=
k8s.io/api v0.29.1/go.mod h1:7Kl10vBRUXhnQQI8YR/R327zXC8eJ7887/+Ybta+RoQ=
k8s.io/apimachinery v0.29.1 h1:KY4/E6km/wLBguvCZv8cKTeOwwOBqFNjwJIdMkMbbRc=
k8s.io/apimachinery v0.29.1/go.mod h1:6HVkd1FwxIagpYrHSwJlQqZI3G9LfYWRPAkUvLnXTKU=
k8s.io/client-go v0.29.0 h1:KmlDtFcrdUzOYrBhXHgKw5ycWzc3ryPX5mQe0SkG3y8=
k8s.io/client-go v0.29.0/go.mod h1:yLkXH4HKMAywcrD82KMSmfYg2DlE8mepPR4JGSo5n38=
k8s.io/client-go v0.29.1 h1:19B/+2NGEwnFLzt0uB5kNJnfTsbV8w6TgQRz9l7ti7A=
k8s.io/client-go v0.29.1/go.mod h1:TDG/psL9hdet0TI9mGyHJSgRkW3H9JZk2dNEUS7bRks=
k8s.io/klog/v2 v2.110.1 h1:U/Af64HJf7FcwMcXyKm2RPM22WZzyR7OSpYj5tg3cL0=
k8s.io/klog/v2 v2.110.1/go.mod h1:YGtd1984u+GgbuZ7e08/yBuAfKLSO0+uR1Fhi6ExXjo=
k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 h1:aVUu9fTY98ivBPKR9Y5w/AuzbMm96cd3YHRTU83I780=
Expand Down
62 changes: 4 additions & 58 deletions pkg/workceptor/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/version"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
Expand Down Expand Up @@ -711,7 +710,7 @@ func (kw *kubeUnit) runWorkUsingLogger() {
}()
}

stdoutWithReconnect := shouldUseReconnect(kw)
stdoutWithReconnect := shouldUseReconnect()
if stdoutWithReconnect && stdoutErr == nil {
kw.GetWorkceptor().nc.GetLogger().Debug("streaming stdout with reconnect support")
go kw.kubeLoggingWithReconnect(&streamWait, stdout, &stdinErr, &stdoutErr)
Expand Down Expand Up @@ -746,54 +745,8 @@ func (kw *kubeUnit) runWorkUsingLogger() {
}
}

func isCompatibleK8S(kw *kubeUnit, versionStr string) bool {
semver, err := version.ParseSemantic(versionStr)
if err != nil {
kw.GetWorkceptor().nc.GetLogger().Warning("could parse Kubernetes server version %s, will not use reconnect support", versionStr)

return false
}

// ignore pre-release in version comparison
semver = semver.WithPreRelease("")

// The patch was backported to minor version 23, 24 and 25. We must check z stream
// based on the minor version
// if minor version == 24, compare with v1.24.8
// if minor version == 25, compare with v1.25.4
// all other minor versions compare with v1.23.14
var compatibleVer string
switch semver.Minor() {
case 24:
compatibleVer = "v1.24.8"
case 25:
compatibleVer = "v1.25.4"
default:
compatibleVer = "v1.23.14"
}

if semver.AtLeast(version.MustParseSemantic(compatibleVer)) {
kw.GetWorkceptor().nc.GetLogger().Debug("Kubernetes version %s is at least %s, using reconnect support", semver, compatibleVer)

return true
}
kw.GetWorkceptor().nc.GetLogger().Debug("Kubernetes version %s not at least %s, not using reconnect support", semver, compatibleVer)

return false
}

func shouldUseReconnect(kw *kubeUnit) bool {
// Attempt to detect support for streaming from pod with timestamps based on
// Kubernetes server version
// In order to use reconnect method, Kubernetes server must be at least
// v1.23.14
// v1.24.8
// v1.25.4
// These versions contain a critical patch that permits connecting to the
// logstream with timestamps enabled.
// Without the patch, stdout lines would be split after 4K characters into a
// new line, which will cause issues in Receptor.
// https://github.com/kubernetes/kubernetes/issues/77603
func shouldUseReconnect() bool {
// Support for streaming from pod with timestamps using reconnect method is in all current versions
// Can override the detection by setting the RECEPTOR_KUBE_SUPPORT_RECONNECT
// accepted values: "enabled", "disabled", "auto" with "disabled" being the default
// all invalid value will assume to be "disabled"
Expand All @@ -812,14 +765,7 @@ func shouldUseReconnect(kw *kubeUnit) bool {
}
}

serverVerInfo, err := kw.clientset.ServerVersion()
if err != nil {
kw.GetWorkceptor().nc.GetLogger().Warning("could not detect Kubernetes server version, will not use reconnect support")

return false
}

return isCompatibleK8S(kw, serverVerInfo.String())
return false
}

func parseTime(s string) *time.Time {
Expand Down
134 changes: 77 additions & 57 deletions pkg/workceptor/kubernetes_test.go
Original file line number Diff line number Diff line change
@@ -1,75 +1,95 @@
package workceptor

import (
"context"
"os"
"reflect"
"testing"

"github.com/ansible/receptor/pkg/netceptor"
"time"
)

func Test_isCompatibleK8S(t *testing.T) {
type args struct {
version string
isCompatible bool
}
func TestShouldUseReconnect(t *testing.T) {
const envVariable string = "RECEPTOR_KUBE_SUPPORT_RECONNECT"

kw := &kubeUnit{
BaseWorkUnitForWorkUnit: &BaseWorkUnit{},
tests := []struct {
name string
envValue string
want bool
}{
{
name: "Positive (undefined) test",
envValue: "",
want: false,
},
{
name: "Enabled test",
envValue: "enabled",
want: true,
},
{
name: "Disabled test",
envValue: "disabled",
want: false,
},
{
name: "Auto test",
envValue: "auto",
want: false,
},
{
name: "Default test",
envValue: "default",
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.envValue != "" {
os.Setenv(envVariable, tt.envValue)
defer os.Unsetenv(envVariable)
} else {
os.Unsetenv(envVariable)
}

// Create Netceptor node using external backends
n1 := netceptor.New(context.Background(), "node1")
b1, err := netceptor.NewExternalBackend()
if err != nil {
t.Fatal(err)
}
err = n1.AddBackend(b1)
if err != nil {
t.Fatal(err)
}
w, err := New(context.Background(), n1, "")
if err != nil {
t.Fatal(err)
if got := shouldUseReconnect(); got != tt.want {
t.Errorf("shouldUseReconnect() = %v, want %v", got, tt.want)
}
})
}
kw.SetWorkceptor(w)

tests := []args{
// K8S compatible versions
{version: "v1.24.8", isCompatible: true},
{version: "v1.25.4", isCompatible: true},
{version: "v1.23.14", isCompatible: true},

// K8S Z stream >
{version: "v1.24.99", isCompatible: true},
{version: "v1.25.99", isCompatible: true},
{version: "v1.23.99", isCompatible: true},

// K8S Z stream <
{version: "v1.24.7", isCompatible: false},
{version: "v1.25.3", isCompatible: false},
{version: "v1.23.13", isCompatible: false},
}

// K8S X stream >
{version: "v2.24.8", isCompatible: true},
{version: "v2.25.4", isCompatible: true},
{version: "v2.23.14", isCompatible: true},
func TestParseTime(t *testing.T) {
type args struct {
s string
}

// K8S X stream <
{version: "v0.24.8", isCompatible: false},
{version: "v0.25.4", isCompatible: false},
{version: "v0.23.14", isCompatible: false},
desiredTimeString := "2024-01-17T00:00:00Z"
desiredTime, _ := time.Parse(time.RFC3339, desiredTimeString)

// Other versions
{version: "yoloswag", isCompatible: false},
{version: "v1.23.14+sadfasdf", isCompatible: true},
{version: "v1.23.14-asdfasdf+12131", isCompatible: true}, // ignore pre-release
{version: "v1.23.15-asdfasdf+12131", isCompatible: true},
tests := []struct {
name string
args args
want *time.Time
wantErr bool
}{
{
name: "Positive test",
args: args{
s: desiredTimeString,
},
want: &desiredTime,
},
{
name: "Error test",
args: args{
s: "Invalid time",
},
want: nil,
},
}

for _, tt := range tests {
t.Run(tt.version, func(t *testing.T) {
if got := isCompatibleK8S(kw, tt.version); got != tt.isCompatible {
t.Errorf("isCompatibleK8S() = %v, want %v", got, tt.isCompatible)
t.Run(tt.name, func(t *testing.T) {
if got := parseTime(tt.args.s); !reflect.DeepEqual(got, tt.want) {
t.Errorf("parseTime() = %v, want %v", got, tt.want)
}
})
}
Expand Down
26 changes: 4 additions & 22 deletions pkg/workceptor/workceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ type workType struct {

// New constructs a new Workceptor instance.
func New(ctx context.Context, nc NetceptorForWorkceptor, dataDir string) (*Workceptor, error) {
dataDir = setDataDir(dataDir, nc)
if dataDir == "" {
dataDir = path.Join(os.TempDir(), "receptor")
}
dataDir = path.Join(dataDir, nc.NodeID())
c, cancel := context.WithCancel(ctx)
w := &Workceptor{
ctx: c,
Expand All @@ -96,27 +99,6 @@ func New(ctx context.Context, nc NetceptorForWorkceptor, dataDir string) (*Workc
// MainInstance is the global instance of Workceptor instantiated by the command-line main() function.
var MainInstance *Workceptor

// setDataDir returns a valid data directory.
func setDataDir(dataDir string, nc NetceptorForWorkceptor) string {
if _, err := os.Stat(dataDir); os.IsNotExist(err) {
nc.GetLogger().Warning("Receptor data directory provided does not exist \"%s\". Trying the default '/var/run/receptor/", dataDir)
} else {
return path.Join(dataDir, nc.NodeID())
}

dataDir = "/var/lib/receptor"
if _, err := os.Stat(dataDir); os.IsNotExist(err) {
nc.GetLogger().Warning("Receptor data directory \"%s\" does not exist. Setting tmp '/tmp/receptor/", dataDir)
} else {
return path.Join(dataDir, nc.NodeID())
}

dataDir = path.Join(os.TempDir(), "receptor")
dataDir = path.Join(dataDir, nc.NodeID())

return dataDir
}

// stdoutSize returns size of stdout, if it exists, or 0 otherwise.
func stdoutSize(unitdir string) int64 {
stat, err := os.Stat(path.Join(unitdir, "stdout"))
Expand Down

0 comments on commit 541a963

Please sign in to comment.