Skip to content

Commit

Permalink
Make driver print warnings returned by server
Browse files Browse the repository at this point in the history
  • Loading branch information
dkropachev committed Jan 15, 2025
1 parent 5712671 commit d0d3498
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 17 deletions.
3 changes: 3 additions & 0 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ type ClusterConfig struct {
// Default: nil
Authenticator Authenticator

WarningsHandler WarningHandlerBuilder

// An Authenticator factory. Can be used to create alternative authenticators.
// Default: nil
AuthProvider func(h *HostInfo) (Authenticator, error)
Expand Down Expand Up @@ -322,6 +324,7 @@ func NewCluster(hosts ...string) *ClusterConfig {
WriteCoalesceWaitTime: 200 * time.Microsecond,
MetadataSchemaRequestTimeout: 60 * time.Second,
DisableSkipMetadata: true,
WarningsHandler: DefaultWarningHandler{},
}

return cfg
Expand Down
31 changes: 29 additions & 2 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ type Authenticator interface {
Success(data []byte) error
}

type WarningHandlerBuilder interface {
New(session *Session) WarningHandler
}

type WarningHandler interface {
Warnings(qry ExecutableQueryInterface, host *HostInfo, warnings []string)
}

type PasswordAuthenticator struct {
Username string
Password string
Expand Down Expand Up @@ -1404,7 +1412,16 @@ func marshalQueryValue(typ TypeInfo, value interface{}, dst *queryValues) error
return nil
}

func (c *Conn) executeQuery(ctx context.Context, qry *Query) *Iter {
func (c *Conn) executeQuery(ctx context.Context, qry *Query) (iter *Iter) {
defer func() {
if iter == nil || c.session == nil {
return
}
warnings := iter.Warnings()
if len(warnings) > 0 && c.session.warningHandler != nil {
c.session.warningHandler.Warnings(qry, iter.host, warnings)
}
}()
params := queryParams{
consistency: qry.cons,
}
Expand Down Expand Up @@ -1670,7 +1687,17 @@ func (c *Conn) UseKeyspace(keyspace string) error {
return nil
}

func (c *Conn) executeBatch(ctx context.Context, batch *Batch) *Iter {
func (c *Conn) executeBatch(ctx context.Context, batch *Batch) (iter *Iter) {
defer func() {
if iter == nil || c.session == nil {
return
}
warnings := iter.Warnings()
if len(warnings) > 0 && c.session.warningHandler != nil {
c.session.warningHandler.Warnings(batch, iter.host, warnings)
}
}()

if c.version == protoVersion1 {
return &Iter{err: ErrUnsupported}
}
Expand Down
6 changes: 3 additions & 3 deletions control.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ func (c *controlConn) getSession() *Session {
func createControlConn(session *Session) *controlConn {

control := &controlConn{
session: session,
quit: make(chan struct{}),
retry: &SimpleRetryPolicy{NumRetries: 3},
session: session,
quit: make(chan struct{}),
retry: &SimpleRetryPolicy{NumRetries: 3},
}

control.conn.Store((*connHost)(nil))
Expand Down
24 changes: 13 additions & 11 deletions query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,29 @@ import (
"time"
)

type ExecutableQuery interface {
borrowForExecution() // Used to ensure that the query stays alive for lifetime of a particular execution goroutine.
releaseAfterExecution() // Used when a goroutine finishes its execution attempts, either with ok result or an error.
execute(ctx context.Context, conn *Conn) *Iter
attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo)
retryPolicy() RetryPolicy
speculativeExecutionPolicy() SpeculativeExecutionPolicy
type ExecutableQueryInterface interface {
GetRoutingKey() ([]byte, error)
Keyspace() string
Table() string
IsIdempotent() bool
IsLWT() bool
GetCustomPartitioner() Partitioner

withContext(context.Context) ExecutableQuery

RetryableQuery

GetSession() *Session
}

type ExecutableQuery interface {
ExecutableQueryInterface

borrowForExecution() // Used to ensure that the query stays alive for lifetime of a particular execution goroutine.
releaseAfterExecution() // Used when a goroutine finishes its execution attempts, either with ok result or an error.
execute(ctx context.Context, conn *Conn) *Iter
attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo)
retryPolicy() RetryPolicy
speculativeExecutionPolicy() SpeculativeExecutionPolicy
withContext(context.Context) ExecutableQuery
}

type queryExecutor struct {
pool *policyConnPool
policy HostSelectionPolicy
Expand Down
5 changes: 4 additions & 1 deletion session.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ type Session struct {
tabletsRoutingV1 bool

usingTimeoutClause string
warningHandler WarningHandler
}

var queryPool = &sync.Pool{
Expand Down Expand Up @@ -182,7 +183,9 @@ func newSessionCommon(cfg ClusterConfig) (*Session, error) {
return nil, fmt.Errorf("gocql: unable to create session: %v", err)
}
s.connCfg = connCfg

if cfg.WarningsHandler != nil {
s.warningHandler = cfg.WarningsHandler.New(s)
}
return s, nil
}

Expand Down
23 changes: 23 additions & 0 deletions warning_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package gocql

type DefaultWarningHandler struct {
logger StdLogger
}

var NoopWarningHandler WarningHandler = nil

func (d DefaultWarningHandler) New(session *Session) WarningHandler {
return DefaultWarningHandler{
logger: session.logger,
}
}

var _ WarningHandler = DefaultWarningHandler{}

func (d DefaultWarningHandler) Warnings(qry ExecutableQueryInterface, host *HostInfo, warnings []string) {
if host != nil && len(host.hostId) > 0 {
d.logger.Printf("[%s] warnings: %v", host.hostId, warnings)
} else {
d.logger.Printf("Cluster warnings: %v", warnings)
}
}

0 comments on commit d0d3498

Please sign in to comment.