Skip to content

Commit

Permalink
fix rotate: 2 (#40)
Browse files Browse the repository at this point in the history
  • Loading branch information
vadv authored Oct 9, 2020
1 parent cd9b7b8 commit 16a9585
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 42 deletions.
3 changes: 1 addition & 2 deletions gatherer/cmd/pg_gatherer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ import (
"io/ioutil"
"log"

"github.com/vadv/pg_gatherer/gatherer/internal/secrets"

"gopkg.in/yaml.v2"

"github.com/vadv/pg_gatherer/gatherer/internal/plugins"
"github.com/vadv/pg_gatherer/gatherer/internal/secrets"
)

// Config represent configuration of plugins
Expand Down
16 changes: 8 additions & 8 deletions gatherer/internal/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,33 +11,33 @@ import (
lua "github.com/yuin/gopher-lua"
)

func TestCache(t *testing.T) {
func TestCacheRotate(t *testing.T) {

state := lua.NewState()
cache.Preload(state)
os.RemoveAll("./tests/db.sqlite")
if err := cache.NewSqlite(state, "cache", "./tests/db.sqlite", "prefix_"); err != nil {
os.RemoveAll("./tests/rotate.sqlite")
if err := cache.NewSqlite(state, "cache", "./tests/rotate.sqlite", "prefix_"); err != nil {
t.Fatalf(err.Error())
}
time.Preload(state)
inspect.Preload(state)
if err := state.DoFile("./tests/cache.lua"); err != nil {
if err := state.DoFile("./tests/rotate.lua"); err != nil {
t.Fatalf("error: %s\n", err.Error())
}

}

func TestCacheRotate(t *testing.T) {
func TestCache(t *testing.T) {

state := lua.NewState()
cache.Preload(state)
os.RemoveAll("./tests/rotate.sqlite")
if err := cache.NewSqlite(state, "cache", "./tests/rotate.sqlite", "prefix_"); err != nil {
os.RemoveAll("./tests/db.sqlite")
if err := cache.NewSqlite(state, "cache", "./tests/db.sqlite", "prefix_"); err != nil {
t.Fatalf(err.Error())
}
time.Preload(state)
inspect.Preload(state)
if err := state.DoFile("./tests/rotate.lua"); err != nil {
if err := state.DoFile("./tests/cache.lua"); err != nil {
t.Fatalf("error: %s\n", err.Error())
}

Expand Down
19 changes: 7 additions & 12 deletions gatherer/internal/cache/sqlite/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const (
// EnvCacheRotateTable env value for override default value
EnvCacheRotateTable = `CACHE_ROTATE_TABLE`
// DefaultCacheRotateTable default value for rotate tables
DefaultCacheRotateTable = int64(60 * 60 * 24)
DefaultCacheRotateTable = int64(60 * 60 * 2)
createQuery = `create table if not exists "%s" (key text primary key, value real, updated_at real)`
)

Expand Down Expand Up @@ -64,23 +64,18 @@ OpenSqlite:
if retries > 3 {
return nil, fmt.Errorf("too many errors while prepare sqlite database")
}
newDB, err := sql.Open(`sqlite3`, path)
// https://github.com/mattn/go-sqlite3/tree/v2.0.3#connection-string
connectionString := fmt.Sprintf("file:%s?_synchronous=0&_journal_mode=OFF", path)
newDB, err := sql.Open(`sqlite3`, connectionString)
if err != nil {
log.Printf("[ERROR] delete db %s, because: %s while open\n", path, err.Error())
log.Printf("[ERROR] delete db %#v, because: %#v while open\n", connectionString, err.Error())
os.RemoveAll(path)
retries++
goto OpenSqlite
}
if _, errSync := newDB.Exec(`PRAGMA synchronous = 0`); errSync != nil {
if _, testQuery := newDB.Exec(`select 1`); testQuery != nil {
newDB.Close()
log.Printf("[ERROR] delete db %s, because: %s while set async\n", path, errSync.Error())
os.RemoveAll(path)
retries++
goto OpenSqlite
}
if _, errJournal := newDB.Exec(`PRAGMA journal_mode = OFF`); errJournal != nil {
newDB.Close()
log.Printf("[ERROR] delete db %s, because: %s while disable journal\n", path, errJournal.Error())
log.Printf("[ERROR] delete db %#v, because: %#v while exec test query\n", connectionString, testQuery.Error())
os.RemoveAll(path)
retries++
goto OpenSqlite
Expand Down
46 changes: 32 additions & 14 deletions gatherer/internal/cache/sqlite/rotate.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sqlite

import (
"context"
"database/sql"
"fmt"
"log"
"strconv"
Expand All @@ -14,40 +15,37 @@ const (
select
name
from sqlite_master
where type = 'table' and name like '%%_%%' order by name asc;`
where type = 'table' and name like '%%_%%' order by name desc;`
)

func (c *Cache) rotateOldTablesRoutine() {
for {
if err := c.rotateOldTables(); err != nil {
log.Printf("[ERROR] cache %s rotate old tables: %s\n", c.path, err.Error())
time.Sleep(100 * time.Millisecond)
continue
}
time.Sleep(time.Second * time.Duration(c.getCacheRotateTable()/2))
}
}

func (c *Cache) rotateOldTables() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
rows, err := c.db.QueryContext(ctx, listTablesQuery)
time.Sleep(100 * time.Millisecond)
deadline := time.Now().Unix() - 2*c.getCacheRotateTable()
tables, err := listOfTables(c.db, time.Second)
if err != nil {
return err
}
defer rows.Close()
deadline := time.Now().Unix() - 2*c.getCacheRotateTable()
for rows.Next() {
tableName := ""
errScan := rows.Scan(&tableName)
if errScan != nil {
return errScan
}
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
for _, tableName := range tables {
if timeSlice := strings.Split(tableName, "_"); len(timeSlice) > 0 {
timeStr := timeSlice[len(timeSlice)-1]
t, err := strconv.ParseInt(timeStr, 10, 64)
if err == nil {
if deadline > t {
log.Printf("[INFO] cache drop table: %#v\n", tableName)
_, errExec := c.db.Exec(fmt.Sprintf(`drop table %#v`, tableName))
_, errExec := c.db.ExecContext(ctx, fmt.Sprintf(`drop table %#v`, tableName))
if errExec != nil {
return errExec
}
Expand All @@ -58,5 +56,25 @@ func (c *Cache) rotateOldTables() error {
}
}
}
return rows.Err()
return nil
}

func listOfTables(db *sql.DB, timeout time.Duration) ([]string, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
rows, err := db.QueryContext(ctx, listTablesQuery)
if err != nil {
return nil, err
}
defer rows.Close()
var tables []string
for rows.Next() {
tableName := ""
errScan := rows.Scan(&tableName)
if errScan != nil {
return nil, errScan
}
tables = append(tables, tableName)
}
return tables, nil
}
4 changes: 4 additions & 0 deletions gatherer/internal/cache/tests/rotate.lua
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,7 @@ if not(value == 1) then error("must be get from prev table") end
time.sleep(3)
local value = cache:get("must_be_rotated_after_2_second")
if value then error("table must be rotated") end

time.sleep(3)
local value = cache:get("must_be_rotated_after_2_second")
if value then error("table must be rotated") end
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ go 1.12

require (
github.com/lib/pq v1.7.0
github.com/mattn/go-sqlite3 v2.0.3+incompatible
github.com/mattn/go-sqlite3 v1.14.3
github.com/prometheus/client_golang v1.5.1
github.com/vadv/gopher-lua-libs v0.0.9
github.com/vadv/gopher-lua-libs v0.1.1
github.com/yuin/gopher-lua v0.0.0-20200603152657-dc2b0ca8b37e
gopkg.in/yaml.v2 v2.3.0
)
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lib/pq v1.7.0 h1:h93mCPfUSkaul3Ka/VG8uZdmW1uMHDGxzu0NWHuJmHY=
github.com/lib/pq v1.7.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/mattn/go-sqlite3 v2.0.3+incompatible h1:gXHsfypPkaMZrKbD5209QV9jbUTJKjyR5WD3HYQSd+U=
github.com/mattn/go-sqlite3 v2.0.3+incompatible/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/mattn/go-sqlite3 v1.14.3 h1:j7a/xn1U6TKA/PHHxqZuzh64CdtRc7rU9M+AvkOl5bA=
github.com/mattn/go-sqlite3 v1.14.3/go.mod h1:WVKg1VTActs4Qso6iwGbiFih2UIHo0ENGwNd0Lj+XmI=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/mitchellh/mapstructure v1.3.2 h1:mRS76wmkOn3KkKAyXDu42V+6ebnXWIztFSYGN7GeoRg=
Expand Down Expand Up @@ -94,8 +94,8 @@ github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/technoweenie/multipartstreamer v1.0.1 h1:XRztA5MXiR1TIRHxH2uNxXxaIkKQDeX7m2XsSOlQEnM=
github.com/technoweenie/multipartstreamer v1.0.1/go.mod h1:jNVxdtShOxzAsukZwTSw6MDx5eUJoiEBsSvzDU9uzog=
github.com/vadv/gopher-lua-libs v0.0.9 h1:emJ4LhOOD/TbdJL6nyK0FUWseTdUfmaYHx36Zhruwo4=
github.com/vadv/gopher-lua-libs v0.0.9/go.mod h1:sF/J6i86cVgu555FydBdFrXLOPtfUhXzEsIqj62SouI=
github.com/vadv/gopher-lua-libs v0.1.1 h1:eLHlTK9iILWq/7nCN4Qkz0/IQxf3PTYfmJj7pFAu4tM=
github.com/vadv/gopher-lua-libs v0.1.1/go.mod h1:Mv5mlDfz7K7QAU1SANxGZcHjsLeU+k4Ll98//WfEGNI=
github.com/yuin/gluamapper v0.0.0-20150323120927-d836955830e7 h1:noHsffKZsNfU38DwcXWEPldrTjIZ8FPNKx8mYMGnqjs=
github.com/yuin/gluamapper v0.0.0-20150323120927-d836955830e7/go.mod h1:bbMEM6aU1WDF1ErA5YJ0p91652pGv140gGw4Ww3RGp8=
github.com/yuin/gopher-lua v0.0.0-20200603152657-dc2b0ca8b37e h1:oIpIX9VKxSCFrfjsKpluGbNPBGq9iNnT9crH781j9wY=
Expand Down

0 comments on commit 16a9585

Please sign in to comment.