Skip to content

Commit

Permalink
Revert "PMM-8655 Send unsent messages after connection problems (#1970)…
Browse files Browse the repository at this point in the history
…" (#2676)

* Revert "PMM-8655 Send unsent messages after connection problems (#1970)"

This reverts commit a3a52c1

* PMM-8655 fix build

* PMM-8655 fix linter

* PMM-8655 fix linter
  • Loading branch information
BupycHuk authored Dec 5, 2023
1 parent 839b682 commit 9a3843d
Show file tree
Hide file tree
Showing 18 changed files with 187 additions and 1,354 deletions.
13 changes: 0 additions & 13 deletions agent/agents/mysql/perfschema/perfschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"
"io"
"math"
"sync"
"time"

"github.com/AlekSi/pointer" // register SQL driver
Expand All @@ -46,18 +45,6 @@ type (
summaryMap map[string]*eventsStatementsSummaryByDigest
)

// mySQLVersion contains.
type mySQLVersion struct {
version float64
vendor string
}

// versionsCache provides cached access to MySQL version.
type versionsCache struct {
rw sync.RWMutex
items map[string]*mySQLVersion
}

const (
retainHistory = 5 * time.Minute
refreshHistory = 5 * time.Second
Expand Down
108 changes: 0 additions & 108 deletions agent/client/cache/cache.go

This file was deleted.

64 changes: 0 additions & 64 deletions agent/client/cache/dummy.go

This file was deleted.

37 changes: 18 additions & 19 deletions agent/client/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ import (
"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"

"github.com/percona/pmm/agent/models"
agenterrors "github.com/percona/pmm/agent/utils/errors"
"github.com/percona/pmm/api/agentpb"
)

Expand All @@ -48,6 +46,15 @@ type ServerRequest struct {
Payload agentpb.ServerRequestPayload
}

// AgentResponse represents agent's response.
// It is similar to agentpb.AgentMessage except it can contain only responses,
// and the payload is already unwrapped (XXX instead of AgentMessage_XXX).
type AgentResponse struct {
ID uint32
Status *grpcstatus.Status
Payload agentpb.AgentResponsePayload
}

// Response is a type used to pass response from pmm-server to the subscriber.
type Response struct {
Payload agentpb.ServerResponsePayload
Expand Down Expand Up @@ -111,7 +118,7 @@ func New(stream agentpb.Agent_ConnectClient) *Channel {
func (c *Channel) close(err error) {
c.closeOnce.Do(func() {
c.l.Debugf("Closing with error: %+v", err)
c.closeErr = agenterrors.NewChannelClosedError(err)
c.closeErr = err

c.m.Lock()
for _, ch := range c.responses { // unblock all subscribers
Expand Down Expand Up @@ -141,7 +148,7 @@ func (c *Channel) Requests() <-chan *ServerRequest {
}

// Send sends message to pmm-managed. It is no-op once channel is closed (see Wait).
func (c *Channel) Send(resp *models.AgentResponse) error {
func (c *Channel) Send(resp *AgentResponse) {
msg := &agentpb.AgentMessage{
Id: resp.ID,
}
Expand All @@ -151,7 +158,7 @@ func (c *Channel) Send(resp *models.AgentResponse) error {
if resp.Status != nil {
msg.Status = resp.Status.Proto()
}
return c.send(msg)
c.send(msg)
}

// SendAndWaitResponse sends request to pmm-managed, blocks until response is available.
Expand All @@ -162,24 +169,21 @@ func (c *Channel) SendAndWaitResponse(payload agentpb.AgentRequestPayload) (agen
id := atomic.AddUint32(&c.lastSentRequestID, 1)
ch := c.subscribe(id)

err := c.send(&agentpb.AgentMessage{
c.send(&agentpb.AgentMessage{
Id: id,
Payload: payload.AgentMessageRequestPayload(),
})
if err != nil {
return nil, err
}

resp := <-ch
return resp.Payload, resp.Error
}

func (c *Channel) send(msg *agentpb.AgentMessage) error {
func (c *Channel) send(msg *agentpb.AgentMessage) {
c.sendM.Lock()
select {
case <-c.closeWait:
c.sendM.Unlock()
return c.Wait()
return
default:
}

Expand All @@ -197,12 +201,10 @@ func (c *Channel) send(msg *agentpb.AgentMessage) error {
err := c.s.Send(msg)
c.sendM.Unlock()
if err != nil {
err = errors.Wrap(err, "failed to send message")
c.close(err)
return agenterrors.NewChannelClosedError(err)
c.close(errors.Wrap(err, "failed to send message"))
return
}
c.mSend.Inc()
return nil
}

// runReader receives messages from server.
Expand Down Expand Up @@ -312,13 +314,10 @@ func (c *Channel) runReceiver() {
c.l.Warnf("pmm-managed was not able to process message with id: %d, handling of that payload type is unimplemented", msg.Id)
continue
}
err := c.Send(&models.AgentResponse{
c.Send(&AgentResponse{
ID: msg.Id,
Status: grpcstatus.New(codes.Unimplemented, "can't handle message type sent, it is not implemented"),
})
if err != nil {
c.l.Error(err)
}
}
}
}
Expand Down
8 changes: 3 additions & 5 deletions agent/client/channel/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/percona/pmm/agent/models"
"github.com/percona/pmm/agent/utils/truncate"
"github.com/percona/pmm/api/agentpb"
)
Expand Down Expand Up @@ -150,6 +149,7 @@ func TestAgentRequestWithTruncatedInvalidUTF8(t *testing.T) {
Mysql: &agentpb.MetricsBucket_MySQL{},
}}
resp, err = channel.SendAndWaitResponse(&request)
require.NoError(t, err)
assert.Nil(t, resp)
}

Expand Down Expand Up @@ -248,13 +248,12 @@ func TestServerRequest(t *testing.T) {
for req := range channel.Requests() {
assert.IsType(t, &agentpb.Ping{}, req.Payload)

err := channel.Send(&models.AgentResponse{
channel.Send(&AgentResponse{
ID: req.ID,
Payload: &agentpb.Pong{
CurrentTime: timestamppb.Now(),
},
})
assert.NoError(t, err)
}
}

Expand Down Expand Up @@ -417,11 +416,10 @@ func TestUnexpectedResponsePayloadFromServer(t *testing.T) {
channel, _, teardown := setup(t, connect, io.EOF)
defer teardown()
req := <-channel.Requests()
err := channel.Send(&models.AgentResponse{
channel.Send(&AgentResponse{
ID: req.ID,
Payload: &agentpb.Pong{
CurrentTime: timestamppb.Now(),
},
})
assert.NoError(t, err)
}
Loading

0 comments on commit 9a3843d

Please sign in to comment.