From dd21ab60458c71f6a450b56cc291e128ab453d86 Mon Sep 17 00:00:00 2001 From: Yi Duan Date: Wed, 2 Aug 2023 15:01:25 +0800 Subject: [PATCH] feat: update config using env (#3) * feat: add export API * test * check env when init * not implicitly init default manager * update readme * update README.md * update * check enable * simplify backup option * seperate backup handler * add BackupHandler type * back: pass BackupHandler every time * rename --- README.md | 24 ++--- api_test.go | 188 ++++++++++++++++++++-------------------- backup/metainfo.go | 111 ++++++++++++++++++++++++ backup/metainfo_test.go | 102 ++++++++++++++++++++++ example_test.go | 59 +++++++------ gls.go | 73 +++++++++------- go.mod | 1 + go.sum | 11 +++ manager.go | 27 +++--- session.go | 65 +++++++++----- 10 files changed, 460 insertions(+), 201 deletions(-) create mode 100644 backup/metainfo.go create mode 100644 backup/metainfo_test.go diff --git a/README.md b/README.md index a5cf170..fb119c9 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ # LocalSession ## Introduction -LocalSession is used to **implicitly** manage and transmit context **within** or **between** goroutines. +LocalSession is used to **implicitly** manage and transmit context **within** or **between** goroutines. In canonical way, Go recommands developers to explicitly pass `context.Context` between functions to ensure the downstream callee get desired information from upstream. However this is tedious and ineffecient, resulting in many developers forget (or just don't want) to follow this practice. We have found many cases like that, especially in framework. Therefore, we design and implement a way to implicitly pass application context from root caller to end callee, without troubling intermediate implementation to always bring context. ## Usage ### Session @@ -68,10 +68,10 @@ func GetDataX() { } ``` -We provides a `defaultManager` to manage session between different goroutines, thus you don't need to make a SessionManager by your own. +We provide a globally default manager to manage session between different goroutines, as long as you set `InitDefaultManager()` first. ### Explicitly Transmit Async Context (Recommended) -You can use `Go()` or `GoSession()` to initiatively transmit your context to other goroutines. +You can use `Go()` or `GoSession()` to explicitly transmit your context to other goroutines. ```go @@ -82,6 +82,11 @@ import ( . "github.com/cloudwego/localsession" ) +func init() { + // initialize default manager first + InitDefaultManager(DefaultManagerOptions()) +} + func GetCurSession() Session { s, ok := CurSession() if !ok { @@ -196,28 +201,23 @@ func main() { You can also set option `EnableImplicitlyTransmitAsync` as true to transparently transmit context. Once the option is enabled, every goroutine will inherit their parent's session. ```go func ExampleSessionCtx_EnableImplicitlyTransmitAsync() { - // rest DefaultManager with new Options + // EnableImplicitlyTransmitAsync must be true ResetDefaultManager(ManagerOptions{ ShardNumber: 10, EnableImplicitlyTransmitAsync: true, GCInterval: time.Hour, }) - // WARNING: pprof.Do() must be called before BindSession(), + // WARNING: if you want to use `pprof.Do()`, it must be called before `BindSession()`, // otherwise transparently transmitting session will be dysfunctional - labels := pprof.Labels("c", "d") - pprof.Do(context.Background(), labels, func(ctx context.Context){}) + // labels := pprof.Labels("c", "d") + // pprof.Do(context.Background(), labels, func(ctx context.Context){}) s := NewSessionMap(map[interface{}]interface{}{ "a": "b", }) BindSession(s) - // WARNING: pprof.Do() must be called before BindSession(), - // otherwise transparently transmitting session will be dysfunctional - // labels := pprof.Labels("c", "d") - // pprof.Do(context.Background(), labels, func(ctx context.Context){}) - wg := sync.WaitGroup{} wg.Add(3) go func() { diff --git a/api_test.go b/api_test.go index 07060b8..1b59a43 100644 --- a/api_test.go +++ b/api_test.go @@ -27,17 +27,22 @@ import ( const N = 10 +func TestMain(m *testing.M) { + InitDefaultManager(DefaultManagerOptions()) + m.Run() +} + func TestResetDefaultManager(t *testing.T) { old := defaultManagerObj - + t.Run("arg", func(t *testing.T) { defaultManagerOnce = sync.Once{} exp := ManagerOptions{ - ShardNumber: 1, + ShardNumber: 1, EnableImplicitlyTransmitAsync: true, - GCInterval: time.Second*2, + GCInterval: time.Second * 2, } - ResetDefaultManager(exp) + InitDefaultManager(exp) act := defaultManagerObj.Options() require.Equal(t, exp, act) }) @@ -46,41 +51,41 @@ func TestResetDefaultManager(t *testing.T) { defaultManagerOnce = sync.Once{} env := `true,10,10s` os.Setenv(SESSION_CONFIG_KEY, env) - ResetDefaultManager(ManagerOptions{}) - act := defaultManagerObj.Options() exp := DefaultManagerOptions() + InitDefaultManager(exp) + act := defaultManagerObj.Options() exp.ShardNumber = 10 exp.EnableImplicitlyTransmitAsync = true - exp.GCInterval = time.Second*10 + exp.GCInterval = time.Second * 10 require.Equal(t, exp, act) defaultManagerOnce = sync.Once{} env = `,1000` os.Setenv(SESSION_CONFIG_KEY, env) - ResetDefaultManager(ManagerOptions{}) - act = defaultManagerObj.Options() exp = DefaultManagerOptions() + InitDefaultManager(exp) + act = defaultManagerObj.Options() exp.ShardNumber = 1000 require.Equal(t, exp, act) defaultManagerOnce = sync.Once{} env = `,1,2s` os.Setenv(SESSION_CONFIG_KEY, env) - ResetDefaultManager(ManagerOptions{}) - act = defaultManagerObj.Options() exp = DefaultManagerOptions() + InitDefaultManager(exp) + act = defaultManagerObj.Options() exp.ShardNumber = 1 - exp.GCInterval = time.Second*2 + exp.GCInterval = time.Second * 2 require.Equal(t, exp, act) defaultManagerOnce = sync.Once{} env = `true,,2s` os.Setenv(SESSION_CONFIG_KEY, env) - ResetDefaultManager(ManagerOptions{}) - act = defaultManagerObj.Options() exp = DefaultManagerOptions() + InitDefaultManager(exp) + act = defaultManagerObj.Options() exp.EnableImplicitlyTransmitAsync = true - exp.GCInterval = time.Second*2 + exp.GCInterval = time.Second * 2 require.Equal(t, exp, act) }) @@ -88,14 +93,13 @@ func TestResetDefaultManager(t *testing.T) { defaultManagerOnce = sync.Once{} } -// //go:nocheckptr func TestTransparentTransmitAsync(t *testing.T) { old := defaultManagerObj - ResetDefaultManager(ManagerOptions{ - ShardNumber: 10, + InitDefaultManager(ManagerOptions{ + ShardNumber: 10, EnableImplicitlyTransmitAsync: true, - GCInterval: time.Hour, + GCInterval: time.Second * 2, }) s := NewSessionMap(map[interface{}]interface{}{ "a": "b", @@ -103,10 +107,10 @@ func TestTransparentTransmitAsync(t *testing.T) { labels := pprof.Labels("c", "d") - // WARNING: pprof.Do() must be called before BindSession(), + // WARNING: pprof.Do() must be called before BindSession(), // otherwise transparently transmitting session will be dysfunctional pprof.Do(context.Background(), labels, func(ctx context.Context) {}) - + BindSession(s) wg := sync.WaitGroup{} @@ -140,7 +144,7 @@ func TestSessionTimeout(t *testing.T) { ss := s.WithValue(1, 2) m := NewSessionMapWithTimeout(map[interface{}]interface{}{}, time.Second) mm := m.WithValue(1, 2) - time.Sleep(time.Second*2) + time.Sleep(time.Second * 2) require.False(t, ss.IsValid()) require.False(t, mm.IsValid()) } @@ -153,7 +157,7 @@ func TestSessionCtx(t *testing.T) { var sig2 = make(chan struct{}) // initialize new session with context - var session = NewSessionCtx(ctx)// implementation... + var session = NewSessionCtx(ctx) // implementation... // set specific key-value and update session start := session.WithValue(key, v) @@ -162,20 +166,20 @@ func TestSessionCtx(t *testing.T) { BindSession(start) // pass to new goroutine... - Go(func(){ + Go(func() { // read specific key under current session val := mustCurSession().Get(key) // val exists require.Equal(t, v, val) // doSomething.... - + // set specific key-value under current session // NOTICE: current session won't change here next := mustCurSession().WithValue(key2, v2) val2 := mustCurSession().Get(key2) // val2 == nil require.Nil(t, val2) - + // pass both parent session and new session to sub goroutine - GoSession(next, func(){ + GoSession(next, func() { // read specific key under current session val := mustCurSession().Get(key) // val exists require.Equal(t, v, val) @@ -183,16 +187,16 @@ func TestSessionCtx(t *testing.T) { val2 := mustCurSession().Get(key2) // val2 exists require.Equal(t, v2, val2) // doSomething.... - + sig2 <- struct{}{} - <- sig + <-sig require.False(t, mustCurSession().IsValid()) // current session is invalid - + println("g2 done") sig2 <- struct{}{} }) - + Go(func() { // read specific key under current session val := mustCurSession().Get(key) // val exists @@ -204,29 +208,29 @@ func TestSessionCtx(t *testing.T) { sig2 <- struct{}{} - <- sig + <-sig require.False(t, mustCurSession().IsValid()) // current session is invalid println("g3 done") sig2 <- struct{}{} }) - + BindSession(next) val2 = mustCurSession().Get(key2) // val2 exists require.Equal(t, v2, val2) sig2 <- struct{}{} - <- sig + <-sig require.False(t, next.IsValid()) // next is invalid println("g1 done") sig2 <- struct{}{} }) - <- sig2 - <- sig2 - <- sig2 + <-sig2 + <-sig2 + <-sig2 val2 := mustCurSession().Get(key2) // val2 == nil require.Nil(t, val2) @@ -237,10 +241,10 @@ func TestSessionCtx(t *testing.T) { close(sig) require.False(t, start.IsValid()) // start is invalid - - <- sig2 - <- sig2 - <- sig2 + + <-sig2 + <-sig2 + <-sig2 println("g0 done") UnbindSession() @@ -258,7 +262,7 @@ func TestSessionMap(t *testing.T) { var sig2 = make(chan struct{}) // initialize new session with context - var session = NewSessionMap(map[interface{}]interface{}{})// implementation... + var session = NewSessionMap(map[interface{}]interface{}{}) // implementation... // set specific key-value and update session start := session.WithValue(key, v) @@ -267,20 +271,20 @@ func TestSessionMap(t *testing.T) { BindSession(start) // pass to new goroutine... - Go(func(){ + Go(func() { // read specific key under current session val := mustCurSession().Get(key) // val exists require.Equal(t, v, val) // doSomething.... - + // set specific key-value under current session // NOTICE: current session won't change here next := mustCurSession().WithValue(key2, v2) val2 := mustCurSession().Get(key2) // val2 exist require.Equal(t, v2, val2) - + // pass both parent session and new session to sub goroutine - GoSession(next, func(){ + GoSession(next, func() { // read specific key under current session val := mustCurSession().Get(key) // val exists require.Equal(t, v, val) @@ -288,16 +292,16 @@ func TestSessionMap(t *testing.T) { val2 := mustCurSession().Get(key2) // val2 exists require.Equal(t, v2, val2) // doSomething.... - + sig2 <- struct{}{} - <- sig + <-sig require.False(t, mustCurSession().IsValid()) // current session is invalid - + println("g2 done") sig2 <- struct{}{} }) - + Go(func() { // read specific key under current session val := mustCurSession().Get(key) // val exists @@ -309,29 +313,29 @@ func TestSessionMap(t *testing.T) { sig2 <- struct{}{} - <- sig + <-sig require.False(t, mustCurSession().IsValid()) // current session is invalid println("g3 done") sig2 <- struct{}{} }) - + BindSession(next) val2 = mustCurSession().Get(key2) // val2 exists require.Equal(t, v2, val2) sig2 <- struct{}{} - <- sig + <-sig require.False(t, next.IsValid()) // next is invalid println("g1 done") sig2 <- struct{}{} }) - <- sig2 - <- sig2 - <- sig2 + <-sig2 + <-sig2 + <-sig2 val2 := mustCurSession().Get(key2) // val2 exists require.Equal(t, v2, val2) @@ -342,26 +346,25 @@ func TestSessionMap(t *testing.T) { close(sig) require.False(t, start.IsValid()) // start is invalid - - <- sig2 - <- sig2 - <- sig2 + + <-sig2 + <-sig2 + <-sig2 println("g0 done") UnbindSession() } - func TestSessionManager_GC(t *testing.T) { - inter := time.Second*2 + inter := time.Second * 2 sd := 10 manager := NewSessionManager(ManagerOptions{ ShardNumber: sd, - GCInterval: inter, + GCInterval: inter, }) - + var N = 1000 - for i:=0; i>1) + time.Sleep(inter + inter>>1) sum := 0 for _, shard := range manager.shards { shard.lock.Lock() @@ -391,7 +394,7 @@ func BenchmarkSessionManager_CurSession(b *testing.B) { b.Run("sync", func(b *testing.B) { BindSession(s) - for i:=0; i 0 { + // persistent kvs + kvs := make([]string, 0, n*2) + mkvs := metainfo.GetAllPersistentValues(ctx) + + // incoming ctx is prior to session + if len(mkvs) == 0 { + // merge all kvs from pre + metainfo.RangePersistentValues(pre, func(k, v string) bool { + kvs = append(kvs, k, v) + return true + }) + } else { + metainfo.RangePersistentValues(pre, func(k, v string) bool { + // filter kvs which exists in cur + if _, ok := mkvs[k]; !ok { + kvs = append(kvs, k, v) + } + return true + }) + } + ctx = metainfo.WithPersistentValues(ctx, kvs...) + } + + return ctx +} + +// Set current Sessioin +func BackupCtx(ctx context.Context) { + localsession.BindSession(localsession.NewSessionCtx(ctx)) +} + +// Unset current Session +func ClearCtx() { + localsession.UnbindSession() +} diff --git a/backup/metainfo_test.go b/backup/metainfo_test.go new file mode 100644 index 0000000..15c20b9 --- /dev/null +++ b/backup/metainfo_test.go @@ -0,0 +1,102 @@ +/* + * Copyright 2023 CloudWeGo Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package backup + +import ( + "context" + "os" + "testing" + + "github.com/bytedance/gopkg/cloud/metainfo" +) + +func TestMain(m *testing.M) { + opts := DefaultOptions() + opts.EnableImplicitlyTransmitAsync = true + opts.Enable = true + Init(opts) + os.Exit(m.Run()) +} + +type CtxKeyTestType struct{} + +var CtxKeyTest1 CtxKeyTestType + +func TestRecoverCtxOndemands(t *testing.T) { + ctx := context.WithValue(context.Background(), CtxKeyTest1, "c") + BackupCtx(metainfo.WithPersistentValues(ctx, "a", "a", "b", "b")) + + handler := BackupHandler(func(prev, cur context.Context) (ctx context.Context, backup bool) { + if v := cur.Value(CtxKeyTest1); v == nil { + v = prev.Value(CtxKeyTest1) + if v != nil { + ctx = context.WithValue(cur, CtxKeyTest1, v) + } else { + ctx = cur + } + return ctx, true + } + return cur, false + }) + type args struct { + ctx context.Context + handler BackupHandler + } + tests := []struct { + name string + args args + want context.Context + }{ + { + name: "triggered", + args: args{ + ctx: metainfo.WithValue(metainfo.WithPersistentValue(context.Background(), "a", "aa"), "b", "bb"), + handler: handler, + }, + want: metainfo.WithPersistentValues(ctx, "a", "aa", "b", "b"), + }, + { + name: "not triggered", + args: args{ + ctx: metainfo.WithValue(metainfo.WithPersistentValue(ctx, "a", "aa"), "b", "bb"), + handler: handler, + }, + want: metainfo.WithValue(metainfo.WithPersistentValue(ctx, "a", "aa"), "b", "bb"), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := RecoverCtxOnDemands(tt.args.ctx, tt.args.handler); got != nil { + if v := got.Value(CtxKeyTest1); v == nil { + t.Errorf("not got CtxKeyTest1") + } + a, _ := metainfo.GetPersistentValue(got, "a") + ae, _ := metainfo.GetPersistentValue(tt.want, "a") + if a != ae { + t.Errorf("CurSession() = %v, want %v", a, ae) + } + b, _ := metainfo.GetPersistentValue(got, "b") + be, _ := metainfo.GetPersistentValue(tt.want, "b") + if b != be { + t.Errorf("CurSession() = %v, want %v", b, be) + } + } else { + t.Fatal("no got") + } + }) + } +} diff --git a/example_test.go b/example_test.go index 995e5bc..aa3b344 100644 --- a/example_test.go +++ b/example_test.go @@ -36,24 +36,24 @@ func GetCurSession() Session { } func ExampleSessionCtx_EnableImplicitlyTransmitAsync() { - // rest DefaultManager with new Options - ResetDefaultManager(ManagerOptions{ - ShardNumber: 10, + // EnableImplicitlyTransmitAsync must be true + InitDefaultManager(ManagerOptions{ + ShardNumber: 10, EnableImplicitlyTransmitAsync: true, - GCInterval: time.Hour, + GCInterval: time.Hour, }) - // WARNING: pprof.Do() must be called before BindSession(), + // WARNING: pprof.Do() must be called before BindSession(), // otherwise transparently transmitting session will be dysfunctional labels := pprof.Labels("c", "d") - pprof.Do(context.Background(), labels, func(ctx context.Context){}) - + pprof.Do(context.Background(), labels, func(ctx context.Context) {}) + s := NewSessionMap(map[interface{}]interface{}{ "a": "b", }) BindSession(s) - // WARNING: pprof.Do() must be called before BindSession(), + // WARNING: pprof.Do() must be called before BindSession(), // otherwise transparently transmitting session will be dysfunctional // labels := pprof.Labels("c", "d") // pprof.Do(context.Background(), labels, func(ctx context.Context){}) @@ -83,6 +83,9 @@ func ExampleSessionCtx_EnableImplicitlyTransmitAsync() { } func ExampleSessionCtx() { + // initialize default manager first + InitDefaultManager(DefaultManagerOptions()) + var ctx = context.Background() var key, v = "a", "b" var key2, v2 = "c", "d" @@ -90,7 +93,7 @@ func ExampleSessionCtx() { var sig2 = make(chan struct{}) // initialize new session with context - var session = NewSessionCtx(ctx)// implementation... + var session = NewSessionCtx(ctx) // implementation... // set specific key-value and update session start := session.WithValue(key, v) @@ -99,20 +102,20 @@ func ExampleSessionCtx() { BindSession(start) // pass to new goroutine... - Go(func(){ + Go(func() { // read specific key under current session val := GetCurSession().Get(key) // val exists ASSERT(val == v) // doSomething.... - + // set specific key-value under current session // NOTICE: current session won't change here next := GetCurSession().WithValue(key2, v2) val2 := GetCurSession().Get(key2) // val2 == nil ASSERT(val2 == nil) - + // pass both parent session and new session to sub goroutine - GoSession(next, func(){ + GoSession(next, func() { // read specific key under current session val := GetCurSession().Get(key) // val exists ASSERT(val == v) @@ -120,16 +123,16 @@ func ExampleSessionCtx() { val2 := GetCurSession().Get(key2) // val2 exists ASSERT(val2 == v2) // doSomething.... - + sig2 <- struct{}{} - <- sig + <-sig ASSERT(GetCurSession().IsValid() == false) // current session is invalid - + println("g2 done") sig2 <- struct{}{} }) - + Go(func() { // read specific key under current session val := GetCurSession().Get(key) // val exists @@ -141,29 +144,29 @@ func ExampleSessionCtx() { sig2 <- struct{}{} - <- sig + <-sig ASSERT(GetCurSession().IsValid() == false) // current session is invalid println("g3 done") sig2 <- struct{}{} }) - + BindSession(next) val2 = GetCurSession().Get(key2) // val2 exists ASSERT(v2 == val2) sig2 <- struct{}{} - <- sig + <-sig ASSERT(next.IsValid() == false) // next is invalid println("g1 done") sig2 <- struct{}{} }) - <- sig2 - <- sig2 - <- sig2 + <-sig2 + <-sig2 + <-sig2 val2 := GetCurSession().Get(key2) // val2 == nil ASSERT(val2 == nil) @@ -174,11 +177,11 @@ func ExampleSessionCtx() { close(sig) ASSERT(start.IsValid() == false) // start is invalid - - <- sig2 - <- sig2 - <- sig2 + + <-sig2 + <-sig2 + <-sig2 println("g0 done") UnbindSession() -} \ No newline at end of file +} diff --git a/gls.go b/gls.go index 9cc8a58..2ed7dbf 100644 --- a/gls.go +++ b/gls.go @@ -24,10 +24,12 @@ import ( ) // SESSION_CONFIG_KEY is the env key for configuring default session manager. -// Value format: [EnableImplicitlyTransmitAsync][,ShardNumber][,GCInterval] -// - EnableImplicitlyTransmitAsync: 'true' means enabled, otherwist means disabled -// - ShardNumber: integer > 0 -// - GCInterval: Golang time.Duration format, such as '1h' means one hour +// +// Value format: [EnableImplicitlyTransmitAsync][,ShardNumber][,GCInterval] +// - EnableImplicitlyTransmitAsync: 'true' means enabled, otherwist means disabled +// - ShardNumber: integer > 0 +// - GCInterval: Golang time.Duration format, such as '10m' means ten minutes for each GC +// // Once the key is set, default option values will be set if the option value doesn't exist. const SESSION_CONFIG_KEY = "CLOUDWEGO_SESSION_CONFIG_KEY" @@ -36,36 +38,40 @@ var ( defaultManagerOnce sync.Once ) -func init() { - obj := NewSessionManager(DefaultManagerOptions()) - defaultManagerObj = &obj -} - -// DefaultManagerOptions returns default options for the default manager +// DefaultManagerOptions returns default options for the default manager func DefaultManagerOptions() ManagerOptions { return ManagerOptions{ - ShardNumber: 100, - GCInterval: time.Hour, + ShardNumber: 100, + GCInterval: time.Minute * 10, EnableImplicitlyTransmitAsync: false, } } -// ResetDefaultManager update and restart manager, -// which means previous sessions (if any) will be cleared. +// InitDefaultManager update and restart default manager. // It accept argument opts and env config both. // -// NOTICE: +// NOTICE: // - It use env SESSION_CONFIG_KEY prior to argument opts; // - If both env and opts are empty, it won't reset manager; // - For concurrent safety, you can only successfully reset manager ONCE. // //go:nocheckptr -func ResetDefaultManager(opts ManagerOptions) { - // check env first +func InitDefaultManager(opts ManagerOptions) { + defaultManagerOnce.Do(func() { + // env config has high priority + checkEnvOptions(&opts) + + if defaultManagerObj != nil { + defaultManagerObj.Close() + } + obj := NewSessionManager(opts) + defaultManagerObj = &obj + }) +} + +func checkEnvOptions(opts *ManagerOptions) { if env := os.Getenv(SESSION_CONFIG_KEY); env != "" { envs := strings.Split(env, ",") - opts = DefaultManagerOptions() - // parse first option as EnableTransparentTransmitAsync if strings.ToLower(envs[0]) == "true" { opts.EnableImplicitlyTransmitAsync = true @@ -77,7 +83,7 @@ func ResetDefaultManager(opts ManagerOptions) { opts.ShardNumber = opt } } - + // parse third option as EnableTransparentTransmitAsync if len(envs) > 2 { if d, err := time.ParseDuration(envs[2]); err == nil && d > time.Second { @@ -85,33 +91,40 @@ func ResetDefaultManager(opts ManagerOptions) { } } } - - defaultManagerOnce.Do(func() { - if defaultManagerObj != nil { - defaultManagerObj.Close() - } - obj := NewSessionManager(opts) - defaultManagerObj = &obj - }) } // CurSession gets the session for current goroutine +// +// NOTICE: MUST call `InitDefaultManager()` once before using this API func CurSession() (Session, bool) { + if defaultManagerObj == nil { + return nil, false + } s, ok := defaultManagerObj.GetSession(SessionID(goID())) return s, ok } // BindSession binds the session with current goroutine +// +// NOTICE: MUST call `InitDefaultManager()` once before using this API func BindSession(s Session) { + if defaultManagerObj == nil { + return + } defaultManagerObj.BindSession(SessionID(goID()), s) } // UnbindSession unbind a session (if any) with current goroutine // -// Notice: If you want to end the session, +// NOTICE: If you want to end the session, // please call `Disable()` (or whatever make the session invalid) // on your session's implementation +// +// NOTICE: MUST call `InitDefaultManager()` once before using this API func UnbindSession() { + if defaultManagerObj == nil { + return + } defaultManagerObj.UnbindSession(SessionID(goID())) } @@ -127,7 +140,7 @@ func Go(f func()) { // SessionGo calls f asynchronously and pass s session to the new goroutine func GoSession(s Session, f func()) { - go func(){ + go func() { defer func() { if v := recover(); v != nil { println(fmt.Sprintf("GoSession recover: %v", v)) diff --git a/go.mod b/go.mod index 1cb1f54..76b3e92 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/cloudwego/localsession go 1.16 require ( + github.com/bytedance/gopkg v0.0.0-20230728082804-614d0af6619b github.com/modern-go/gls v0.0.0-20220109145502-612d0167dce5 github.com/modern-go/reflect2 v1.0.2 // indirect github.com/stretchr/testify v1.7.0 diff --git a/go.sum b/go.sum index f7c8a37..e7f87cf 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/bytedance/gopkg v0.0.0-20230728082804-614d0af6619b h1:R6PWoQtxEMpWJPHnpci+9LgFxCS7iJCfOGBvCgZeTKI= +github.com/bytedance/gopkg v0.0.0-20230728082804-614d0af6619b/go.mod h1:FtQG3YbQG9L/91pbKSw787yBQPutC+457AvDW77fgUQ= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/modern-go/gls v0.0.0-20220109145502-612d0167dce5 h1:uiS4zKYKJVj5F3ID+5iylfKPsEQmBEOucSD9Vgmn0i0= @@ -9,6 +11,15 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +golang.org/x/net v0.0.0-20221014081412-f15817d10f9b h1:tvrvnPFcdzp294diPnrdZZZ8XUt2Tyj7svb7X52iDuU= +golang.org/x/net v0.0.0-20221014081412-f15817d10f9b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= diff --git a/manager.go b/manager.go index 3d1aeba..7dfa000 100644 --- a/manager.go +++ b/manager.go @@ -22,17 +22,17 @@ import ( // ManagerOptions for SessionManager type ManagerOptions struct { - // EnableImplicitlyTransmitAsync enables transparently transmit + // EnableImplicitlyTransmitAsync enables transparently transmit // current session to children goroutines // - // WARNING: Once this option enables, `pprof.Do()` must be called before `BindSession()`, + // WARNING: Once this option enables, if you want to use `pprof.Do()`, it must be called before `BindSession()`, // otherwise transmitting will be dysfunctional EnableImplicitlyTransmitAsync bool - // ShardNumber is used to shard session id, it must be larger than zero + // ShardNumber is used to shard session id, it must be larger than zero ShardNumber int - // GCInterval decides the GC interval for SessionManager, + // GCInterval decides the GC interval for SessionManager, // it must be larger than 1s or zero means disable GC GCInterval time.Duration } @@ -47,7 +47,7 @@ type SessionManager struct { shards []*shard inGC uint32 tik *time.Ticker - opts ManagerOptions + opts ManagerOptions } var defaultShardCap = 10 @@ -59,7 +59,7 @@ func newShard() *shard { } // NewSessionManager creates a SessionManager with default containers -// If opts.GCInterval > 0, it will start scheduled GC() loop automatically +// If opts.GCInterval > 0, it will start scheduled GC() loop automatically func NewSessionManager(opts ManagerOptions) SessionManager { if opts.ShardNumber <= 0 { panic("ShardNumber must be larger than zero") @@ -70,7 +70,7 @@ func NewSessionManager(opts ManagerOptions) SessionManager { } ret := SessionManager{ shards: shards, - opts : opts, + opts: opts, } if opts.GCInterval > 0 { @@ -107,7 +107,7 @@ func (s *shard) Delete(id SessionID) { s.lock.Unlock() } -// Get gets specific session +// Get gets specific session // or get inherited session if option EnableImplicitlyTransmitAsync is true func (self *SessionManager) GetSession(id SessionID) (Session, bool) { shard := self.shards[uint64(id)%uint64(self.opts.ShardNumber)] @@ -118,7 +118,7 @@ func (self *SessionManager) GetSession(id SessionID) (Session, bool) { if !self.opts.EnableImplicitlyTransmitAsync { return nil, false } - + id, ok = getSessionID() if !ok { return nil, false @@ -130,7 +130,7 @@ func (self *SessionManager) GetSession(id SessionID) (Session, bool) { // BindSession binds the session with current goroutine func (self *SessionManager) BindSession(id SessionID, s Session) { shard := self.shards[uint64(id)%uint64(self.opts.ShardNumber)] - + shard.Store(id, s) if self.opts.EnableImplicitlyTransmitAsync { @@ -140,17 +140,17 @@ func (self *SessionManager) BindSession(id SessionID, s Session) { // UnbindSession clears current session // -// Notice: If you want to end the session, +// Notice: If you want to end the session, // please call `Disable()` (or whatever make the session invalid) // on your session's implementation func (self *SessionManager) UnbindSession(id SessionID) { shard := self.shards[uint64(id)%uint64(self.opts.ShardNumber)] - + _, ok := shard.Load(id) if ok { shard.Delete(id) } - + if self.opts.EnableImplicitlyTransmitAsync { clearSessionID() } @@ -181,7 +181,6 @@ func (self SessionManager) GC() { atomic.StoreUint32(&self.inGC, 0) } - // startGC start a scheduled goroutine to call GC() according to GCInterval func (self *SessionManager) startGC() { if self.opts.GCInterval < time.Second { diff --git a/session.go b/session.go index 39fca0f..2b647b8 100644 --- a/session.go +++ b/session.go @@ -23,17 +23,17 @@ import ( // Session represents a local storage for one session type Session interface { - // IsValid tells if the session is valid at present - IsValid() bool + // IsValid tells if the session is valid at present + IsValid() bool - // Get returns value for specific key - Get(key interface{}) interface{} - - // WithValue sets value for specific key,and return newly effective session - WithValue(key interface{}, val interface{}) Session + // Get returns value for specific key + Get(key interface{}) interface{} + + // WithValue sets value for specific key,and return newly effective session + WithValue(key interface{}, val interface{}) Session } -// SessionCtx implements Session with context, +// SessionCtx implements Session with context, // which means children session WON'T affect parent and sibling sessions type SessionCtx struct { enabled *atomic.Value @@ -41,10 +41,10 @@ type SessionCtx struct { } // NewSessionCtx creates and enables a SessionCtx -func NewSessionCtx(ctx context.Context) *SessionCtx { +func NewSessionCtx(ctx context.Context) SessionCtx { var enabled atomic.Value enabled.Store(true) - return &SessionCtx{ + return SessionCtx{ enabled: &enabled, storage: ctx, } @@ -52,10 +52,10 @@ func NewSessionCtx(ctx context.Context) *SessionCtx { // NewSessionCtx creates and enables a SessionCtx, // and disable the session after timeout -func NewSessionCtxWithTimeout(ctx context.Context, timeout time.Duration) *SessionCtx { +func NewSessionCtxWithTimeout(ctx context.Context, timeout time.Duration) SessionCtx { ret := NewSessionCtx(ctx) go func() { - <- time.NewTimer(timeout).C + <-time.NewTimer(timeout).C ret.Disable() }() return ret @@ -66,32 +66,31 @@ func (self SessionCtx) Disable() { self.enabled.Store(false) } +// Export exports underlying context +func (self SessionCtx) Export() context.Context { + return self.storage +} + // IsValid tells if the session is valid at present -func (self *SessionCtx) IsValid() bool { - if self == nil { - return false - } +func (self SessionCtx) IsValid() bool { return self.enabled.Load().(bool) } // Get value for specific key -func (self *SessionCtx) Get(key interface{}) interface{} { - if self == nil { - return nil - } +func (self SessionCtx) Get(key interface{}) interface{} { return self.storage.Value(key) } // Set value for specific key,and return newly effective session func (self SessionCtx) WithValue(key interface{}, val interface{}) Session { ctx := context.WithValue(self.storage, key, val) - return &SessionCtx{ + return SessionCtx{ enabled: self.enabled, storage: ctx, } } -// NewSessionMap implements Session with map, +// NewSessionMap implements Session with map, // which means children session WILL affect parent session and sibling sessions type SessionMap struct { enabled *atomic.Value @@ -114,7 +113,7 @@ func NewSessionMap(m map[interface{}]interface{}) *SessionMap { func NewSessionMapWithTimeout(m map[interface{}]interface{}, timeout time.Duration) *SessionMap { ret := NewSessionMap(m) go func() { - <- time.NewTimer(timeout).C + <-time.NewTimer(timeout).C ret.Disable() }() return ret @@ -130,9 +129,26 @@ func (self *SessionMap) IsValid() bool { // Disable ends the session func (self *SessionMap) Disable() { + if self == nil { + return + } self.enabled.Store(false) } +// Export COPIES and exports underlying map +func (self *SessionMap) Export() map[interface{}]interface{} { + if self == nil { + return nil + } + m := make(map[interface{}]interface{}, len(self.storage)) + self.lock.RLock() + for k, v := range self.storage { + m[k] = v + } + self.lock.RUnlock() + return m +} + // Get value for specific key func (self *SessionMap) Get(key interface{}) interface{} { if self == nil { @@ -146,6 +162,9 @@ func (self *SessionMap) Get(key interface{}) interface{} { // Set value for specific key,and return itself func (self *SessionMap) WithValue(key interface{}, val interface{}) Session { + if self == nil { + return nil + } self.lock.Lock() self.storage[key] = val self.lock.Unlock()