-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathconn.go
127 lines (98 loc) · 4.19 KB
/
conn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package duckdbreplicator
import (
"context"
"github.com/jmoiron/sqlx"
)
// Conn is a handle to one database connection.
// Holding a single write connection is handy when several queries have to be
// run one after another on that same connection.
type Conn interface {
	// Connx exposes the sqlx.Conn backing this connection.
	Connx() *sqlx.Conn

	// CreateTableAsSelect builds a new table called name out of the result set of the sql query.
	CreateTableAsSelect(ctx context.Context, name, sql string, opts *CreateTableOptions) error

	// InsertTableAsSelect writes the result set of the sql query into the table called name.
	InsertTableAsSelect(ctx context.Context, name, sql string, opts *InsertTableOptions) error

	// DropTable deletes the table called name from the database.
	DropTable(ctx context.Context, name string) error

	// RenameTable gives the table oldName the new name newName.
	RenameTable(ctx context.Context, oldName, newName string) error

	// AddTableColumn appends a column columnName of type typ to the table tableName.
	AddTableColumn(ctx context.Context, tableName, columnName, typ string) error

	// AlterTableColumn switches column columnName of table tableName to the type newType.
	AlterTableColumn(ctx context.Context, tableName, columnName, newType string) error
}
// conn is a Conn implementation whose methods delegate to the unexported
// helpers defined on *db.
type conn struct {
// Conn is the embedded sqlx connection; Connx returns it as-is.
*sqlx.Conn
// db provides the unexported helpers that the methods of conn call.
db *db
}
// Compile-time assertion that *conn implements Conn.
var _ Conn = (*conn)(nil)
// Connx exposes the embedded sqlx connection held by this conn.
func (c *conn) Connx() *sqlx.Conn {
	underlying := c.Conn
	return underlying
}
// CreateTableAsSelect builds a new table called name out of the result set of the sql query.
// A nil opts is swapped for a zero-value CreateTableOptions before the call is
// handed to db.createTableAsSelect on this connection.
func (c *conn) CreateTableAsSelect(ctx context.Context, name, sql string, opts *CreateTableOptions) error {
	createOpts := opts
	if createOpts == nil {
		createOpts = &CreateTableOptions{}
	}
	// The helper takes a callback argument; here it does nothing and reports no error.
	noop := func() error { return nil }
	return c.db.createTableAsSelect(ctx, c.Conn, noop, name, sql, createOpts)
}
// InsertTableAsSelect writes the result set of the sql query into the table called name.
// When opts is nil, options with Strategy set to IncrementalStrategyAppend are used.
func (c *conn) InsertTableAsSelect(ctx context.Context, name, sql string, opts *InsertTableOptions) error {
	// The helper takes a callback argument; here it does nothing and reports no error.
	noop := func() error { return nil }
	if opts != nil {
		return c.db.insertTableAsSelect(ctx, c.Conn, noop, name, sql, opts)
	}
	defaults := &InsertTableOptions{Strategy: IncrementalStrategyAppend}
	return c.db.insertTableAsSelect(ctx, c.Conn, noop, name, sql, defaults)
}
// DropTable deletes the table called name by delegating to db.dropTable.
//
// NOTE(review): unlike the create/insert/column methods, c.Conn is not passed to
// the helper here — confirm the drop is meant to run outside this connection.
func (c *conn) DropTable(ctx context.Context, name string) error {
	err := c.db.dropTable(ctx, name)
	return err
}
// RenameTable gives the table oldName the new name newName by delegating to db.renameTable.
//
// NOTE(review): unlike the create/insert/column methods, c.Conn is not passed to
// the helper here — confirm the rename is meant to run outside this connection.
func (c *conn) RenameTable(ctx context.Context, oldName, newName string) error {
	err := c.db.renameTable(ctx, oldName, newName)
	return err
}
// AddTableColumn appends a column columnName of type typ to the table tableName,
// delegating to db.addTableColumn on this connection.
func (c *conn) AddTableColumn(ctx context.Context, tableName, columnName, typ string) error {
	// The helper takes a callback argument; here it does nothing and reports no error.
	noop := func() error { return nil }
	err := c.db.addTableColumn(ctx, c.Conn, noop, tableName, columnName, typ)
	return err
}
// AlterTableColumn switches column columnName of table tableName to the type newType,
// delegating to db.alterTableColumn on this connection.
func (c *conn) AlterTableColumn(ctx context.Context, tableName, columnName, newType string) error {
	// The helper takes a callback argument; here it does nothing and reports no error.
	noop := func() error { return nil }
	err := c.db.alterTableColumn(ctx, c.Conn, noop, tableName, columnName, newType)
	return err
}
// singledbConn is a Conn implementation whose methods delegate to the
// unexported helpers defined on *singledb.
type singledbConn struct {
// Conn is the embedded sqlx connection; Connx returns it as-is.
*sqlx.Conn
// db provides the unexported helpers that the methods of singledbConn call.
db *singledb
}
// Compile-time assertion that *singledbConn implements Conn.
var _ Conn = (*singledbConn)(nil)
// Connx exposes the embedded sqlx connection held by this singledbConn.
func (c *singledbConn) Connx() *sqlx.Conn {
	underlying := c.Conn
	return underlying
}
// CreateTableAsSelect creates a new table by name from the results of the given SQL query.
//
// A nil opts is replaced with a zero-value CreateTableOptions before delegating to
// singledb.createTableAsSelect, so the helper never receives a nil pointer. This
// mirrors the nil handling in (*conn).CreateTableAsSelect.
func (c *singledbConn) CreateTableAsSelect(ctx context.Context, name, sql string, opts *CreateTableOptions) error {
	if opts == nil {
		opts = &CreateTableOptions{}
	}
	return c.db.createTableAsSelect(ctx, c.Conn, name, sql, opts)
}
// InsertTableAsSelect writes the result set of the sql query into the table called name
// by calling execIncrementalInsert on this connection.
// When opts is nil, options with Strategy set to IncrementalStrategyAppend are used.
func (c *singledbConn) InsertTableAsSelect(ctx context.Context, name, sql string, opts *InsertTableOptions) error {
	if opts != nil {
		return execIncrementalInsert(ctx, c.Conn, name, sql, opts)
	}
	defaults := &InsertTableOptions{Strategy: IncrementalStrategyAppend}
	return execIncrementalInsert(ctx, c.Conn, name, sql, defaults)
}
// DropTable deletes the table called name, delegating to singledb.dropTable on this connection.
func (c *singledbConn) DropTable(ctx context.Context, name string) error {
	err := c.db.dropTable(ctx, c.Conn, name)
	return err
}
// RenameTable gives the table oldName the new name newName, delegating to
// singledb.renameTable on this connection.
func (c *singledbConn) RenameTable(ctx context.Context, oldName, newName string) error {
	err := c.db.renameTable(ctx, c.Conn, oldName, newName)
	return err
}
// AddTableColumn appends a column columnName of type typ to the table tableName,
// delegating to singledb.addTableColumn on this connection.
func (c *singledbConn) AddTableColumn(ctx context.Context, tableName, columnName, typ string) error {
	err := c.db.addTableColumn(ctx, c.Conn, tableName, columnName, typ)
	return err
}
// AlterTableColumn switches column columnName of table tableName to the type newType,
// delegating to singledb.alterTableColumn on this connection.
func (c *singledbConn) AlterTableColumn(ctx context.Context, tableName, columnName, newType string) error {
	err := c.db.alterTableColumn(ctx, c.Conn, tableName, columnName, newType)
	return err
}