This repository has been archived by the owner on Nov 20, 2020. It is now read-only.

Commit: Implement LimitBy
bartelink committed Nov 4, 2018
1 parent 0f40396 commit f8e00a3
Showing 12 changed files with 463 additions and 63 deletions.
40 changes: 27 additions & 13 deletions README.md
@@ -18,18 +18,18 @@ CallPolly wraps Polly to provide:

## Non-goals

- low level policy implementations should live elsewhere (see [CONTRIBUTION notes](#contribution-notes))
- the core CallPolly library faciltates, but should never bind _directly_ to any specific log or metrics emission sink
- low level policy implementations should live elsewhere (see [CONTRIBUTION notes](#contribution-notes)) - _[e.g., `BulkheadMulti` needs to move out](https://github.com/App-vNext/Polly/issues/507)_
- the core CallPolly library facilitates, but should never bind _directly_ to any specific log or metrics emission sink

# Dependencies

The core library extends [`Polly`](https://github.com/App-vNext/Polly) and is intended to work based on `netstandard20`
The core library extends [`Polly`](https://github.com/App-vNext/Polly) and is intended to support running on `netstandard2.0` and `net461`.

For reasons of code clarity and performance, a core secondary dependency is [`Serilog`](https://github.com/serilog/serilog); the pervasiveness and low dependency nature of Serilog and the [practical unlimited interop with other loggers/targets/sinks](https://github.com/serilog/serilog/wiki/Provided-Sinks) is considered enough of a win to make this a hard dependency _e.g., if your logger is NLog, it's 2 lines of code to [forward to it](https://www.nuget.org/packages/serilog.sinks.nlog) with minimal perf cost over CallPolly binding to that directly_.
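
The "2 lines of code" claim above can be sketched roughly as follows; the `WriteTo.NLog()` extension name is assumed from the linked `serilog.sinks.nlog` package and is not part of this repo:

```fsharp
// Hypothetical sketch: route Serilog events into an existing NLog setup
// via the Serilog.Sinks.NLog package (extension method name assumed)
open Serilog
let log = LoggerConfiguration().WriteTo.NLog().CreateLogger()
```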

Being written in F#, there's a dependency on `FSharp.Core`.
Being written in F#, there's a dependency on `FSharp.Core` (v4.5 for `netstandard2.0`, or anything >= `3.1.2.5` / F# 3.1 as present in VS 2012 if you're targeting `net461`).

The tests [`xUnit.net`](https://github.com/xunit/xunit), [`FSCheck.xUnit`](https://github.com/fscheck/FsCheck), [`Unquote`](https://github.com/SwensenSoftware/unquote) and [`Serilog.Sinks.Seq`](https://github.com/serilog/serilog-sinks-seq) (to view, see https://getseq.net, which provides a free single user license for clearer insight into log traces).
The tests use [`xUnit.net`](https://github.com/xunit/xunit), [`FSCheck.xUnit`](https://github.com/fscheck/FsCheck), [`Unquote`](https://github.com/SwensenSoftware/unquote) and [`Serilog.Sinks.Seq`](https://github.com/serilog/serilog-sinks-seq) (to view, see https://getseq.net, which provides a free single user license for clearer insight into log traces).

The acceptance tests add a reliance on [`Newtonsoft.Json`](https://github.com/JamesNK/Newtonsoft.Json).

@@ -50,7 +50,7 @@ In service of this, the assumption is that most extensions to CallPolly should l

Yes, there should be a real README with real examples; we'll get there :sweat_smile:

See the [acceptance tests](https://github.com/jet/CallPolly/blob/master/tests/CallPolly.Acceptance/Orchestration.fs#L142) for behavior implied by this configuration:
See the [acceptance tests](https://github.com/jet/CallPolly/blob/master/tests/CallPolly.Acceptance/Scenarios.fs) for behavior implied by this configuration:
```
{ "services": {
@@ -62,10 +62,10 @@ See the [acceptance tests](https://github.com/jet/CallPolly/blob/master/tests/Ca
"defaultPolicy": null,
"policies": {
"quick": [
{ "rule": "Cutoff", "timeoutMs": 1000, "slaMs": 500 }
],
"slow": [
{ "rule": "Cutoff", "timeoutMs": 10000, "slaMs": 5000 }
]
}
},
@@ -78,10 +78,10 @@ See the [acceptance tests](https://github.com/jet/CallPolly/blob/master/tests/Ca
"defaultPolicy": null,
"policies": {
"default": [
{ "rule": "Limit", "maxParallel": 10, "maxQueue": 3 }
],
"looser": [
{ "rule": "Limit", "maxParallel": 100, "maxQueue": 300 }
],
"defaultBroken": [
{ "rule": "Isolate" }
@@ -95,11 +95,25 @@ See the [acceptance tests](https://github.com/jet/CallPolly/blob/master/tests/Ca
"defaultPolicy": null,
"policies": {
"default": [
{ "rule": "Limit", "maxParallel": 2, "maxQueue": 8 },
{ "rule": "Break", "windowS": 5, "minRequests": 10, "failPct": 20, "breakS": 1 }
{ "rule": "Limit", "maxParallel": 2, "maxQueue": 8 },
{ "rule": "Break", "windowS": 5, "minRequests": 10, "failPct": 20, "breakS": 1 },
{ "rule": "Uri", "base": "https://upstreamb" },
{ "rule": "Log", "req": "Always", "res": "Always" }
]
}
},
"upstreamC": {
"calls": {},
"defaultPolicy": "default",
"policies": {
"default": [
{ "rule": "Limit", "maxParallel": 10, "maxQueue": 20 },
{ "rule": "LimitBy","maxParallel": 2, "maxQueue": 4, "tag": "clientIp" },
{ "rule": "LimitBy","maxParallel": 2, "maxQueue": 4, "tag": "clientDomain" },
{ "rule": "LimitBy","maxParallel": 2, "maxQueue": 4, "tag": "clientType" }
]
}
}
}
```
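
As a rough sketch of how such a definition document flows into the library (the entry point name is an assumption — `parseInternal` appears in `Parser.fs` in this commit, but it may not be the exposed public surface; see the acceptance tests for the real API):

```fsharp
// Hypothetical: feed the JSON rule definitions shown above to the parser,
// yielding one ParsedService per entry under "services"
let defsJson = """{ "services": { (* ... as above ... *) } }"""
let services = CallPolly.Parser.parseInternal defsJson
```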
66 changes: 66 additions & 0 deletions src/CallPolly/BulkheadMulti.fs
@@ -0,0 +1,66 @@
module private CallPolly.BulkheadMulti

open Polly
open System
open System.Collections.Generic
open System.Threading
open System.Threading.Tasks

type RefCounted<'T> = { mutable refCount: int; value: 'T }

// via https://stackoverflow.com/a/31194647/11635
type private BulkheadPool(gen : unit -> Bulkhead.BulkheadPolicy) =
let inners: Dictionary<string, RefCounted<Bulkhead.BulkheadPolicy>> = Dictionary()

let getOrCreateSlot key =
lock inners <| fun () ->
match inners.TryGetValue key with
| true, inner ->
inner.refCount <- inner.refCount + 1
inner.value
| false, _ ->
let value = gen ()
inners.[key] <- { refCount = 1; value = value }
value
let slotReleaseGuard key : IDisposable =
{ new System.IDisposable with
member __.Dispose() =
lock inners <| fun () ->
let item = inners.[key]
match item.refCount with
| 1 -> inners.Remove key |> ignore
| current -> item.refCount <- current - 1 }
member __.Execute k f = async {
let x = getOrCreateSlot k
use _ = slotReleaseGuard k
return! f x }
member __.DumpState() : IDictionary<string,Bulkhead.BulkheadPolicy> =
lock inners <| fun () ->
dict <| seq { for KeyValue(k, { value = v }) in inners -> k,v }

type BulkheadMultiAsyncPolicy private
( tag : string,
locks: BulkheadPool,
asyncExecutionPolicy : Func<Func<Context, CancellationToken, Task>, Context, CancellationToken, bool, Task>) =
inherit Polly.Policy(asyncExecutionPolicy, Seq.empty)
new(tag, maxParallelization, maxQueuingActions, tryGetTagValue, onBulkheadRejected) =
let mkBulkheadForTagValue () = Policy.BulkheadAsync(maxParallelization, maxQueuingActions, fun ctx -> onBulkheadRejected ctx; Task.CompletedTask)
let locks = BulkheadPool(mkBulkheadForTagValue)
let run (inner: Func<Context,CancellationToken,Task>) (ctx: Context) (ct: CancellationToken) (continueOnCapturedContext: bool) : Task =
match tryGetTagValue ctx with
| None -> inner.Invoke(ctx, ct)
| Some tagVal ->
let startInnerTask (ctx: Context) (ct: CancellationToken) =
inner.Invoke(ctx,ct)
let executeInner (bulkhead: Polly.Bulkhead.BulkheadPolicy) : Async<unit> =
bulkhead.ExecuteAsync(Func<Context,CancellationToken,Task> startInnerTask, ctx, ct, continueOnCapturedContext)
|> Async.AwaitTaskCorrect
Async.StartAsTask(locks.Execute tagVal executeInner, cancellationToken=ct) :> _
new BulkheadMultiAsyncPolicy(tag, locks, Func<Func<Context,CancellationToken,Task>,Context,CancellationToken,bool,_> run)
member __.DumpState() : IDictionary<string,Bulkhead.BulkheadPolicy> = locks.DumpState()
member __.Tag = tag

type Policy =
/// Placeholder impl of https://github.com/App-vNext/Polly/issues/507
static member BulkheadMultiAsync(tag, maxParallelization : int, maxQueuingActions : int, tryGetTagValue : Context -> string option, onBulkheadRejected: Context -> unit) =
BulkheadMultiAsyncPolicy(tag, maxParallelization, maxQueuingActions, tryGetTagValue, onBulkheadRejected)
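
A usage sketch for the new policy (illustrative only: the module is declared `private`, so external callers reach it via the `LimitBy` rule rather than this API directly; the `"clientIp"` key mirrors the README example, and reading it off the Polly `Context` via its dictionary interface is an assumption):

```fsharp
// Each distinct value found under the "clientIp" key in the Polly Context
// gets its own bulkhead of maxParallelization=2 / maxQueuingActions=4
open Polly
let tryGetClientIp (ctx: Context) =
    match ctx.TryGetValue "clientIp" with
    | true, (:? string as ip) -> Some ip
    | _ -> None
let policy =
    Policy.BulkheadMultiAsync(
        tag = "clientIp", maxParallelization = 2, maxQueuingActions = 4,
        tryGetTagValue = tryGetClientIp,
        onBulkheadRejected = fun _ -> ())
```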
2 changes: 2 additions & 0 deletions src/CallPolly/CallPolly.fsproj
@@ -7,10 +7,12 @@
<IsTestProject>false</IsTestProject>
<DisableImplicitFSharpCoreReference>true</DisableImplicitFSharpCoreReference>
<DisableImplicitSystemValueTupleReference>true</DisableImplicitSystemValueTupleReference>
<VersionPrefix>0.0.6</VersionPrefix>
</PropertyGroup>

<ItemGroup>
<Compile Include="Infrastructure.fs" />
<Compile Include="BulkheadMulti.fs" />
<Compile Include="Events.fs" />
<Compile Include="Rules.fs" />
<Compile Include="Config.fs" />
15 changes: 14 additions & 1 deletion src/CallPolly/Config.fs
@@ -25,6 +25,11 @@ module Policy =
[<JsonProperty(Required=Required.DisallowNull)>]
dryRun: bool option }

[<NoComparison; RequireQualifiedAccess>]
[<JsonObject(ItemRequired=Required.Always)>]
type LimitByInput =
{ tag: string; maxParallel: int; maxQueue: int }

[<NoComparison; RequireQualifiedAccess>]
[<JsonObject(ItemRequired=Required.Always)>]
type CutoffInput =
@@ -40,6 +45,7 @@
type Value =
| Break of BreakInput
| Limit of LimitInput
| LimitBy of LimitByInput
| Cutoff of CutoffInput
| Isolate

@@ -48,6 +54,7 @@
type Rule =
| Break of Rules.BreakerConfig
| Limit of Rules.BulkheadConfig
| LimitBy of Rules.TaggedBulkheadConfig
| Cutoff of Rules.CutoffConfig
| Isolate

@@ -65,6 +72,11 @@
dop = x.maxParallel
queue = x.maxQueue
dryRun = defaultArg x.dryRun false }
| Input.Value.LimitBy x ->
Rule.LimitBy {
dop = x.maxParallel
queue = x.maxQueue
tag = x.tag }
| Input.Value.Cutoff ({ timeoutMs=TimeSpanMs timeout } as x) ->
Rule.Cutoff {
timeout = timeout
@@ -76,8 +88,9 @@
| Rule.Isolate -> { s with isolate = true }
| Rule.Break breakerConfig -> { s with breaker = Some breakerConfig }
| Rule.Limit bulkheadConfig -> { s with limit = Some bulkheadConfig }
| Rule.LimitBy taggedBulkheadConfig -> { s with taggedLimits = s.taggedLimits @ [taggedBulkheadConfig] }
| Rule.Cutoff cutoffConfig -> { s with cutoff = Some cutoffConfig }
Seq.fold folder { isolate = false; cutoff = None; limit = None; breaker = None }
Seq.fold folder { isolate = false; cutoff = None; limit = None; taggedLimits = []; breaker = None }
let ofInputs xs = xs |> Seq.map interpret |> fold

module Http =
53 changes: 43 additions & 10 deletions src/CallPolly/Events.fs
@@ -1,6 +1,7 @@
module CallPolly.Events

open System
open System.Collections.Generic

/// Represents a time measurement of a computation that includes stopwatch tick metadata
[<NoComparison; NoEquality>]
@@ -15,23 +16,36 @@ module Constants =
let [<Literal>] EventPropertyName = "cpe"

type BreakerParams = { window: TimeSpan; minThroughput: int; errorRateThreshold: float }
type BulkheadParams = { dop: int; queue: int }

[<NoComparison>]
type BulkheadParams =
| Any of GlobalBulkheadParams
| Tagged of TaggedBulkheadParams
and [<NoComparison>] GlobalBulkheadParams = { dop: int; queue: int }
and [<NoComparison>] TaggedBulkheadParams = { tag: string; dop: int; queue: int }

[<NoComparison; NoEquality>]
type CutoffParams = { timeout: TimeSpan; sla: Nullable<TimeSpan> }

[<NoComparison; NoEquality>]
type Event =
| Isolated of service: string * call: string
| Broken of service: string * call: string * config: BreakerParams
| Deferred of service: string * call: string * interval: StopwatchInterval
| Shed of service: string * call: string * config: BulkheadParams
| Deferred of service: string * call: string * tags: IReadOnlyDictionary<string,string> * interval: StopwatchInterval
| Shed of service: string * call: string * tags: IReadOnlyDictionary<string,string> * config: BulkheadParams
| Breached of service: string * call: string * sla: TimeSpan * interval: StopwatchInterval
| Canceled of service: string * call: string * config: CutoffParams * interval: StopwatchInterval
override __.ToString() = "(Metrics)"

module internal Log =
open Serilog.Events

// Serilog will grab [only] real Dictionaries (not IDictionary or IReadOnlyDictionary), so we need to wrap them like this for them to be captured as maps
let private makePropertyForCapturableAsMap (rod: IReadOnlyDictionary<_,_>) : Dictionary<_,_> =
let d = Dictionary()
for KeyValue (k,v) in rod do d.Add(k,v)
d

/// Attach a property to the log context to hold the metrics
// Sidestep Log.ForContext converting to a string; see https://github.com/serilog/serilog/issues/1124
let private forEvent (value : Event) (log : Serilog.ILogger) =
@@ -65,15 +79,34 @@ module internal Log =
// - in others, the call will eventually fall victim to shedding
let queuing (service: string, call:string) (log : Serilog.ILogger) =
log.Information("Bulkhead Queuing likely for {service:l}-{call:l}", service, call)
let deferral (service: string, call:string, policy:string) (interval : StopwatchInterval) (concurrencyLimit : int) (log: Serilog.ILogger) =
let lfe = log |> forEvent (Deferred (service, call, interval))
lfe.Warning("Bulkhead Delayed {service:l}-{call:l} for {timespan} due to concurrency limit of {maxParallel} in {policy:l}",
service, call, interval.Elapsed, concurrencyLimit, policy)
let shedding (service: string, call:string, policy:string) (config:BulkheadParams) (log : Serilog.ILogger) =
let lfe = log |> forEvent (Shed (service, call, config))
lfe.Warning("Bulkhead Shedding for {service:l}-{call:l} based on {policy:l}: {@bulkheadConfig}", service, call, policy, config)
// Deferral can happen due to either a Limit or a LimitBy
let deferral
(service: string, call:string, policy:string, configLimits: IReadOnlyDictionary<string,int>)
(interval: StopwatchInterval)
(tags: IReadOnlyDictionary<string,string>)
(log: Serilog.ILogger) =
let lfe = log |> forEvent (Deferred (service,call,tags,interval))
match tags with
| dict when dict.Count = 0 ->
lfe.Warning("Bulkhead Delayed {service:l}-{call:l} for {timespan} due to {@maxParallel} in {policy:l}",
service, call, interval.Elapsed, makePropertyForCapturableAsMap configLimits, policy)
| tags ->
lfe.Warning("Bulkhead Delayed {service:l}-{call:l} for {timespan} due to {@maxParallel} with {@tags} in {policy:l}",
service, call, interval.Elapsed, makePropertyForCapturableAsMap configLimits, makePropertyForCapturableAsMap tags, policy)
// Shedding can happen due to either a Limit or a LimitBy
let shedding (service: string, call: string, policy: string) (tags: IReadOnlyDictionary<string,string>, config:BulkheadParams) (log : Serilog.ILogger) =
let lfe = log |> forEvent (Shed (service,call,tags,config))
let flatConfig = match config with Any a -> box a | Tagged t -> box t
match tags with
| dict when dict.Count = 0 ->
lfe.Warning("Bulkhead Shedding for {service:l}-{call:l} based on {policy:l}: {@bulkheadConfig}",
service, call, policy, flatConfig)
| tags ->
lfe.Warning("Bulkhead Shedding for {service:l}-{call:l} with {@tags} based on {policy:l}: {@bulkheadConfig}",
service, call, makePropertyForCapturableAsMap tags, policy, flatConfig)
let queuingDryRun (service: string, call:string) (log : Serilog.ILogger) =
log.ForContext("dryRun",true).Information("Bulkhead DRYRUN Queuing for {service:l}-{call:l}", service, call)
// Only implemented for Limit, not LimitBy
let sheddingDryRun (service: string, call:string) (log : Serilog.ILogger) =
log.ForContext("dryRun",true).Warning("Bulkhead DRYRUN Shedding for {service:l}-{call:l}", service, call)

21 changes: 20 additions & 1 deletion src/CallPolly/Infrastructure.fs
@@ -14,7 +14,7 @@ type TimeSpan with
type Async with

/// <summary>
/// Gets the result of given task so that in the event of exception
/// Awaits the Result of given Task so that in the event of exception
/// the actual user exception is raised as opposed to being wrapped
/// in a System.AggregateException.
/// </summary>
@@ -29,4 +29,23 @@ type Async with
else ec e
elif t.IsCanceled then ec(new System.Threading.Tasks.TaskCanceledException())
else sc t.Result)
|> ignore)
/// <summary>
/// Awaits the provided Task such that in the event of an exception
/// the actual user exception is raised as opposed to being wrapped
/// in a System.AggregateException.
/// </summary>
/// <param name="task">Task to be awaited.</param>
[<DebuggerStepThrough>]
static member AwaitTaskCorrect(task : System.Threading.Tasks.Task) : Async<unit> =
Async.FromContinuations(fun (sc,ec,cc) ->
task.ContinueWith(fun (task:System.Threading.Tasks.Task) ->
if task.IsFaulted then
let e = task.Exception
if e.InnerExceptions.Count = 1 then ec e.InnerExceptions.[0]
else ec e
elif task.IsCanceled then
ec(System.Threading.Tasks.TaskCanceledException())
else
sc ())
|> ignore)
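
For contrast with the stock `Async.AwaitTask`, the behavior this overload provides can be sketched as follows (a minimal illustration, not part of the commit):

```fsharp
// A faulted Task surfaces its original exception through AwaitTaskCorrect,
// rather than the System.AggregateException wrapper
open System.Threading.Tasks
let faulted : Task = Task.FromException(exn "boom")
async {
    try do! Async.AwaitTaskCorrect faulted
    with e -> printfn "caught %s: %s" (e.GetType().Name) e.Message
} |> Async.RunSynchronously
```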
2 changes: 2 additions & 0 deletions src/CallPolly/Parser.fs
Expand Up @@ -65,6 +65,7 @@ and [<NoComparison>]
| Isolate
| Break of Config.Policy.Input.BreakInput
| Limit of Config.Policy.Input.LimitInput
| LimitBy of Config.Policy.Input.LimitByInput
| Cutoff of Config.Policy.Input.CutoffInput

(* Config.Http.Input.Value *)
@@ -117,6 +118,7 @@ let parseInternal defsJson : ParsedService [] =
| Input.Isolate -> yield ParsedRule.Policy Config.Policy.Input.Value.Isolate
| Input.Break x -> yield ParsedRule.Policy (Config.Policy.Input.Value.Break x)
| Input.Limit x -> yield ParsedRule.Policy (Config.Policy.Input.Value.Limit x)
| Input.LimitBy x -> yield ParsedRule.Policy (Config.Policy.Input.Value.LimitBy x)
| Input.Cutoff x -> yield ParsedRule.Policy (Config.Policy.Input.Value.Cutoff x)

| Input.Unknown x -> yield ParsedRule.Unknown x
