Skip to content

Commit

Permalink
remove client stream
Browse files Browse the repository at this point in the history
  • Loading branch information
alessio-perugini committed Sep 19, 2023
1 parent 1b48af3 commit 9e61ef6
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 60 deletions.
10 changes: 2 additions & 8 deletions client_example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,16 +657,13 @@ func callBoardList(client rpc.ArduinoCoreServiceClient, instance *rpc.Instance)
}

func callBoardListWatch(client rpc.ArduinoCoreServiceClient, instance *rpc.Instance) {
watchClient, err := client.BoardListWatch(context.Background())
req := &rpc.BoardListWatchRequest{Instance: instance}
watchClient, err := client.BoardListWatch(context.Background(), req)
if err != nil {
log.Fatalf("Board list watch error: %s\n", err)
}

// Start the watcher
watchClient.Send(&rpc.BoardListWatchRequest{
Instance: instance,
})

go func() {
for {
res, err := watchClient.Recv()
Expand All @@ -693,9 +690,6 @@ func callBoardListWatch(client rpc.ArduinoCoreServiceClient, instance *rpc.Insta
// Watch for 10 seconds and then interrupts
timer := time.NewTicker(time.Duration(10 * time.Second))
<-timer.C
watchClient.Send(&rpc.BoardListWatchRequest{
Interrupt: true,
})
}

func callPlatformUnInstall(client rpc.ArduinoCoreServiceClient, instance *rpc.Instance) {
Expand Down
10 changes: 5 additions & 5 deletions commands/board/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,22 +258,22 @@ func hasMatchingBoard(b *rpc.DetectedPort, fqbnFilter *cores.FQBN) bool {

// Watch returns a channel that receives boards connection and disconnection events.
// It also returns a callback function that must be used to stop and dispose the watch.
func Watch(req *rpc.BoardListWatchRequest) (<-chan *rpc.BoardListWatchResponse, func(), error) {
func Watch(ctx context.Context, req *rpc.BoardListWatchRequest) (<-chan *rpc.BoardListWatchResponse, error) {
pme, release := commands.GetPackageManagerExplorer(req)
if pme == nil {
return nil, nil, &arduino.InvalidInstanceError{}
return nil, &arduino.InvalidInstanceError{}
}
defer release()
dm := pme.DiscoveryManager()

watcher, err := dm.Watch()
if err != nil {
return nil, nil, err
return nil, err
}

ctx, cancel := context.WithCancel(context.Background())
go func() {
<-ctx.Done()
logrus.Trace("closed watch")
watcher.Close()
}()

Expand Down Expand Up @@ -301,5 +301,5 @@ func Watch(req *rpc.BoardListWatchRequest) (<-chan *rpc.BoardListWatchResponse,
}
}()

return outChan, cancel, nil
return outChan, nil
}
38 changes: 4 additions & 34 deletions commands/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,52 +86,22 @@ func (s *ArduinoCoreServerImpl) BoardSearch(ctx context.Context, req *rpc.BoardS
}

// BoardListWatch FIXMEDOC
func (s *ArduinoCoreServerImpl) BoardListWatch(stream rpc.ArduinoCoreService_BoardListWatchServer) error {
func (s *ArduinoCoreServerImpl) BoardListWatch(req *rpc.BoardListWatchRequest, stream rpc.ArduinoCoreService_BoardListWatchServer) error {
syncSend := NewSynchronizedSend(stream.Send)
msg, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}

if msg.Instance == nil {
err = fmt.Errorf(tr("no instance specified"))
if req.Instance == nil {
err := fmt.Errorf(tr("no instance specified"))
syncSend.Send(&rpc.BoardListWatchResponse{
EventType: "error",
Error: err.Error(),
})
return err
}

eventsChan, closeWatcher, err := board.Watch(msg)
eventsChan, err := board.Watch(stream.Context(), req)
if err != nil {
return convertErrorToRPCStatus(err)
}

go func() {
defer closeWatcher()
for {
msg, err := stream.Recv()
// Handle client closing the stream and eventual errors
if err == io.EOF {
logrus.Info("boards watcher stream closed")
return
}
if err != nil {
logrus.Infof("interrupting boards watcher: %v", err)
return
}

// Message received, does the client want to interrupt?
if msg != nil && msg.Interrupt {
logrus.Info("boards watcher interrupted by client")
return
}
}
}()

for event := range eventsChan {
if err := syncSend.Send(event); err != nil {
logrus.Infof("sending board watch message: %v", err)
Expand Down
4 changes: 2 additions & 2 deletions internal/cli/board/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package board

import (
"context"
"errors"
"fmt"
"os"
Expand Down Expand Up @@ -84,11 +85,10 @@ func runListCommand(watch bool, timeout int64, fqbn string) {
}

func watchList(inst *rpc.Instance) {
eventsChan, closeCB, err := board.Watch(&rpc.BoardListWatchRequest{Instance: inst})
eventsChan, err := board.Watch(context.Background(), &rpc.BoardListWatchRequest{Instance: inst})
if err != nil {
feedback.Fatal(tr("Error detecting boards: %v", err), feedback.ErrNetwork)
}
defer closeCB()

// This is done to avoid printing the header each time a new event is received
if feedback.GetFormat() == feedback.Text {
Expand Down
6 changes: 3 additions & 3 deletions internal/integrationtest/arduino-cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,16 +362,16 @@ func (inst *ArduinoCLIInstance) BoardList(timeout time.Duration) (*commands.Boar
}

// BoardListWatch calls the "BoardListWatch" gRPC method.
func (inst *ArduinoCLIInstance) BoardListWatch() (commands.ArduinoCoreService_BoardListWatchClient, error) {
func (inst *ArduinoCLIInstance) BoardListWatch(ctx context.Context) (commands.ArduinoCoreService_BoardListWatchClient, error) {
boardListWatchReq := &commands.BoardListWatchRequest{
Instance: inst.instance,
}
logCallf(">>> BoardListWatch(%v)\n", boardListWatchReq)
watcher, err := inst.cli.daemonClient.BoardListWatch(context.Background())
watcher, err := inst.cli.daemonClient.BoardListWatch(ctx, boardListWatchReq)
if err != nil {
return watcher, err
}
return watcher, watcher.Send(boardListWatchReq)
return watcher, nil
}

// PlatformInstall calls the "PlatformInstall" gRPC method.
Expand Down
24 changes: 16 additions & 8 deletions internal/integrationtest/daemon/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"github.com/arduino/arduino-cli/internal/integrationtest"
"github.com/arduino/arduino-cli/rpc/cc/arduino/cli/commands/v1"
"github.com/arduino/go-paths-helper"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -54,28 +56,34 @@ func TestArduinoCliDaemon(t *testing.T) {

testWatcher := func() {
// Run watcher
watcher, err := grpcInst.BoardListWatch()
ctx, cancel := context.WithCancel(context.TODO())
watcher, err := grpcInst.BoardListWatch(ctx)
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
watcherCanceldCh := make(chan struct{})
go func() {
defer cancel()
for {
msg, err := watcher.Recv()
if err == io.EOF {
fmt.Println("Watcher EOF")
return
}
require.Empty(t, msg.Error, "Board list watcher returned an error")
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled {
fmt.Println("Watcher canceled")
watcherCanceldCh <- struct{}{}
return
}
require.NoError(t, err, "BoardListWatch grpc call returned an error")
fmt.Printf("WATCH> %v\n", msg)
require.Empty(t, msg.Error, "Board list watcher returned an error")
fmt.Printf("WATCH> %v %v\n", msg, err)
}
}()
time.Sleep(time.Second)
require.NoError(t, watcher.CloseSend())
cancel()
time.Sleep(time.Second)
select {
case <-ctx.Done():
case <-watcherCanceldCh:
// all right!
case <-time.After(time.Second):
case <-time.After(2 * time.Second):
require.Fail(t, "BoardListWatch didn't close")
}
}
Expand Down

0 comments on commit 9e61ef6

Please sign in to comment.