Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run Member tests in parallel #19035

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions scripts/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ fi
COMMON_TEST_FLAGS=("${RACE}")
if [[ -n "${CPU:-}" ]]; then
COMMON_TEST_FLAGS+=("--cpu=${CPU}")
COMMON_TEST_FLAGS+=("--parallel=${CPU}")
fi

log_callout "Running with ${COMMON_TEST_FLAGS[*]}"
Expand Down
12 changes: 8 additions & 4 deletions server/embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,14 @@ func NewConfig() *Config {
}

func (cfg *Config) AddFlags(fs *flag.FlagSet) {
cfg.AddFlagsWithoutGlobals(fs)

// raft connection timeouts
fs.DurationVar(&rafthttp.ConnReadTimeout, "raft-read-timeout", rafthttp.DefaultConnReadTimeout, "Read timeout set on each rafthttp connection")
fs.DurationVar(&rafthttp.ConnWriteTimeout, "raft-write-timeout", rafthttp.DefaultConnWriteTimeout, "Write timeout set on each rafthttp connection")
}

func (cfg *Config) AddFlagsWithoutGlobals(fs *flag.FlagSet) {
// member
fs.StringVar(&cfg.Dir, "data-dir", cfg.Dir, "Path to the data directory.")
fs.StringVar(&cfg.WalDir, "wal-dir", cfg.WalDir, "Path to the dedicated wal directory.")
Expand Down Expand Up @@ -657,10 +665,6 @@ func (cfg *Config) AddFlags(fs *flag.FlagSet) {

fs.Var(flags.NewUint32Value(cfg.MaxConcurrentStreams), "max-concurrent-streams", "Maximum concurrent streams that each client can open at a time.")

// raft connection timeouts
fs.DurationVar(&rafthttp.ConnReadTimeout, "raft-read-timeout", rafthttp.DefaultConnReadTimeout, "Read timeout set on each rafthttp connection")
fs.DurationVar(&rafthttp.ConnWriteTimeout, "raft-write-timeout", rafthttp.DefaultConnWriteTimeout, "Write timeout set on each rafthttp connection")

// clustering
fs.Var(
flags.NewUniqueURLsWithExceptions(DefaultInitialAdvertisePeerURLs, ""),
Expand Down
9 changes: 9 additions & 0 deletions tests/common/member_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,11 @@ func TestMemberList(t *testing.T) {

for _, tc := range clusterTestCases() {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
tc.config.UniquePortAlloc = true
clus := testRunner.NewCluster(ctx, t, config.WithClusterConfig(tc.config))
defer clus.Close()
cc := testutils.MustClient(clus.Client())
Expand Down Expand Up @@ -113,6 +116,8 @@ func TestMemberAdd(t *testing.T) {
for _, quorumTc := range quorumTcs {
for _, clusterTc := range clusterTestCases() {
t.Run(learnerTc.name+"/"+quorumTc.name+"/"+clusterTc.name, func(t *testing.T) {
t.Parallel()

ctxTimeout := 10 * time.Second
if quorumTc.waitForQuorum {
ctxTimeout += etcdserver.HealthInterval
Expand All @@ -121,6 +126,7 @@ func TestMemberAdd(t *testing.T) {
defer cancel()
c := clusterTc.config
c.StrictReconfigCheck = quorumTc.strictReconfigCheck
c.UniquePortAlloc = true
clus := testRunner.NewCluster(ctx, t, config.WithClusterConfig(c))
defer clus.Close()
cc := testutils.MustClient(clus.Client())
Expand Down Expand Up @@ -198,10 +204,13 @@ func TestMemberRemove(t *testing.T) {
continue
}
t.Run(quorumTc.name+"/"+clusterTc.name, func(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithTimeout(context.Background(), 14*time.Second)
defer cancel()
c := clusterTc.config
c.StrictReconfigCheck = quorumTc.strictReconfigCheck
c.UniquePortAlloc = true
clus := testRunner.NewCluster(ctx, t, config.WithClusterConfig(c))
defer clus.Close()
// client connects to a specific member which won't be removed from cluster
Expand Down
5 changes: 5 additions & 0 deletions tests/framework/config/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type ClusterConfig struct {
StrictReconfigCheck bool
AuthToken string
SnapshotCount uint64
UniquePortAlloc bool

// ClusterContext is used by "e2e" or "integration" to extend the
// ClusterConfig. The common test cases shouldn't care about what
Expand Down Expand Up @@ -88,3 +89,7 @@ func WithSnapshotCount(count uint64) ClusterOption {
func WithStrictReconfigCheck(strict bool) ClusterOption {
return func(c *ClusterConfig) { c.StrictReconfigCheck = strict }
}

func WithUniquePortAlloc() ClusterOption {
return func(c *ClusterConfig) { c.UniquePortAlloc = true }
}
16 changes: 14 additions & 2 deletions tests/framework/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,16 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
peer2Port := port + 3
clientHTTPPort := port + 4

var allocatedPorts []int
if cfg.BasePort == -1 {
clientPort = uniquePorts.Alloc()
peerPort = uniquePorts.Alloc()
metricsPort = uniquePorts.Alloc()
peer2Port = uniquePorts.Alloc()
clientHTTPPort = uniquePorts.Alloc()
allocatedPorts = []int{clientPort, peerPort, metricsPort, peer2Port, clientHTTPPort}
}

if cfg.Client.ConnectionType == ClientTLSAndNonTLS {
curl = clientURL(cfg.ClientScheme(), clientPort, ClientNonTLS)
curls = []string{curl, clientURL(cfg.ClientScheme(), clientPort, ClientTLS)}
Expand Down Expand Up @@ -639,7 +649,8 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
}
var gofailPort int
if cfg.GoFailEnabled {
gofailPort = (i+1)*10000 + 2381
gofailPort = uniquePorts.Alloc()
allocatedPorts = append(allocatedPorts, gofailPort)
envVars["GOFAIL_HTTP"] = fmt.Sprintf("127.0.0.1:%d", gofailPort)
}

Expand All @@ -662,12 +673,13 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
GoFailClientTimeout: cfg.GoFailClientTimeout,
Proxy: proxyCfg,
LazyFSEnabled: cfg.LazyFSEnabled,
AllocatedPorts: allocatedPorts,
}
}

func values(cfg embed.Config) map[string]string {
fs := flag.NewFlagSet("etcd", flag.ContinueOnError)
cfg.AddFlags(fs)
cfg.AddFlagsWithoutGlobals(fs)
values := map[string]string{}
fs.VisitAll(func(f *flag.Flag) {
value := f.Value.String()
Expand Down
3 changes: 3 additions & 0 deletions tests/framework/e2e/e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ func (e e2eRunner) NewCluster(ctx context.Context, t testing.TB, opts ...config.
default:
t.Fatalf("PeerTLS config %q not supported", cfg.PeerTLS)
}
if cfg.UniquePortAlloc {
e2eConfig.BasePort = -1
}
epc, err := NewEtcdProcessCluster(ctx, t, WithConfig(e2eConfig))
if err != nil {
t.Fatalf("could not start etcd integrationCluster: %s", err)
Expand Down
14 changes: 10 additions & 4 deletions tests/framework/e2e/etcd_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,11 @@ type EtcdServerProcessConfig struct {

Name string

PeerURL url.URL
ClientURL string
ClientHTTPURL string
MetricsURL string
PeerURL url.URL
ClientURL string
ClientHTTPURL string
MetricsURL string
AllocatedPorts []int

InitialToken string
InitialCluster string
Expand Down Expand Up @@ -248,6 +249,11 @@ func (ep *EtcdServerProcess) Close() error {
ep.cfg.lg.Info("removing directory", zap.String("data-dir", ep.cfg.DataDirPath))
return os.RemoveAll(ep.cfg.DataDirPath)
}

for _, port := range ep.cfg.AllocatedPorts {
uniquePorts.Free(port)
}
ep.cfg.AllocatedPorts = nil
return nil
}

Expand Down
58 changes: 58 additions & 0 deletions tests/framework/e2e/port_alloc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2024 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package e2e

import "sync"

// uniquePorts is a global instance of testPorts.
var uniquePorts *testPorts

func init() {
uniquePorts = newTestPorts(11000, 19000)
}

// testPorts is used to allocate listen ports for etcd instance in tests
// in a safe way for concurrent use (i.e. running tests in parallel).
type testPorts struct {
mux sync.Mutex
unused map[int]bool
}

// newTestPorts keeps track of unused ports in the specified range.
func newTestPorts(start, end int) *testPorts {
m := make(map[int]bool, end-start)
for i := start; i < end; i++ {
m[i] = true
}
return &testPorts{unused: m}
}

// Alloc allocates a new port or panics if none is available.
func (pa *testPorts) Alloc() int {
pa.mux.Lock()
defer pa.mux.Unlock()
for port := range pa.unused {
delete(pa.unused, port)
return port
}
panic("all ports are used")
}

// Free makes port available for allocation through Alloc.
func (pa *testPorts) Free(port int) {
pa.mux.Lock()
defer pa.mux.Unlock()
pa.unused[port] = true
}
10 changes: 3 additions & 7 deletions tests/framework/integration/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,13 +260,12 @@ func (c *Cluster) ProtoMembers() []*pb.Member {
}

func (c *Cluster) mustNewMember(t testutil.TB) *Member {
memberNumber := c.LastMemberNum
uniqueNumber := atomic.AddInt32(&UniqueNumber, 1)*10 + int32(c.LastMemberNum)
AwesomePatrol marked this conversation as resolved.
Show resolved Hide resolved
c.LastMemberNum++

m := MustNewMember(t,
MemberConfig{
Name: fmt.Sprintf("m%v", memberNumber),
MemberNumber: memberNumber,
Name: fmt.Sprintf("m%v", uniqueNumber),
AuthToken: c.Cfg.AuthToken,
PeerTLS: c.Cfg.PeerTLS,
ClientTLS: c.Cfg.ClientTLS,
Expand Down Expand Up @@ -549,7 +548,6 @@ func NewListenerWithAddr(t testutil.TB, addr string) net.Listener {
type Member struct {
config.ServerConfig
UniqNumber int
MemberNumber int
Port string
PeerListeners, ClientListeners []net.Listener
GRPCListener net.Listener
Expand Down Expand Up @@ -591,7 +589,6 @@ type Member struct {
type MemberConfig struct {
Name string
UniqNumber int64
MemberNumber int
PeerTLS *transport.TLSInfo
ClientTLS *transport.TLSInfo
AuthToken string
Expand Down Expand Up @@ -624,8 +621,7 @@ type MemberConfig struct {
func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member {
var err error
m := &Member{
MemberNumber: mcfg.MemberNumber,
UniqNumber: int(atomic.AddInt32(&UniqueCount, 1)),
UniqNumber: int(atomic.AddInt32(&UniqueCount, 1)),
}

peerScheme := SchemeFromTLSInfo(mcfg.PeerTLS)
Expand Down
Loading