Skip to content

Commit

Permalink
feat(sharding): enabled sharding feature on db (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
cnlangzi authored Feb 25, 2024
1 parent 5f6f7d6 commit 41f0abf
Show file tree
Hide file tree
Showing 19 changed files with 550 additions and 321 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
## [1.2.0]
### Added
- added `BitBool` for mysql bit type (#11)
- added `sharding` feature (#12)
- added `On` on `DB` to enable AutoSharding feature(#13)
- added `On` on `SQLBuilder` to enable AutoRotation feature(#13)

### Fixed
- fixed parameterized placeholder for postgresql(#12)
Expand Down
88 changes: 72 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ You’ll find the SQLE package useful if you’re not a fan of full-featured ORM
slices of map/structs/primitive types.
- 100% compatible drop-in replacement of "database/sql". Code is really easy to migrate from `database/sql` to `SQLE`. see [examples](row_test.go)
- [Migration](migrate/migrator_test.go)
- _Configurable and extendable sharding_
- [ShardID](shardid/README.md) is a `snowflake-like` distributed sequence unique identifier with extended features for table rotation and database sharding.
- Table AutoRotation
- Database AutoSharding

## Tutorials
> All examples on https://go.dev/doc/tutorial/database-access can directly work with `sqle.DB` instance.
Expand All @@ -47,28 +49,29 @@ SQLE directly connects to a database by `sql.DB` instance.
var db *sqle.DB
switch driver {
case "sqlite":
sqldb, err := sql.Open("sqlite3", "file:"+dsn+"?cache=shared")
if err != nil {
panic(fmt.Sprintf("db: failed to open sqlite database %s", dsn))
}
case "sqlite":
sqldb, err := sql.Open("sqlite3", "file:"+dsn+"?cache=shared")
if err != nil {
panic(fmt.Sprintf("db: failed to open sqlite database %s", dsn))
}
db = sqle.Open(sqldb)
db = sqle.Open(sqldb)
case "mysql":
sqldb, err := sql.Open("mysql", dsn)
if err != nil {
panic(fmt.Sprintf("db: failed to open mysql database %s", dsn))
}
db = sqle.Open(sqldb)
case "mysql":
sqldb, err := sql.Open("mysql", dsn)
if err != nil {
panic(fmt.Sprintf("db: failed to open mysql database %s", dsn))
}
default:
panic(fmt.Sprintf("db: driver %s is not supported yet", driver))
db = sqle.Open(sqldb)
default:
panic(fmt.Sprintf("db: driver %s is not supported yet", driver))
}
if err := db.Ping(); err == nil {
panic("db: database is unreachable")
panic("db: database is unreachable")
}
```

Expand Down Expand Up @@ -418,4 +421,57 @@ func deleteAlbums(ids []int64) error {
}
})
}
```


### Table Rotation
use `shardid.ID` to enable rotate feature for a table based on option (NoRotate/MonthlyRotate/WeeklyRotate/DailyRotate)

```
gen := shardid.New(WithTableRotate(shardid.DailyRotate))
id := gen.Next()
b := New().On(id) //call `On` to enable rotate feature, and setup a input variable <rotate>
b.Delete("orders<rotate>").Where().
If(true).And("order_id = {order_id}").
If(false).And("member_id").
Param("order_id", "order_123456")
db.ExecBuilder(context.TODO(),b) //DELETE FROM `orders_20240220` WHERE order_id = ?
```
see more [examples](sqlbuilder_test.go#L490)


### Database Sharding
use `shardid.ID` to enable sharding feature for any sql
```
gen := shardid.New(WithDatabase(10)) // 10 database instances
id := gen.Next()
b := New().On(id) //call `On` to setup an input variable named `rotate`, and enable rotation feature
b.Delete("orders<rotate>").Where().
If(true).And("order_id = {order_id}").
If(false).And("member_id").
Param("order_id", "order_123456")
db.On(id). //automatically select database based on `id.DatabaseID`
ExecBuilder(context.TODO(),b) //DELETE FROM `orders` WHERE order_id = ?
```

see more [examples](db_test.go#L49)


## SQL Injection
SQLE uses the database/sql‘s argument placeholders to build parameterized SQL statement, which will automatically escape arguments to avoid SQL injection. eg if postgresql is used in your app, please call [UsePostgres](use.go#L5) on SQLBuilder or change [DefaultSQLQuote](sqlbuilder.go?L16) and [DefaultSQLParameterize](sqlbuilder.go?L17) to update parameterization options.

```
func UsePostgres(b *Builder) {
b.Quote = "`"
b.Parameterize = func(name string, index int) string {
return "$" + strconv.Itoa(index)
}
}
```
155 changes: 155 additions & 0 deletions context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package sqle

import (
"context"
"database/sql"
"sync"

"github.com/rs/zerolog/log"
)

type Context struct {
*sql.DB
sync.Mutex
_ noCopy

index int
stmts map[string]*cachedStmt
stmtsMutex sync.RWMutex
}

func (db *Context) Query(query string, args ...any) (*Rows, error) {
return db.QueryContext(context.Background(), query, args...)
}

func (db *Context) QueryBuilder(ctx context.Context, b *Builder) (*Rows, error) {
query, args, err := b.Build()
if err != nil {
return nil, err
}

return db.QueryContext(ctx, query, args...)
}

func (db *Context) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error) {
var rows *sql.Rows
var stmt *sql.Stmt
var err error
if len(args) > 0 {
stmt, err = db.prepareStmt(ctx, query)
if err == nil {
rows, err = stmt.QueryContext(ctx, args...)
if err != nil {
return nil, err
}
}

} else {
rows, err = db.DB.QueryContext(ctx, query, args...)
if err != nil {
return nil, err
}
}

return &Rows{Rows: rows, query: query}, nil
}

func (db *Context) QueryRow(query string, args ...any) *Row {
return db.QueryRowContext(context.Background(), query, args...)
}

func (db *Context) QueryRowBuilder(ctx context.Context, b *Builder) *Row {
query, args, err := b.Build()
if err != nil {
return &Row{
err: err,
query: query,
}
}

return db.QueryRowContext(ctx, query, args...)
}

func (db *Context) QueryRowContext(ctx context.Context, query string, args ...any) *Row {
var rows *sql.Rows
var stmt *sql.Stmt
var err error

if len(args) > 0 {
stmt, err = db.prepareStmt(ctx, query)
if err != nil {
return &Row{
err: err,
query: query,
}
}
rows, err = stmt.QueryContext(ctx, args...)
return &Row{
rows: rows,
err: err,
query: query,
}
}

rows, err = db.DB.QueryContext(ctx, query, args...)
return &Row{
rows: rows,
err: err,
query: query,
}
}

func (db *Context) Exec(query string, args ...any) (sql.Result, error) {
return db.ExecContext(context.Background(), query, args...)
}

func (db *Context) ExecBuilder(ctx context.Context, b *Builder) (sql.Result, error) {
query, args, err := b.Build()
if err != nil {
return nil, err
}

return db.ExecContext(ctx, query, args...)
}

func (db *Context) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) {
if len(args) > 0 {
stmt, err := db.prepareStmt(ctx, query)
if err != nil {
return nil, err
}

return stmt.ExecContext(ctx, args...)
}
return db.DB.ExecContext(context.Background(), query, args...)
}

func (db *Context) Begin(opts *sql.TxOptions) (*Tx, error) {
return db.BeginTx(context.TODO(), opts)

}

func (db *Context) BeginTx(ctx context.Context, opts *sql.TxOptions) (*Tx, error) {
tx, err := db.DB.BeginTx(ctx, opts)
if err != nil {
return nil, err
}

return &Tx{Tx: tx, cachedStmts: make(map[string]*sql.Stmt)}, nil
}

func (db *Context) Transaction(ctx context.Context, opts *sql.TxOptions, fn func(ctx context.Context, tx *Tx) error) error {
tx, err := db.BeginTx(ctx, opts)
if err != nil {
return err
}

err = fn(ctx, tx)
if err != nil {
if e := tx.Rollback(); e != nil {
log.Error().Str("pkg", "sqle").Str("tag", "tx").Err(e)
}
return err
}
return tx.Commit()
}
4 changes: 2 additions & 2 deletions stmt.go → context_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type cachedStmt struct {
lastUsed time.Time
}

func (db *DB) prepareStmt(ctx context.Context, query string) (*sql.Stmt, error) {
func (db *Context) prepareStmt(ctx context.Context, query string) (*sql.Stmt, error) {
db.stmtsMutex.RLock()
s, ok := db.stmts[query]
db.stmtsMutex.RUnlock()
Expand All @@ -39,7 +39,7 @@ func (db *DB) prepareStmt(ctx context.Context, query string) (*sql.Stmt, error)
return stmt, nil
}

func (db *DB) closeIdleStmt() {
func (db *Context) closeIdleStmt() {
for {
<-time.After(1 * time.Minute)

Expand Down
File renamed without changes.
Loading

0 comments on commit 41f0abf

Please sign in to comment.