forked from ligato/cn-infra
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathplugin_impl_cassa.go
130 lines (107 loc) · 3.6 KB
/
plugin_impl_cassa.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
// Copyright (c) 2017 Cisco and/or its affiliates.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cassandra
import (
"errors"
"github.com/willfaught/gockle"
"go.ligato.io/cn-infra/v2/db/sql"
"go.ligato.io/cn-infra/v2/health/statuscheck"
"go.ligato.io/cn-infra/v2/infra"
"go.ligato.io/cn-infra/v2/utils/safeclose"
)
// probeCassandraConnection is a CQL statement that selects the keyspace names
// from system_schema.keyspaces; judging by its name it is meant to be used for
// probing the connection to Cassandra.
const probeCassandraConnection = "SELECT keyspace_name FROM system_schema.keyspaces"
// Plugin is the Cassandra plugin. It implements the Plugin interface and
// can therefore be loaded together with other plugins.
type Plugin struct {
// Deps embeds the dependencies injected into the plugin.
Deps
// clientConfig is the client configuration produced by
// ConfigToClientConfig during Init.
clientConfig *ClientConfig
// session is the Cassandra session. It is created in Init or supplied
// through FromExistingSession, and stays nil when Init finds no config.
session gockle.Session
}
// Deps groups the injected dependencies of the plugin so that they are
// not mixed with the other plugin fields.
type Deps struct {
// NOTE(review): Cfg, Log and PluginName used by the plugin methods are
// presumably promoted from this embedded struct - confirm in package infra.
infra.PluginDeps
// StatusCheck is optional; AfterInit registers a probe only when it is non-nil.
StatusCheck statuscheck.PluginStatusWriter // inject
}
// Sentinel errors of the cassandra package.
var (
// ErrMissingVisitorEntity is the error returned when a visitor is missing its entity.
ErrMissingVisitorEntity = errors.New("cassandra: visitor is missing entity")
// ErrMissingEntityField is the error returned when a visitor entity is missing a field.
ErrMissingEntityField = errors.New("cassandra: visitor entity is missing field")
// ErrUnexportedEntityField is the error returned when a visitor entity has an unexported field.
ErrUnexportedEntityField = errors.New("cassandra: visitor entity with unexported field")
// ErrInvalidEndpointConfig is the error returned when the endpoint and port are not in a valid format.
ErrInvalidEndpointConfig = errors.New("cassandra: invalid configuration, endpoint and port not in valid format")
)
// Init is called at plugin startup and establishes the session to Cassandra.
//
// Initialization is skipped when the plugin already holds a session (for
// example one supplied through FromExistingSession). When no configuration is
// found, the fact is logged and nil is returned without creating a session.
// An error is returned when loading the configuration, converting it to a
// client configuration, or creating the session fails.
func (p *Plugin) Init() (err error) {
	if p.session != nil {
		return nil // skip initialization
	}

	// Retrieve config. The error is checked before found so that an error
	// reported together with found == false is never silently dropped and
	// mistaken for a missing configuration.
	var cfg Config
	found, err := p.Cfg.LoadValue(&cfg)
	if err != nil {
		return err
	}
	if !found {
		p.Log.Info("cassandra client config not found ", p.Cfg.GetConfigName(),
			" - skip loading this plugin")
		return nil
	}

	// Init session
	p.clientConfig, err = ConfigToClientConfig(&cfg)
	if err != nil {
		return err
	}
	if p.clientConfig == nil {
		// Nothing to connect with; leave the plugin without a session.
		return nil
	}

	session, err := CreateSessionFromConfig(p.clientConfig)
	if err != nil {
		return err
	}
	p.session = gockle.NewSession(session)
	return nil
}
// AfterInit registers a Cassandra probe with the status check.
//
// The probe is registered only when both StatusCheck and the session are set;
// otherwise the method does nothing. The registered probe executes
// probeCassandraConnection through a new broker and reports statuscheck.OK
// when the statement succeeds, or statuscheck.Error together with the
// execution error otherwise. AfterInit itself always returns nil.
func (p *Plugin) AfterInit() error {
	if p.StatusCheck == nil || p.session == nil {
		return nil
	}

	p.StatusCheck.Register(p.PluginName, func() (statuscheck.PluginState, error) {
		// Use the shared probe statement instead of a duplicated literal.
		if err := p.NewBroker().Exec(probeCassandraConnection); err != nil {
			return statuscheck.Error, err
		}
		return statuscheck.OK, nil
	})
	// A successful registration is informational, not a warning.
	p.Log.Infof("Status check for %s was started", p.PluginName)
	return nil
}
// FromExistingSession builds a Plugin around an already established session.
// It is used mainly for testing.
func FromExistingSession(session gockle.Session) *Plugin {
	plugin := new(Plugin)
	plugin.session = session
	return plugin
}
// NewBroker returns a Broker instance for working with the Cassandra
// database, backed by the session held by the plugin.
func (p *Plugin) NewBroker() sql.Broker {
	activeSession := p.session
	return NewBrokerUsingSession(activeSession)
}
// Close releases the Cassandra session held by the plugin.
//
// The result of safeclose.Close is returned so that a failure while closing
// the session is reported to the caller instead of being discarded.
func (p *Plugin) Close() error {
	return safeclose.Close(p.session)
}