diff --git a/cluster.go b/cluster.go index 3d795322b..bcff1d55a 100644 --- a/cluster.go +++ b/cluster.go @@ -109,6 +109,8 @@ type ClusterConfig struct { // Default: nil Authenticator Authenticator + WarningsHandler WarningHandlerBuilder + // An Authenticator factory. Can be used to create alternative authenticators. // Default: nil AuthProvider func(h *HostInfo) (Authenticator, error) @@ -322,6 +324,7 @@ func NewCluster(hosts ...string) *ClusterConfig { WriteCoalesceWaitTime: 200 * time.Microsecond, MetadataSchemaRequestTimeout: 60 * time.Second, DisableSkipMetadata: true, + WarningsHandler: DefaultWarningHandler{}, } return cfg diff --git a/conn.go b/conn.go index 670ff59be..666b41816 100644 --- a/conn.go +++ b/conn.go @@ -66,6 +66,14 @@ type Authenticator interface { Success(data []byte) error } +type WarningHandlerBuilder interface { + New(session *Session) WarningHandler +} + +type WarningHandler interface { + Warnings(qry ExecutableQueryInterface, host *HostInfo, warnings []string) +} + type PasswordAuthenticator struct { Username string Password string @@ -1404,7 +1412,16 @@ func marshalQueryValue(typ TypeInfo, value interface{}, dst *queryValues) error return nil } -func (c *Conn) executeQuery(ctx context.Context, qry *Query) *Iter { +func (c *Conn) executeQuery(ctx context.Context, qry *Query) (iter *Iter) { + defer func() { + if iter == nil || c.session == nil { + return + } + warnings := iter.Warnings() + if len(warnings) > 0 && c.session.warningHandler != nil { + c.session.warningHandler.Warnings(qry, iter.host, warnings) + } + }() params := queryParams{ consistency: qry.cons, } @@ -1670,7 +1687,17 @@ func (c *Conn) UseKeyspace(keyspace string) error { return nil } -func (c *Conn) executeBatch(ctx context.Context, batch *Batch) *Iter { +func (c *Conn) executeBatch(ctx context.Context, batch *Batch) (iter *Iter) { + defer func() { + if iter == nil || c.session == nil { + return + } + warnings := iter.Warnings() + if len(warnings) > 0 && c.session.warningHandler != nil { + c.session.warningHandler.Warnings(batch, iter.host, warnings) + } + }() + if c.version == protoVersion1 { return &Iter{err: ErrUnsupported} } diff --git a/control.go b/control.go index c88d7f4e1..0f5782c45 100644 --- a/control.go +++ b/control.go @@ -66,9 +66,9 @@ func (c *controlConn) getSession() *Session { func createControlConn(session *Session) *controlConn { control := &controlConn{ - session: session, - quit: make(chan struct{}), - retry: &SimpleRetryPolicy{NumRetries: 3}, + session: session, + quit: make(chan struct{}), + retry: &SimpleRetryPolicy{NumRetries: 3}, } control.conn.Store((*connHost)(nil)) diff --git a/query_executor.go b/query_executor.go index 354383981..234c4a2ba 100644 --- a/query_executor.go +++ b/query_executor.go @@ -7,27 +7,29 @@ import ( "time" ) -type ExecutableQuery interface { - borrowForExecution() // Used to ensure that the query stays alive for lifetime of a particular execution goroutine. - releaseAfterExecution() // Used when a goroutine finishes its execution attempts, either with ok result or an error. - execute(ctx context.Context, conn *Conn) *Iter - attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo) - retryPolicy() RetryPolicy - speculativeExecutionPolicy() SpeculativeExecutionPolicy +type ExecutableQueryInterface interface { GetRoutingKey() ([]byte, error) Keyspace() string Table() string IsIdempotent() bool IsLWT() bool GetCustomPartitioner() Partitioner - - withContext(context.Context) ExecutableQuery - RetryableQuery - GetSession() *Session } +type ExecutableQuery interface { + ExecutableQueryInterface + + borrowForExecution() // Used to ensure that the query stays alive for lifetime of a particular execution goroutine. + releaseAfterExecution() // Used when a goroutine finishes its execution attempts, either with ok result or an error. + execute(ctx context.Context, conn *Conn) *Iter + attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo) + retryPolicy() RetryPolicy + speculativeExecutionPolicy() SpeculativeExecutionPolicy + withContext(context.Context) ExecutableQuery +} + type queryExecutor struct { pool *policyConnPool policy HostSelectionPolicy diff --git a/session.go b/session.go index bca366282..0e6dcb464 100644 --- a/session.go +++ b/session.go @@ -87,6 +87,7 @@ type Session struct { tabletsRoutingV1 bool usingTimeoutClause string + warningHandler WarningHandler } var queryPool = &sync.Pool{ @@ -182,7 +183,9 @@ func newSessionCommon(cfg ClusterConfig) (*Session, error) { return nil, fmt.Errorf("gocql: unable to create session: %v", err) } s.connCfg = connCfg - + if cfg.WarningsHandler != nil { + s.warningHandler = cfg.WarningsHandler.New(s) + } return s, nil } diff --git a/warning_handler.go b/warning_handler.go new file mode 100644 index 000000000..2b39636d2 --- /dev/null +++ b/warning_handler.go @@ -0,0 +1,23 @@ +package gocql + +type DefaultWarningHandler struct { + logger StdLogger +} + +var NoopWarningHandler WarningHandler = nil + +func (d DefaultWarningHandler) New(session *Session) WarningHandler { + return DefaultWarningHandler{ + logger: session.logger, + } +} + +var _ WarningHandler = DefaultWarningHandler{} + +func (d DefaultWarningHandler) Warnings(qry ExecutableQueryInterface, host *HostInfo, warnings []string) { + if host != nil && len(host.hostId) > 0 { + d.logger.Printf("[%s] warnings: %v", host.hostId, warnings) + } else { + d.logger.Printf("Cluster warnings: %v", warnings) + } +}