-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathconn.go
187 lines (161 loc) · 4.94 KB
/
conn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
package rethinkgo
import (
"code.google.com/p/goprotobuf/proto"
"encoding/binary"
"errors"
"fmt"
"net"
"io"
"bufio"
p "github.com/christopherhesse/rethinkgo/ql2"
"time"
)
// connection is a connection to a rethinkdb database
type connection struct {
// embed the net.Conn type, so that we can effectively define new methods on
// it (interfaces do not allow that)
net.Conn
}
var debugMode bool = false
func serverConnect(address string, authkey string) (*connection, error) {
conn, err := net.Dial("tcp", address)
if err != nil {
return nil, err
}
if err := binary.Write(conn, binary.LittleEndian, p.VersionDummy_V0_2); err != nil {
return nil, err
}
// authorization key
if err := binary.Write(conn, binary.LittleEndian, uint32(len(authkey))); err != nil {
return nil, err
}
if err := binary.Write(conn, binary.BigEndian, []byte(authkey)); err != nil {
return nil, err
}
// read server response to authorization key (terminated by NUL)
reader := bufio.NewReader(conn)
line, err := reader.ReadBytes('\x00')
if err != nil {
return nil, err
}
// convert to string and remove trailing NUL byte
response := string(line[:len(line)-1])
if response != "SUCCESS" {
// we failed authorization or something else terrible happened
return nil, fmt.Errorf("Failed to connect to server: %v", response)
}
return &connection{conn}, nil
}
// SetDebug causes all queries sent to the server and responses received to be
// printed to stdout in raw form.
//
// Example usage:
//
// r.SetDebug(true)
func SetDebug(debug bool) {
debugMode = debug
}
// writeMessage writes a byte array to the stream preceeded by the length in
// bytes.
func (c *connection) writeMessage(data []byte) error {
messageLength := uint32(len(data))
if err := binary.Write(c, binary.LittleEndian, messageLength); err != nil {
return err
}
_, err := c.Write(data)
return err
}
// writeQuery writes a protobuf message to the connection.
func (c *connection) writeQuery(protobuf *p.Query) error {
data, err := proto.Marshal(protobuf)
if err != nil {
return fmt.Errorf("rethinkdb: Could not marshal protocol buffer: %v, %v", protobuf, err)
}
return c.writeMessage(data)
}
// readMessage reads a single message from a connection. A message is a length
// followed by a serialized protocol buffer.
func (c *connection) readMessage() ([]byte, error) {
var messageLength uint32
if err := binary.Read(c, binary.LittleEndian, &messageLength); err != nil {
return nil, err
}
buffer := make([]byte, messageLength)
_, err := io.ReadFull(c, buffer)
if err != nil {
return nil, err
}
return buffer, nil
}
// readResponse reads a protobuf message from a connection and parses it.
func (c *connection) readResponse() (*p.Response, error) {
data, err := c.readMessage()
if err != nil {
return nil, err
}
response := &p.Response{}
err = proto.Unmarshal(data, response)
return response, err
}
// executeQueryProtobuf sends a single query to the server and retrieves the parsed
// response, a lower level function used by .executeQuery()
func (c *connection) executeQueryProtobuf(protobuf *p.Query) (responseProto *p.Response, err error) {
if err = c.writeQuery(protobuf); err != nil {
return
}
for {
responseProto, err = c.readResponse()
if err != nil {
return
}
if responseProto.GetToken() == protobuf.GetToken() {
break
} else if responseProto.GetToken() > protobuf.GetToken() {
return nil, errors.New("rethinkdb: The server returned a response for a protobuf that was not submitted by us")
}
}
return
}
// executeQuery is an internal function, shared by Rows iterator and the normal
// Run() call. Runs a protocol buffer formatted query, returns a list of strings
// and a status code.
func (c *connection) executeQuery(queryProto *p.Query, timeout time.Duration) (result []*p.Datum, responseType p.Response_ResponseType, err error) {
if debugMode {
fmt.Printf("rethinkdb: queryProto:\n%v", protobufToString(queryProto, 1))
}
// if the user has set a timeout, make sure we set a deadline on the connection
// so that we don't exceed the timeout. if not, use the zero time value to
// indicate no deadline
if timeout == 0 {
c.SetDeadline(time.Time{})
} else {
c.SetDeadline(time.Now().Add(timeout))
}
r, err := c.executeQueryProtobuf(queryProto)
// reset the deadline for the connection
c.SetDeadline(time.Time{})
if err != nil {
return
}
if debugMode {
fmt.Printf("rethinkdb: responseProto:\n%v", protobufToString(r, 1))
}
responseType = r.GetType()
switch responseType {
case p.Response_SUCCESS_ATOM, p.Response_SUCCESS_SEQUENCE, p.Response_SUCCESS_PARTIAL:
result = r.Response
default:
// some sort of error
switch responseType {
case p.Response_CLIENT_ERROR:
err = ErrBrokenClient{response: r}
case p.Response_COMPILE_ERROR:
err = ErrBadQuery{response: r}
case p.Response_RUNTIME_ERROR:
err = ErrRuntime{response: r}
default:
err = fmt.Errorf("rethinkdb: Unexpected response type from server: %v", responseType)
}
}
return
}