From 6666f65858949e204c7b303324651cecb29880f5 Mon Sep 17 00:00:00 2001 From: Andrei Zavada Date: Thu, 28 Apr 2022 16:39:25 +0300 Subject: [PATCH 01/15] explicit add_path in schema, for cuttlefish on osx which is funky --- priv/riak_kv.schema | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/priv/riak_kv.schema b/priv/riak_kv.schema index 0426c1f1d..a0114fd54 100644 --- a/priv/riak_kv.schema +++ b/priv/riak_kv.schema @@ -399,7 +399,10 @@ {translation, "riak_kv.aae_throttle_limits", - riak_core_throttle:create_limits_translator_fun("anti_entropy", "mailbox_size") + begin + lists:foreach(fun code:add_path/1, filelib:wildcard("lib/*/ebin")), + riak_core_throttle:create_limits_translator_fun("anti_entropy", "mailbox_size") + end }. %% @see leveldb.bloomfilter From 09d93566417320556f48bf400a06de5915a97881 Mon Sep 17 00:00:00 2001 From: Andrei Zavada Date: Tue, 10 May 2022 21:56:07 +0300 Subject: [PATCH 02/15] tmp switch to TI-Tokyo/riak_core (with aarch64-aware eleveldb) --- rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebar.config b/rebar.config index fc86f6d03..8ceaa242d 100644 --- a/rebar.config +++ b/rebar.config @@ -42,7 +42,7 @@ ]}. {deps, [ - {riak_core, {git, "https://github.com/basho/riak_core.git", {branch, "develop"}}}, + {riak_core, {git, "https://github.com/TI-Tokyo/riak_core.git", {branch, "develop"}}}, {sidejob, {git, "https://github.com/basho/sidejob.git", {branch, "develop"}}}, {bitcask, {git, "https://github.com/basho/bitcask.git", {branch, "develop"}}}, {redbug, {git, "https://github.com/massemanet/redbug", {tag, "v2.0.0"}}}, From 6e5a0915888c5ee57af30f87478471730163e2bd Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Tue, 18 Oct 2022 11:57:26 +0100 Subject: [PATCH 03/15] Update to mainstream redbug PR now accepted into redbug, so return to master. Changes to pass eqc tests on OTP 24. 
Also all eunit tests passing on OTP 25.1.1 --- eqc/crdt_statem_eqc.erl | 2 +- rebar.config | 2 +- src/riak_kv_bucket.erl | 10 +++++----- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/eqc/crdt_statem_eqc.erl b/eqc/crdt_statem_eqc.erl index 51b498ff8..3247fb7a9 100644 --- a/eqc/crdt_statem_eqc.erl +++ b/eqc/crdt_statem_eqc.erl @@ -62,7 +62,7 @@ next_state(#state{vnodes=VNodes0, mod_state=Expected, mod=Mod}=S,V, VNodes = lists:keyreplace(ID, 1, VNodes0, {ID, V}), S#state{vnodes=VNodes, mod_state=Mod:update_expected(ID, Op, Expected)}; next_state(#state{vnodes=VNodes0, mod_state=Expected0, mod=Mod}=S,V, - {call,?MODULE, merge, [_Mod, {IDS, _C}=_Source, {ID, _C}=_Dest]}) -> + {call,?MODULE, merge, [_Mod, {IDS, C}=_Source, {ID, C}=_Dest]}) -> VNodes = lists:keyreplace(ID, 1, VNodes0, {ID, V}), Expected = Mod:update_expected(ID, {merge, IDS}, Expected0), S#state{vnodes=VNodes, mod_state=Expected}; diff --git a/rebar.config b/rebar.config index 93f803a0f..868611f3e 100644 --- a/rebar.config +++ b/rebar.config @@ -45,7 +45,7 @@ {riak_core, {git, "https://github.com/TI-Tokyo/riak_core.git", {branch, "develop"}}}, {sidejob, {git, "https://github.com/basho/sidejob.git", {branch, "develop"}}}, {bitcask, {git, "https://github.com/basho/bitcask.git", {branch, "develop"}}}, - {redbug, {git, "https://github.com/shiguredo/redbug", {branch, "otp-25"}}}, + {redbug, {git, "https://github.com/massemanet/redbug", {branch, "master"}}}, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.2"}}}, {sext, {git, "https://github.com/uwiger/sext.git", {tag, "1.8.0"}}}, {riak_pipe, {git, "https://github.com/basho/riak_pipe.git", {branch, "develop"}}}, diff --git a/src/riak_kv_bucket.erl b/src/riak_kv_bucket.erl index e4a2f1502..ede072c1e 100644 --- a/src/riak_kv_bucket.erl +++ b/src/riak_kv_bucket.erl @@ -1263,7 +1263,7 @@ immutable_consistent(undefined, _N, undefined, _Bad) -> immutable_consistent(true, _N, undefined, _Bad) -> %% consistent still set to true and n_val not modified true; -immutable_consistent(Consistent, _N, _N, _Bad) when Consistent =:= undefined orelse +immutable_consistent(Consistent, N, N, _Bad) when Consistent =:= undefined orelse Consistent =:= true -> %% consistent not modified or still set to true and n_val %% modified but set to same value @@ -1306,10 +1306,10 @@ undefined_props(Names, Props, Errors) -> immutable_dt(_NewDT=undefined, _NewAllowMult=undefined, _ExistingDT, _Bad) -> %% datatype and allow_mult are not being modified, so its valid true; -immutable_dt(_Datatype, undefined, _Datatype, _Bad) -> +immutable_dt(Datatype, undefined, Datatype, _Bad) -> %% data types from new and existing match and allow mult not modified, valid true; -immutable_dt(_Datatype, true, _Datatype, _Bad) -> +immutable_dt(Datatype, true, Datatype, _Bad) -> %% data type from new and existing match and allow mult still set to true, %% valid true; @@ -1325,7 +1325,7 @@ immutable_dt(_Datatype, true, _Datatype2, Bad) -> immutable_dt(_Datatype, false, undefined, Bad) -> %% datatype defined when it wasn't before has_datatype(Bad); -immutable_dt(_Datatype, false, _Datatype, Bad) -> +immutable_dt(Datatype, false, Datatype, Bad) -> %% attempt to set allow_mult to false when data type set is invalid, datatype not modified has_allow_mult(Bad); immutable_dt(undefined, false, _Datatype, Bad) -> @@ -1337,7 +1337,7 @@ immutable_dt(_Datatype, false, _Datatype2, Bad) -> immutable_dt(undefined, _, _Datatype, Bad) -> %% datatype not modified but allow_mult is invalid has_allow_mult(Bad); -immutable_dt(_Datatype, _, 
_Datatype, Bad) -> +immutable_dt(Datatype, _, Datatype, Bad) -> %% allow mult is invalid but data types still match has_allow_mult(Bad); immutable_dt(_, _, _, Bad) -> From 125fe91ca261c13c30fe1b30c211c433c1c86843 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Tue, 18 Oct 2022 13:46:36 +0100 Subject: [PATCH 04/15] Update erlang.yml (#1834) Add cmake to allow build of snappy --- .github/workflows/erlang.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/erlang.yml b/.github/workflows/erlang.yml index 6debfd95c..525857930 100644 --- a/.github/workflows/erlang.yml +++ b/.github/workflows/erlang.yml @@ -25,6 +25,7 @@ jobs: image: erlang:${{ matrix.otp }} steps: + - uses: lukka/get-cmake@latest - uses: actions/checkout@v2 - name: Compile run: ./rebar3 compile From 5458e797f440ffcf67d67e2e54d258bff7b0a706 Mon Sep 17 00:00:00 2001 From: Andrei Zavada Date: Tue, 22 Nov 2022 17:02:53 +0200 Subject: [PATCH 05/15] repoint most external deps (all except hyper) to hex.pm --- rebar.config | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/rebar.config b/rebar.config index 868611f3e..b2953eb88 100644 --- a/rebar.config +++ b/rebar.config @@ -1,3 +1,4 @@ +%% -*- mode: erlang -*- {minimum_otp_vsn, "22.0"}. {src_dirs, ["./priv/tracers", "./src"]}. @@ -42,16 +43,16 @@ ]}. {deps, [ + {redbug, "2.0.8"}, + {sext, "1.8.0"}, + {sidejob, "2.1.0"}, + {recon, "2.5.2"}, + {hyper, {git, "https://github.com/basho/hyper", {tag, "1.1.0"}}}, {riak_core, {git, "https://github.com/TI-Tokyo/riak_core.git", {branch, "develop"}}}, - {sidejob, {git, "https://github.com/basho/sidejob.git", {branch, "develop"}}}, {bitcask, {git, "https://github.com/basho/bitcask.git", {branch, "develop"}}}, - {redbug, {git, "https://github.com/massemanet/redbug", {branch, "master"}}}, - {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.2"}}}, - {sext, {git, "https://github.com/uwiger/sext.git", {tag, "1.8.0"}}}, {riak_pipe, {git, "https://github.com/basho/riak_pipe.git", {branch, "develop"}}}, {riak_dt, {git, "https://github.com/basho/riak_dt.git", {branch, "develop"}}}, {riak_api, {git, "https://github.com/basho/riak_api.git", {branch, "develop"}}}, - {hyper, {git, "https://github.com/basho/hyper", {tag, "1.1.0"}}}, {kv_index_tictactree, {git, "https://github.com/martinsumner/kv_index_tictactree.git", {branch, "develop-3.1"}}}, {rhc, {git, "https://github.com/basho/riak-erlang-http-client", {branch, "develop-3.2-otp24"}}} ]}. From ccd384dd9dcc08572c4bf6cc5bde7ddab70c6dec Mon Sep 17 00:00:00 2001 From: Andrei Zavada Date: Wed, 23 Nov 2022 03:21:16 +0200 Subject: [PATCH 06/15] Mas i1807 overflowqueues (#1809) * Bound reap/erase queues Also don't log queues on a crash - avoid over-sized crash dumps * Create generic behaviour for eraser/reaper Have the queue backed to disk, so that beyond a certain size it overflows from memory to disk (and once the on disk part is consumed from the queue the files are removed). * Stop cleaning folder Risk of misconfiguration leading to wiping of wrong data. Also starting a job may lead to the main process having its disk_log wiped. * Setup folders correctly Avoid enoent errors * Correct log * Further log correction * Correct cleaning of directories for test * Switch to action/2 * Update eqc tests for refactoring of reaper/eraser * Improve comments/API * Pause reaper on overload Check for a soft overload on any vnode before reaping. This will add some delay - but reap has been show to potentially overload a cluster ... 
availability is more important than reap speed. There is no catch for {error, mailbox_overload} should it occur - it should not as the mailbox check should prevent it. If it does, the reaper will crash (and restart without any reaps) - return to a safe known position. * Adjustments following review The queue file generated, are still in UUID format, but the id now incorporates creation date. This should make it easier to detect and clean any garbage that might be accrued. One use case where '++' is used has also been removed (although a lists:flatten/1 was still required at the end in this case) # Conflicts: # priv/riak_kv.schema # rebar.config # src/riak_kv_overflow_queue.erl # src/riak_kv_reaper.erl # src/riak_kv_replrtq_src.erl # src/riak_kv_test_util.erl --- priv/riak_kv.schema | 58 +++++++++++++++++----------------- src/riak_kv_overflow_queue.erl | 30 +++++++++--------- src/riak_kv_reaper.erl | 2 +- 3 files changed, 45 insertions(+), 45 deletions(-) diff --git a/priv/riak_kv.schema b/priv/riak_kv.schema index 5892b22b2..f758f7f16 100644 --- a/priv/riak_kv.schema +++ b/priv/riak_kv.schema @@ -108,8 +108,8 @@ %% @doc Parallel key store type %% When running in parallel mode, which will be the default if the backend does -%% not support native tictac aae (i.e. is not leveled), what type of parallel -%% key store should be kept - leveled_ko (leveled and key-ordered), or +%% not support native tictac aae (i.e. is not leveled), what type of parallel +%% key store should be kept - leveled_ko (leveled and key-ordered), or %% leveled_so (leveled and segment ordered). %% When running in native mode, this setting is ignored {mapping, "tictacaae_parallelstore", "riak_kv.tictacaae_parallelstore", [ @@ -119,7 +119,7 @@ ]}. %% @doc Minimum Rebuild Wait -%% The minimum number of hours to wait between rebuilds. Default value is 2 +%% The minimum number of hours to wait between rebuilds. Default value is 2 %% weeks {mapping, "tictacaae_rebuildwait", "riak_kv.tictacaae_rebuildwait", [ {datatype, integer}, @@ -128,8 +128,8 @@ %% @doc Maximum Rebuild Delay %% The number of seconds which represents the length of the period in which the -%% next rebuild will be scheduled. So if all vnodes are scheduled to rebuild -%% at the same time, they will actually rebuild randomly between 0 an this +%% next rebuild will be scheduled. So if all vnodes are scheduled to rebuild +%% at the same time, they will actually rebuild randomly between 0 an this %% value (in seconds) after the rebuild time. Default value is 4 days {mapping, "tictacaae_rebuilddelay", "riak_kv.tictacaae_rebuilddelay", [ {datatype, integer}, @@ -137,9 +137,9 @@ ]}. %% @doc Store heads in parallel key stores -%% If running a parallel key store, the whole "head" object may be stored to -%% allow for fold_heads queries to be run against the parallel store. -%% Alternatively, the cost of the parallel key store can be reduced by storing +%% If running a parallel key store, the whole "head" object may be stored to +%% allow for fold_heads queries to be run against the parallel store. +%% Alternatively, the cost of the parallel key store can be reduced by storing %% only a minimal data set necessary for AAE and monitoring {mapping, "tictacaae_storeheads", "riak_kv.tictacaae_storeheads", [ {datatype, {flag, enabled, disabled}}, @@ -151,10 +151,10 @@ %% The number of milliseconds which the vnode must wait between self-pokes to %% maybe prompt the next exchange. Default is 8 minutes - check all partitions %% when n=3 once every hour (in each direction). 
A cycle of exchanges will -%% take (n - 1) * n + 1 exchange ticks for each nval. +%% take (n - 1) * n + 1 exchange ticks for each nval. %% Note if this is to be reduced further the riak_core vnode_inactivity_timeout -%% should also be reduced or handoffs may be blocked. To be safe the -%% vnode_inactivity_timeout must be < 0.5 * the tictacaae_exchangetick. +%% should also be reduced or handoffs may be blocked. To be safe the +%% vnode_inactivity_timeout must be < 0.5 * the tictacaae_exchangetick. {mapping, "tictacaae_exchangetick", "riak_kv.tictacaae_exchangetick", [ {datatype, integer}, {default, 480000}, @@ -180,7 +180,7 @@ %% faster by doubling. There are 1M segments in a standard tree overall. %% Performance tuning can also be made by adjusting the `tictacaae_repairloops` %% and `tictacaae_rangeboost` - but `tictacaae_maxresults` is the simplest -%% factor that is likely to result in a relatively predictable (and linear) +%% factor that is likely to result in a relatively predictable (and linear) %% outcome in terms of both CPU cost and repair speed. {mapping, "tictacaae_maxresults", "riak_kv.tictacaae_maxresults", [ {datatype, integer}, @@ -204,18 +204,18 @@ hidden ]}. -%% @doc Multiplier to the `tictcaaae_maxresults` when following an initial AAE +%% @doc Multiplier to the `tictcaaae_maxresults` when following an initial AAE %% exchange with a range-limited exchange. %% After each exchange, where sufficient deltas are discovered there will be a %% `tictacaae_repairloops` number of range-limited queries (assuming %% sufficient results continue to be found). Each of these may have the -%% the number of max results boosted by this integer factor. +%% the number of max results boosted by this integer factor. %% For example, if `tictacaae_maxresuts` is set to 64, and %% `tictacaae_repairloops` is set to 4, and the `tictacaae_rangeboost` is set %% to 2 - the initial loop will use `tictacaae_maxresuts` of 64, but any %% AAE exchanges on loops 1 to 4 will use 128. %% Exchanges with range-limited queries are more efficient, and so more tree -%% segments can be fetched without creating significant CPU overheads, hence +%% segments can be fetched without creating significant CPU overheads, hence %% the use of this boost to maxresults. {mapping, "tictacaae_rangeboost", "riak_kv.tictacaae_rangeboost", [ {datatype, integer}, @@ -249,7 +249,7 @@ %% Separate assured forwarding pools will be used of `af_worker_pool_size` for %% informational aae_folds (find_keys, object_stats) and functional folds %% (merge_tree_range, fetch_clock_range). The be_pool is used only for tictac -%% AAE rebuilds at present +%% AAE rebuilds at present {mapping, "node_worker_pool_size", "riak_kv.node_worker_pool_size", [ {datatype, integer}, {default, 4} @@ -946,11 +946,11 @@ ]}. -%% @doc For Tictac full-sync does all data need to be sync'd, or should a -%% specific bucket be sync'd (bucket), or a specific bucket type (type). +%% @doc For Tictac full-sync does all data need to be sync'd, or should a +%% specific bucket be sync'd (bucket), or a specific bucket type (type). %% Note that in most cases sync of all data is lower overhead than sync of %% a subset of data - as cached AAE trees will be used. -%% TODO: type is not yet implemented. +%% TODO: type is not yet implemented. 
{mapping, "ttaaefs_scope", "riak_kv.ttaaefs_scope", [ {datatype, {enum, [all, bucket, type, disabled]}}, {default, disabled} @@ -996,7 +996,7 @@ %% If using range_check to speed-up repairs, this can be reduced as the %% range_check maxresults will be boosted by the ttaaefs_rangeboost When using %% range_check a value of 64 is recommended, which may be reduced to 32 or 16 -%% if the cluster has a very large volume of keys and/or limited capacity. +%% if the cluster has a very large volume of keys and/or limited capacity. %% Only reduce below 16 in exceptional circumstances. %% More capacity to process sync queries can be added by increaseing the af2 %% and af3 queue sizes - but this will be at the risk of there being a bigger @@ -1011,21 +1011,21 @@ %% ttaaefs_max results * ttaaefs_rangeboost. %% When using range_check, a small maxresults can be used, in effect using %% other *_check syncs as discovery queries (to find the range_check for the -%% range_check to do the heavy lifting) +%% range_check to do the heavy lifting) {mapping, "ttaaefs_rangeboost", "riak_kv.ttaaefs_rangeboost", [ {datatype, integer}, {default, 16} ]}. %% @doc For Tictac bucket full-sync which bucket should be sync'd by this -%% node. Only ascii string bucket definitions supported (which will be -%% converted using list_to_binary). +%% node. Only ascii string bucket definitions supported (which will be +%% converted using list_to_binary). {mapping, "ttaaefs_bucketfilter_name", "riak_kv.ttaaefs_bucketfilter_name", [ {datatype, string}, {commented, "sample_bucketname"} ]}. -%% @doc For Tictac bucket full-sync what is the bucket type of the bucket name. +%% @doc For Tictac bucket full-sync what is the bucket type of the bucket name. %% Only ascii string type bucket definitions supported (these %% definitions will be converted to binary using list_to_binary) {mapping, "ttaaefs_bucketfilter_type", "riak_kv.ttaaefs_bucketfilter_type", [ @@ -1033,7 +1033,7 @@ {commented, "default"} ]}. -%% @doc For Tictac bucket-type full-sync what is the bucket type to be sync'd. +%% @doc For Tictac bucket-type full-sync what is the bucket type to be sync'd. %% Only ascii string type bucket definitions supported (these %% definitions will be converted to binary using list_to_binary). %% TODO: Type-based filtering is not yet supported @@ -1134,7 +1134,7 @@ %% The af3_queue size, and the ttaaefs_maxresults, both need to be tuned to %% ensure that the allcheck can run wihtin the 30 minute timeout. %% For per-bucket replication all is a reference to all of the data for that -%% bucket, and warnings about sizing are specially relevant. +%% bucket, and warnings about sizing are specially relevant. {mapping, "ttaaefs_allcheck", "riak_kv.ttaaefs_allcheck", [ {datatype, integer}, {default, 0} @@ -1180,7 +1180,7 @@ %% @doc How many times per 24hour period should the a range_check be run. The %% range_check is intended to be a smart check, in that it will: %% - use a last_modified range starting from the last successful check as its -%% range if the last check was successful (i.e. showed the clusters to be +%% range if the last check was successful (i.e. showed the clusters to be %% in sync); %% - use a range identified by the last check (a last modified range, and %% perhaps also a specific Bucket) if a range to limit the issues has been @@ -1407,7 +1407,7 @@ %% @doc Enable the `recalc` compaction strategy within the leveled backend in %% riak. The default (when disabled) is `retain`, but this will leave -%% uncollected garbage within the, journal. 
+%% uncollected garbage within the, journal. %% It is now recommended from Riak KV 2.9.2 to consider the `recalc` strategy. %% This strategy has a side effect of slower startups, and slower recovery %% from a wiped ledger - but it will not keep an overhead of garbage within @@ -1442,7 +1442,7 @@ %% each worker is taking per query in microseconds, so the overall queries %% per second supported will be: %% (1000000 div worker_vnode_pool_worktime) * n_val * worker_count -%% It should normally be possible to support >> 100 queries per second with +%% It should normally be possible to support >> 100 queries per second with %% just a single worker per vnode. %% The statistic worker_vnode_pool_queuetime_mean will track the average time %% a query is spending on a queue, should the vnode pool be exhausted. diff --git a/src/riak_kv_overflow_queue.erl b/src/riak_kv_overflow_queue.erl index 5d69b64d3..b08ceb0a7 100644 --- a/src/riak_kv_overflow_queue.erl +++ b/src/riak_kv_overflow_queue.erl @@ -52,7 +52,7 @@ -type filename() :: file:filename()|none. -type queue_stats() :: list({pos_integer(), non_neg_integer()}). --record(overflowq, +-record(overflowq, { mqueues :: list({priority(), queue:queue()}) | not_logged, @@ -134,7 +134,7 @@ log(Type, JobID, Attempts, Aborts, Queue) -> OverflowLengths, DiscardCounts]), [Type, JobID, Attempts, Aborts]), - + ResetDiscards = lists:map(fun({P, _L}) -> {P, 0} end, Queue#overflowq.overflow_discards), @@ -155,13 +155,13 @@ stats(Queue) -> addto_queue(Priority, Item, FlowQ) -> MQueueLimit = FlowQ#overflowq.mqueue_limit, MQueueLengths = FlowQ#overflowq.mqueue_lengths, - {Priority, CurrentQL} = + {Priority, CurrentQL} = lists:keyfind(Priority, 1, MQueueLengths), OverflowFiles = FlowQ#overflowq.overflow_files, MQueues = FlowQ#overflowq.mqueues, {Priority, {OverflowFile, OverflowC}} = lists:keyfind(Priority, 1, OverflowFiles), - + case {OverflowFile, CurrentQL} of {none, CurrentQL} when CurrentQL < MQueueLimit -> UpdQueueLengths = @@ -272,7 +272,7 @@ close(FilePath, FlowQ) -> fetch_batch(Priority, MaxBatchSize, FlowQ) -> UpdFlowQ = maybereload_queue(Priority, MaxBatchSize, FlowQ), case lists:keyfind(Priority, 1, UpdFlowQ#overflowq.mqueue_lengths) of - {Priority, 0} -> + {Priority, 0} -> {empty, UpdFlowQ}; {Priority, MQueueL} -> BatchSize = min(MQueueL, MaxBatchSize), @@ -315,7 +315,7 @@ maybereload_queue(Priority, BatchSize, FlowQ) -> %% There are enough items on the queue, don't reload FlowQ; {{Priority, _N}, {Priority, {none, start}}} -> - %% There are no overflow files to reload from + %% There are no overflow files to reload from FlowQ; {{Priority, N}, {Priority, {File, Continuation}}} -> %% Attempt to refill the queue from the overflow file @@ -364,7 +364,7 @@ maybereload_queue(Priority, BatchSize, FlowQ) -> mqueues = UpdMQueues, mqueue_lengths = UpdMQueueCounts, overflow_lengths = UpdOverflowCounts, - overflow_files = UpdOverflowFiles} + overflow_files = UpdOverflowFiles} end end. @@ -439,16 +439,16 @@ basic_inmemory_test() -> ?assertMatch([1], B2), {mqueue_lengths, MQL2} = lists:keyfind(mqueue_lengths, 1, stats(FlowQ3)), ?assertMatch([{1, 99}, {2, 0}], MQL2), - + {B3, FlowQ4} = fetch_batch(1, 99, FlowQ3), ExpB = lists:seq(2, 100), ?assertMatch(ExpB, B3), {empty, _FlowQ5} = fetch_batch(1, 1, FlowQ4), - + ok = filelib:ensure_dir(RootPath), {ok, Files} = file:list_dir(RootPath), - + ?assertMatch([], Files). 
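%% Editor's illustrative sketch (not part of the original patch): the basic
%% lifecycle of an overflow queue as exercised by the tests in this module -
%% create with new/4, push with addto_queue/3, drain with fetch_batch/3 and
%% tidy up with close/2. The root path and limit values are arbitrary
%% examples, and the meaning of the two limit arguments to new/4 is assumed
%% from the tests above. Not referenced by the eunit suite.
usage_sketch() ->
    RootPath = "test/overflowq_sketch/",
    %% Two priorities; items beyond the in-memory limit overflow to disk
    %% files under RootPath, which are removed once consumed.
    FlowQ0 = new([1, 2], RootPath, 1000, 5000),
    FlowQ1 =
        lists:foldl(
            fun(I, FQ) -> addto_queue(1, I, FQ) end,
            FlowQ0,
            lists:seq(1, 100)),
    {mqueue_lengths, [{1, 100}, {2, 0}]} =
        lists:keyfind(mqueue_lengths, 1, stats(FlowQ1)),
    {Batch, FlowQ2} = fetch_batch(1, 10, FlowQ1),
    10 = length(Batch),
    %% close/2 removes any overflow files left under RootPath.
    close(RootPath, FlowQ2).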
basic_overflow_test() -> @@ -473,12 +473,12 @@ basic_overflow_test() -> ok = filelib:ensure_dir(RootPath), {ok, Files1} = file:list_dir(RootPath), ?assertMatch(0, length(Files1)), - + FlowQ_NEW = new([1, 2], RootPath, 1000, 5000), {mqueue_lengths, MQL2} = lists:keyfind(mqueue_lengths, 1, stats(FlowQ_NEW)), ?assertMatch([{1, 0}, {2, 0}], MQL2), - + ok = filelib:ensure_dir(RootPath), {ok, Files2} = file:list_dir(RootPath), ?assertMatch(0, length(Files2)). @@ -527,7 +527,7 @@ underover_overflow_test() -> Refs2 = lists:seq(7001, 8000), FlowQ8 = lists:foldl(fun(R, FQ) -> addto_queue(1, R, FQ) end, FlowQ7, Refs2), - + {B7, FlowQ9} = fetch_batch(1, 1200, FlowQ8), ExpB7 = lists:seq(1801, 3000), ?assertMatch(ExpB7, B7), @@ -569,7 +569,7 @@ underover_overflow_test() -> Refs3 = lists:seq(8001, 10000), FlowQ15 = lists:foldl(fun(R, FQ) -> addto_queue(1, R, FQ) end, FlowQ14, Refs3), - + {mqueue_lengths, MQL2} = lists:keyfind(mqueue_lengths, 1, stats(FlowQ15)), ?assertMatch([{1, 1000}, {2, 0}], MQL2), @@ -583,4 +583,4 @@ underover_overflow_test() -> close(RootPath, FlowQ15). --endif. \ No newline at end of file +-endif. diff --git a/src/riak_kv_reaper.erl b/src/riak_kv_reaper.erl index 305694d44..5d9a655a1 100644 --- a/src/riak_kv_reaper.erl +++ b/src/riak_kv_reaper.erl @@ -98,7 +98,7 @@ request_reap(Pid, ReapReference) -> list({atom(), non_neg_integer()|riak_kv_overflow_queue:queue_stats()}). reap_stats() -> reap_stats(?MODULE). --spec reap_stats(pid()|module()) -> +-spec reap_stats(pid()|module()) -> list({atom(), non_neg_integer()|riak_kv_overflow_queue:queue_stats()}). reap_stats(Pid) -> riak_kv_queue_manager:stats(Pid). From a975ea2f19e56b902036716a71f415ce6e98c609 Mon Sep 17 00:00:00 2001 From: Andrei Zavada Date: Wed, 23 Nov 2022 03:34:02 +0200 Subject: [PATCH 07/15] Mas i1804 peerdiscovery (#1812) See #1804 The heart of the problem is how to avoid needing configuration changes on sink clusters when source clusters are bing changed. This allows for new nodes to be discovered automatically, from configured nodes. Default behaviour is to always fallback to configured behaviour. Worker Counts and Per Peer Limits need to be set based on an understanding of whether this will be enabled. Although, if per peer limit is left to default, the consequence will be the worker count will be evenly distributed (independently by each node). Note, if Worker Count mod (Src Node Count) =/= 0 - then there will be no balancing of the excess workers across the sink nodes. 
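As an illustration of the point above (editor's example, not code from this
PR): a worker count can only be spread evenly when it is a multiple of the
source node count, so a quick sanity check is

    %% true if WorkerCount can be split evenly across the discovered nodes
    evenly_spread(WorkerCount, SrcNodeCount) ->
        WorkerCount rem SrcNodeCount =:= 0.

e.g. evenly_spread(8, 3) is false (the two excess workers will not be
balanced across the sink nodes), whereas evenly_spread(9, 3) is true.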
# Conflicts: # rebar.config # src/riak_kv_replrtq_peer.erl # src/riak_kv_replrtq_snk.erl # src/riak_kv_replrtq_src.erl --- src/riak_kv_replrtq_peer.erl | 23 +++++++++------------ src/riak_kv_replrtq_snk.erl | 18 ++++++++-------- src/riak_kv_replrtq_src.erl | 40 ++++++++++++++++++++++++++++-------- 3 files changed, 51 insertions(+), 30 deletions(-) diff --git a/src/riak_kv_replrtq_peer.erl b/src/riak_kv_replrtq_peer.erl index e6871d85b..7b8b76403 100644 --- a/src/riak_kv_replrtq_peer.erl +++ b/src/riak_kv_replrtq_peer.erl @@ -81,19 +81,19 @@ update_workers(WorkerCount, PerPeerLimit) -> init([]) -> case application:get_env(riak_kv, replrtq_peer_discovery, false) of - true -> + true -> SinkPeers = application:get_env(riak_kv, replrtq_sinkpeers, ""), DefaultQueue = app_helper:get_env(riak_kv, replrtq_sinkqueue), SnkQueuePeerInfo = riak_kv_replrtq_snk:tokenise_peers(DefaultQueue, SinkPeers), - MinDelay = + MinDelay = application:get_env(riak_kv, replrtq_prompt_min_seconds, ?AUTO_DISCOVERY_MINIMUM_SECONDS), lists:foreach( - fun({QueueName, _PeerInfo}) -> + fun({QueueName, _PeerInfo}) -> _ = schedule_discovery(QueueName, self(), MinDelay) end, SnkQueuePeerInfo), @@ -105,9 +105,8 @@ init([]) -> handle_call({update_discovery, QueueName}, _From, State) -> case lists:keyfind(QueueName, 1, State#state.discovery_peers) of false -> - ?LOG_INFO( - "Type=~w discovery for unconfigured QueueName=~w", - [update, QueueName]), + ?LOG_INFO("Type=~w discovery for unconfigured QueueName=~w", + [update, QueueName]), {reply, false, State}; {QueueName, PeerInfo} -> R = do_discovery(QueueName, PeerInfo, update), @@ -133,7 +132,7 @@ handle_cast({prompt_discovery, QueueName}, State) -> handle_info({scheduled_discovery, QueueName}, State) -> ok = prompt_discovery(QueueName), - MinDelay = + MinDelay = application:get_env( riak_kv, replrtq_prompt_min_seconds, @@ -158,7 +157,7 @@ code_change(_OldVsn, State, _Extra) -> %%%============================================================================ %% @doc -%% Prompt the riak_kv_replrtq_peer to discover peers for a given queue name +%% Prompt the riak_kv_replrtq_peer to discover peers for a given queue name -spec prompt_discovery(riak_kv_replrtq_snk:queue_name()) -> ok. prompt_discovery(QueueName) -> gen_server:cast(?MODULE, {prompt_discovery, QueueName}). @@ -180,7 +179,7 @@ schedule_discovery(QueueName, DiscoveryPid, SecondsDelay) -> do_discovery(QueueName, PeerInfo, Type) -> {SnkWorkerCount, PerPeerLimit} = riak_kv_replrtq_snk:get_worker_counts(), StartDelayMS = riak_kv_replrtq_snk:starting_delay(), - CurrentPeers = + CurrentPeers = case Type of count_change -> %% Ignore current peers, to update worker counts, so all @@ -235,7 +234,7 @@ discover_peers(PeerInfo, StartingDelayMS) -> end, lists:usort(lists:foldl(ConvertToPeerInfoFun, [], Peers)). - + -spec discover_from_peer( riak_kv_replrtq_snk:peer_info(), list({binary(), pos_integer(), pb|http})) @@ -244,7 +243,7 @@ discover_from_peer(PeerInfo, Acc) -> {_PeerID, _Delay, Host, Port, Protocol} = PeerInfo, RemoteGenFun = riak_kv_replrtq_snk:remote_client_fun(Protocol, Host, Port), RemoteFun = RemoteGenFun(), - UpdAcc = + UpdAcc = try case RemoteFun(peer_discovery) of {ok, IPPorts} -> @@ -265,5 +264,3 @@ discover_from_peer(PeerInfo, Acc) -> end, RemoteFun(close), UpdAcc. - - \ No newline at end of file diff --git a/src/riak_kv_replrtq_snk.erl b/src/riak_kv_replrtq_snk.erl index db45cb4f2..16deb8ccc 100644 --- a/src/riak_kv_replrtq_snk.erl +++ b/src/riak_kv_replrtq_snk.erl @@ -47,7 +47,7 @@ add_snkqueue/4, current_peers/1]). 
--export([repl_fetcher/1, +-export([repl_fetcher/1, tokenise_peers/2, get_worker_counts/0, set_worker_counts/2, @@ -210,7 +210,7 @@ add_snkqueue(QueueName, Peers, WorkerCount) -> %% number of workers overall -spec add_snkqueue(queue_name(), list(peer_info()), pos_integer(), pos_integer()) -> ok. -add_snkqueue(QueueName, Peers, WorkerCount, PerPeerLimit) +add_snkqueue(QueueName, Peers, WorkerCount, PerPeerLimit) when PerPeerLimit =< WorkerCount -> gen_server:call(?MODULE, {add, QueueName, Peers, WorkerCount, PerPeerLimit}). @@ -218,7 +218,7 @@ add_snkqueue(QueueName, Peers, WorkerCount, PerPeerLimit) %% @doc %% Return the current list of peers being used by this snk host, and the -%% settings currently being used for this host and he workers per peer. +%% settings currently being used for this host and he workers per peer. %% Returns undefined if there are currently no peers defined. -spec current_peers(queue_name()) -> list(peer_info())|undefined. current_peers(QueueName) -> @@ -435,7 +435,7 @@ handle_info({prompt_requeue, WorkItem}, State) -> terminate(_Reason, State) -> WorkItems = lists:map(fun(SW) -> element(3, SW) end, State#state.work), - CloseFun = + CloseFun = fun(SinkWork) -> lists:foreach( fun({{_QN, _Iter, _Peer}, _LocalC, RemoteFun, _RCF}) -> @@ -558,7 +558,7 @@ map_peer_to_wi_fun({QueueName, Iteration, PeerInfo}) -> %% @doc %% Return a function which when called will enclose a remote_fun for sending %% requests with a reusable client (if required) --spec remote_client_fun(http|pb, string(), pos_integer()) -> +-spec remote_client_fun(http|pb, string(), pos_integer()) -> fun(() -> remote_fun()). remote_client_fun(http, Host, Port) -> InitClientFun = client_start(http, Host, Port, []), @@ -582,9 +582,9 @@ remote_client_fun(pb, Host, Port) -> app_helper:get_env(riak_kv, repl_cert_filename), KeyFilename = app_helper:get_env(riak_kv, repl_key_filename), - SecuritySitename = + SecuritySitename = app_helper:get_env(riak_kv, repl_username), - Opts = + Opts = case CaCertificateFilename of undefined -> [{silence_terminate_crash, true}]; @@ -621,7 +621,7 @@ remote_client_fun(pb, Host, Port) -> end end. --spec client_start(pb|http, string(), pos_integer(), list()) +-spec client_start(pb|http, string(), pos_integer(), list()) -> fun(() -> rhc:rhc()|pid()|no_pid). client_start(pb, Host, Port, Opts) -> fun() -> @@ -789,7 +789,7 @@ add_failure({S, {failure, Failure}, FT, PT, RT, MT}) -> -spec add_repltime(queue_stats(), {integer(), integer(), integer()}) -> queue_stats(). -add_repltime({S, +add_repltime({S, F, {replfetch_time, FT}, {replpush_time, PT}, {replmod_time, RT}, MT}, diff --git a/src/riak_kv_replrtq_src.erl b/src/riak_kv_replrtq_src.erl index 20d9336cb..9a8451822 100644 --- a/src/riak_kv_replrtq_src.erl +++ b/src/riak_kv_replrtq_src.erl @@ -161,7 +161,7 @@ % If a priority 3 item is pushed to the queue and the length of the % queue_cache is less than the object limit, and the overflowq is empty % for this priority, the item will be added to the queue_cache. - % + % % If a priority 3 item is pushed to the queue and the length of the % queue_cache is at/over the object limit, or the overflowq is non-empty % then the item will be added to the overflowq, and if the object_ref is @@ -171,7 +171,7 @@ % added first to the overflowq . % % If a fetch request is received and the priority 3 queue_cache is - % non-empty then the next entry from this queue will be returned. + % non-empty then the next entry from this queue will be returned. 
% If the overflow queue is empty, then an attempt will be made to % return a batch from the overflowq, to add to the queue_cache. % @@ -310,9 +310,33 @@ stop() -> %%% gen_server callbacks %%%============================================================================ -init([FilePath]) -> - QueueDefnString = app_helper:get_env(riak_kv, replrtq_srcqueue, ""), +init([]) -> + QueueDefnString = application:get_env(riak_kv, replrtq_srcqueue, ""), QFM = tokenise_queuedefn(QueueDefnString), + MapToQM = + fun({QueueName, _QF, _QA}) -> + {QueueName, riak_core_priority_queue:new()} + end, + MaptoQC = + fun({QueueName, _QF, _QA}) -> + {QueueName, {0, 0, 0}} + end, + QM = lists:map(MapToQM, QFM), + QC = lists:map(MaptoQC, QFM), + QL = application:get_env(riak_kv, replrtq_srcqueuelimit, ?QUEUE_LIMIT), + OL = application:get_env(riak_kv, replrtq_srcobjectlimit, ?OBJECT_LIMIT), + LogFreq = + application:get_env( + riak_kv, + replrtq_logfrequency, + ?LOG_TIMER_SECONDS * 1000), + erlang:send_after(LogFreq, self(), log_queue), + {ok, #state{queue_filtermap = QFM, + queue_map = QM, + queue_countmap = QC, + queue_limit = QL, + object_limit = OL, + log_frequency_in_ms = LogFreq}}. {OL, QL} = get_limits(), @@ -828,7 +852,7 @@ empty_local_queue() -> {{queue:new(), 0, 0}, {queue:new(), 0, 0}, {queue:new(), 0, 0}}. -spec empty_overflow_queue(queue_name(), string()) - -> riak_kv_overflow_queue:overflowq(). + -> riak_kv_overflow_queue:overflowq(). empty_overflow_queue(QueueName, FilePath) -> {_OL, QL} = get_limits(), Priorities = [?FLD_PRIORITY, ?AAE_PRIORITY, ?RTQ_PRIORITY], @@ -1079,7 +1103,7 @@ limit_aaefold_test() -> ok = replrtq_aaefold(?QN1, Grp4), ?assertMatch({?QN1, {100000, 0, 2000}}, length_rtq(?QN1)), - + lists:foreach(fun(_I) -> _ = popfrom_rtq(?QN1) end, lists:seq(1, 4000)), ?assertMatch({?QN1, {98000, 0, 0}}, length_rtq(?QN1)), @@ -1106,7 +1130,7 @@ limit_ttaaefs_test() -> ok = replrtq_ttaaefs(?QN1, Grp4), ?assertMatch({?QN1, {0, 100000, 2000}}, length_rtq(?QN1)), - + lists:foreach(fun(_I) -> _ = popfrom_rtq(?QN1) end, lists:seq(1, 4000)), ?assertMatch({?QN1, {0, 98000, 0}}, length_rtq(?QN1)), @@ -1118,7 +1142,7 @@ limit_ttaaefs_test() -> ok = replrtq_ttaaefs(?QN1, Grp4), ?assertMatch({?QN1, {0, 2000, 0}}, length_rtq(?QN1)), - + lists:foreach(fun(_I) -> _ = popfrom_rtq(?QN1) end, lists:seq(1, 2000)), ?assertMatch({?QN1, {0, 0, 0}}, length_rtq(?QN1)), From 0eefa607db314c85c0904aeacd1450b314059e7a Mon Sep 17 00:00:00 2001 From: Andrei Zavada Date: Wed, 23 Nov 2022 03:40:25 +0200 Subject: [PATCH 08/15] Mas i1815 autocheck (#1816) To simplify the configuration, rather than have the operator select all_check day_check range_check etc, there is now a default strategy of auto_check which tries to do a sensible thing: do range_check when a range is set, otherwise do all_check if it is out of hours (in the all_check window), otherwise do day_check Some stats added to help with monitoring, the detail is still also in the console logs. See #1815 # Conflicts: # src/riak_kv_stat.erl # src/riak_kv_ttaaefs_manager.erl --- src/riak_kv_ttaaefs_manager.erl | 134 ++++++++++++++++---------------- 1 file changed, 67 insertions(+), 67 deletions(-) diff --git a/src/riak_kv_ttaaefs_manager.erl b/src/riak_kv_ttaaefs_manager.erl index 6fceee1bc..2cd742824 100644 --- a/src/riak_kv_ttaaefs_manager.erl +++ b/src/riak_kv_ttaaefs_manager.erl @@ -143,7 +143,7 @@ start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - + %% @doc %% Override shcedule and process an individual work_item. 
If called from %% riak_client an integer ReqID is passed to allow for a response to be @@ -160,7 +160,7 @@ process_workitem(WorkItem, ReqID, From, Now) -> gen_server:cast(?MODULE, {WorkItem, ReqID, From, Now}). %% @doc -%% Pause the management of full-sync from this node +%% Pause the management of full-sync from this node -spec pause() -> ok|{error, already_paused}. pause() -> gen_server:call(?MODULE, pause). @@ -217,12 +217,12 @@ init([]) -> RangeCheck = app_helper:get_env(riak_kv, ttaaefs_rangecheck), AutoCheck = app_helper:get_env(riak_kv, ttaaefs_autocheck), - {SliceCount, Schedule} = + {SliceCount, Schedule} = case Scope of disabled -> {24, [{no_check, 24}, - {all_check, 0}, + {all_check, 0}, {day_check, 0}, {hour_check, 0}, {range_check, 0}, @@ -238,11 +238,11 @@ init([]) -> {range_check, RangeCheck}, {auto_check, AutoCheck}]} end, - + CheckWindow = app_helper:get_env(riak_kv, ttaaefs_allcheck_window), - State1 = + State1 = case Scope of all -> LocalNVal = app_helper:get_env(riak_kv, ttaaefs_localnval), @@ -257,7 +257,7 @@ init([]) -> B = app_helper:get_env(riak_kv, ttaaefs_bucketfilter_name), T = app_helper:get_env(riak_kv, ttaaefs_bucketfilter_type), B0 = - case is_binary(B) of + case is_binary(B) of true -> B; false -> @@ -283,8 +283,8 @@ init([]) -> slice_count = SliceCount, slot_info_fun = fun get_slotinfo/0} end, - - + + % Fetch connectivity information for remote cluster PeerIP = app_helper:get_env(riak_kv, ttaaefs_peerip), PeerPort = app_helper:get_env(riak_kv, ttaaefs_peerport), @@ -295,9 +295,9 @@ init([]) -> app_helper:get_env(riak_kv, repl_cert_filename), KeyFilename = app_helper:get_env(riak_kv, repl_key_filename), - SecuritySitename = + SecuritySitename = app_helper:get_env(riak_kv, repl_username), - SSLEnabled = + SSLEnabled = (CaCertificateFilename =/= undefined) and (CertificateFilename =/= undefined) and (KeyFilename =/= undefined) and @@ -312,13 +312,13 @@ init([]) -> false -> undefined end, - + % Queue name to be used for AAE exchanges on this cluster SrcQueueName = app_helper:get_env(riak_kv, ttaaefs_queuename), PeerQueueName = application:get_env(riak_kv, ttaaefs_queuename_peer, disabled), - State2 = + State2 = State1#state{peer_ip = PeerIP, peer_port = PeerPort, peer_protocol = PeerProtocol, @@ -326,7 +326,7 @@ init([]) -> queue_name = SrcQueueName, peer_queue_name = PeerQueueName, check_window = CheckWindow}, - + ?LOG_INFO("Initiated Tictac AAE Full-Sync Mgr with scope=~w", [Scope]), {ok, State2, ?INITIAL_TIMEOUT}. 
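%% Editor's sketch (not part of the patch): the intent of auto_check as
%% described in the PR text - use range_check when a check range has been
%% set, otherwise all_check when inside the configured allcheck window,
%% otherwise day_check. The actual selection logic in this module may differ
%% in detail; get_range/0 and in_window/2 are the helpers defined further
%% down in this file.
auto_check_sketch(NowSecs, CheckWindow) ->
    case get_range() of
        none ->
            case in_window(NowSecs, CheckWindow) of
                true -> all_check;
                false -> day_check
            end;
        _Range ->
            range_check
    end.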
@@ -334,7 +334,7 @@ handle_call(pause, _From, State) -> case State#state.is_paused of true -> {reply, {error, already_paused}, State}; - false -> + false -> PausedSchedule = [{no_check, State#state.slice_count}, {all_check, 0}, @@ -369,7 +369,7 @@ handle_call(resume, _From, State) -> {reply, {error, not_paused}, State, ?INITIAL_TIMEOUT} end; handle_call({set_sink, Protocol, PeerIP, PeerPort}, _From, State) -> - State0 = + State0 = State#state{peer_ip = PeerIP, peer_port = PeerPort, peer_protocol = Protocol}, @@ -398,7 +398,7 @@ handle_call({set_bucketsync, BucketList}, _From, State) -> handle_cast({reply_complete, ReqID, Result}, State) -> LastExchangeStart = State#state.last_exchange_start, Duration = timer:now_diff(os:timestamp(), LastExchangeStart), - {Pause, State0} = + {Pause, State0} = case Result of {waiting_all_results, _Deltas} -> % If the exchange ends with waiting all results, then consider @@ -414,7 +414,7 @@ handle_cast({reply_complete, ReqID, Result}, State) -> SyncState == branch_compare -> riak_kv_stat:update({ttaaefs, sync_sync, Duration}), ?LOG_INFO( - "exchange=~w complete result=~w in duration=~w s" ++ + "exchange=~w complete result=~w in duration=~w s" " sync_state=true", [ReqID, Result, Duration div 1000000]), disable_tree_repairs(), @@ -447,7 +447,7 @@ handle_cast({all_check, ReqID, From, _Now}, State) -> none, undefined, full}; bucket -> [H|T] = State#state.bucket_list, - {range, range, + {range, range, {filter, H, all, large, all, all, pre_hash}, T ++ [H], partial} @@ -472,7 +472,7 @@ handle_cast({day_check, ReqID, From, Now}, State) -> State#state.local_nval, State#state.remote_nval, Filter, - undefined, + undefined, full, State, day_check), @@ -508,14 +508,14 @@ handle_cast({hour_check, ReqID, From, Now}, State) -> State#state.local_nval, State#state.remote_nval, Filter, - undefined, + undefined, full, State, hour_check), {noreply, State0, Timeout}; bucket -> [H|T] = State#state.bucket_list, - + % Note that the tree size is amended as well as the time range. % The bigger the time range, the bigger the tree. Bigger trees % are less efficient when there is little change, but can more @@ -577,7 +577,7 @@ handle_cast({range_check, ReqID, From, _Now}, State) -> State#state.local_nval, State#state.remote_nval, Filter, - undefined, + undefined, full, State, range_check), @@ -651,7 +651,7 @@ handle_info(timeout, State) -> [OldInfo, SlotInfo]), {[], undefined} end, - {WorkItem, Wait, RemainingSlices, ScheduleStartTime} = + {WorkItem, Wait, RemainingSlices, ScheduleStartTime} = take_next_workitem(Allocations, State#state.schedule, StartTime, @@ -696,7 +696,7 @@ code_change(_OldVsn, State, _Extra) -> set_range(Bucket, KeyRange, LowDate, HighDate) -> EpochTime = calendar:datetime_to_gregorian_seconds({{1970,1,1},{0,0,0}}), - LowTS = + LowTS = calendar:datetime_to_gregorian_seconds(LowDate) - EpochTime, HighTS = calendar:datetime_to_gregorian_seconds(HighDate) - EpochTime, @@ -710,7 +710,7 @@ clear_range() -> application:set_env(riak_kv, ttaaefs_check_range, none). -spec get_range() -> - none|{riak_object:bucket()|all, + none|{riak_object:bucket()|all, {riak_object:key(), riak_object:key()}|all, pos_integer(), pos_integer()}. 
get_range() -> @@ -781,7 +781,7 @@ sync_clusters(From, ReqID, LNVal, RNVal, Filter, NextBucketList, StopFun = fun() -> stop_client(RemoteClient, RemoteMod) end, RemoteSendFun = generate_sendfun({RemoteClient, RemoteMod}, RNVal), LocalSendFun = generate_sendfun(local, LNVal), - ReqID0 = + ReqID0 = case ReqID of no_reply -> erlang:phash2({self(), os:timestamp()}); @@ -790,7 +790,7 @@ sync_clusters(From, ReqID, LNVal, RNVal, Filter, NextBucketList, end, ReplyFun = generate_replyfun(ReqID == no_reply, ReqID0, From, StopFun), - + MaxResults = case WorkType of range_check -> @@ -806,7 +806,7 @@ sync_clusters(From, ReqID, LNVal, RNVal, Filter, NextBucketList, ttaaefs_maxresults, ?MAX_RESULTS) end, - + LocalRepairFun = fun(RepairList) -> riak_kv_replrtq_src:replrtq_ttaaefs( @@ -859,17 +859,17 @@ sync_clusters(From, ReqID, LNVal, RNVal, Filter, NextBucketList, [{RemoteSendFun, all}], RepairFun, ReplyFun, - Filter, + Filter, [{transition_pause_ms, ExchangePause}, {max_results, MaxResults}, {scan_timeout, ?CRASH_TIMEOUT div 2}, {purpose, WorkType}]), - - ?LOG_INFO("Starting ~w full-sync work_item=~w " ++ + + ?LOG_INFO("Starting ~w full-sync work_item=~w " ++ "reqid=~w exchange id=~s pid=~w", [Ref, WorkType, ReqID0, ExID, ExPid]), riak_kv_stat:update({ttaaefs, WorkType}), - + {State#state{bucket_list = NextBucketList, last_exchange_start = os:timestamp()}, ?CRASH_TIMEOUT} @@ -902,7 +902,7 @@ get_slotinfo() -> lists:sort([node()|UpNodes]) end, NotMe = lists:takewhile(fun(N) -> N /= node() end, UpNodes0), - ClusterSlice = + ClusterSlice = max(min(app_helper:get_env(riak_kv, ttaaefs_cluster_slice, 1), 4), 1), {length(NotMe) + 1, length(UpNodes0), ClusterSlice}. @@ -910,17 +910,17 @@ get_slotinfo() -> %% Return a function which will send aae_exchange messages to a remote %% cluster, and return the response. The function should make an async call %% to try and make the remote and local cluster sends happen as close to -%% parallel as possible. +%% parallel as possible. -spec generate_sendfun({rhc:rhc(), rhc}|{pid(), riakc_pb_socket}|local, nval()) -> aae_exchange:send_fun(). 
generate_sendfun(SendClient, NVal) -> fun(Msg, all, Colour) -> AAE_Exchange = self(), - ReturnFun = - fun(R) -> + ReturnFun = + fun(R) -> aae_exchange:reply(AAE_Exchange, R, Colour) end, - SendFun = + SendFun = case SendClient of local -> C = riak_client:new(node(), undefined), @@ -952,7 +952,7 @@ init_client(pb, IP, Port, undefined) -> Options = [{auto_reconnect, true}], init_pbclient(IP, Port, Options); init_client(pb, IP, Port, Credentials) -> - SecurityOpts = + SecurityOpts = [{cacertfile, element(1, Credentials)}, {certfile, element(2, Credentials)}, {keyfile, element(3, Credentials)}, @@ -977,7 +977,7 @@ init_pbclient(IP, Port, Options) -> ?LOG_INFO("Cannot reach remote cluster ~p ~p as ~p", [IP, Port, Reason]), {no_client, riakc_pb_socket} - catch + catch _Exception:Reason -> ?LOG_WARNING("Cannot reach remote cluster ~p ~p exception ~p", [IP, Port, Reason]), @@ -995,7 +995,7 @@ local_sender({fetch_clocks, SegmentIDs}, C, ReturnFun, NVal) -> local_sender({fetch_clocks, SegmentIDs, MR}, C, ReturnFun, NVal) -> %% riak_client expects modified range of form %% {date, non_neg_integer(), non_neg_integer()} - %% where as the riak erlang clients just expect + %% where as the riak erlang clients just expect %% {non_neg_integer(), non_neg_integer()} %% They keyword all must also be supported LMR = localise_modrange(MR), @@ -1010,12 +1010,12 @@ local_sender({fetch_clocks_range, B0, KR, SF, MR}, C, ReturnFun, _NVal) -> -spec run_localfold(riak_kv_clusteraae_fsm:query_definition(), riak_client:riak_client(), - fun((any()) -> ok)) -> + fun((any()) -> ok)) -> fun(() -> ok). run_localfold(Query, Client, ReturnFun) -> fun() -> case riak_client:aae_fold(Query, Client) of - {ok, R} -> + {ok, R} -> ReturnFun(R); {error, Error} -> ReturnFun({error, Error}) @@ -1126,7 +1126,7 @@ generate_replyfun(Clientless, ReqID, From, StopClientFun) -> % Reply to riak_client From ! {ReqID, Result} end, - gen_server:cast(?MODULE, {reply_complete, ReqID, Result}), + gen_server:cast(?MODULE, {reply_complete, ReqID, Result}), StopClientFun() end. @@ -1172,11 +1172,11 @@ generate_repairfun(LocalRepairFun, RemoteRepairFun, MaxResults, LogInfo) -> {SrcRepair, SnkRepair} = lists:foldl(FoldFun, {[], []}, RepairList), ?LOG_INFO( "AAE reqid=~w work_item=~w scope=~w shows sink ahead " ++ - "for key_count=~w keys limited by max_results=~w", + "for key_count=~w keys limited by max_results=~w", [ExchangeID, WorkItem, WorkScope, length(SnkRepair), MaxResults]), ?LOG_INFO( "AAE reqid=~w work_item=~w scope=~w shows source ahead " ++ - "for key_count=~w keys limited by max_results=~w", + "for key_count=~w keys limited by max_results=~w", [ExchangeID, WorkItem, WorkScope, length(SrcRepair), MaxResults]), riak_kv_stat:update({ttaaefs, snk_ahead, length(SnkRepair)}), riak_kv_stat:update({ttaaefs, src_ahead, length(SrcRepair)}), @@ -1192,9 +1192,9 @@ generate_repairfun(LocalRepairFun, RemoteRepairFun, MaxResults, LogInfo) -> %% @doc Examine the number of repairs, and the repair summary and determine -%% what to do next e.g. set a range for the next range_check +%% what to do next e.g. set a range for the next range_check -spec determine_next_action( - non_neg_integer(), + non_neg_integer(), pos_integer(), work_scope(), work_item(), list(repair_summary())) -> ok. @@ -1267,7 +1267,7 @@ decode_clock(EncodedClock) -> -spec summarise_repairs(integer(), list(repair_reference()), work_scope(), - work_item()) -> + work_item()) -> list(repair_summary()). 
summarise_repairs(ExchangeID, RepairList, WorkScope, WorkItem) -> FoldFun = @@ -1285,7 +1285,7 @@ summarise_repairs(ExchangeID, RepairList, WorkScope, WorkItem) -> LogFun = fun({B, C, MinDT, MaxDT}) -> ?LOG_INFO( - "AAE exchange=~w work_item=~w type=~w repaired " ++ + "AAE exchange=~w work_item=~w type=~w repaired " ++ "key_count=~w for bucket=~p with low date ~p high date ~p", [ExchangeID, WorkScope, WorkItem, C, B, MinDT, MaxDT]) end, @@ -1296,7 +1296,7 @@ summarise_repairs(ExchangeID, RepairList, WorkScope, WorkItem) -> %% Take the next work item from the list of allocations, assuming that the %% starting time for that work item has not alreasy passed. If there are no %% more items queue, start a new queue based on the wants for the schedule. --spec take_next_workitem(list(allocation()), +-spec take_next_workitem(list(allocation()), schedule_wants(), erlang:timestamp()|undefined, node_info(), @@ -1307,7 +1307,7 @@ take_next_workitem([], Wants, ScheduleStartTime, SlotInfo, SliceCount) -> NewAllocations = choose_schedule(Wants), % Should be 24 hours after ScheduleStartTime - so add 24 hours to % ScheduleStartTime - RevisedStartTime = + RevisedStartTime = case ScheduleStartTime of undefined -> beginning_of_next_period(os:timestamp(), SliceCount); @@ -1375,7 +1375,7 @@ beginning_of_next_period({Mega, Sec, _Micro}, SlotCount) -> SlotsPassed = (NowGS - TopOfDayGS) div SlotSize, NextPeriodGS = TopOfDayGS + SlotSize * (SlotsPassed + 1), EpochSeconds = Mega * ?MEGA + Sec + NextPeriodGS - NowGS, - {EpochSeconds div ?MEGA, EpochSeconds rem ?MEGA, 0}. + {EpochSeconds div ?MEGA, EpochSeconds rem ?MEGA, 0}. %% @doc @@ -1506,8 +1506,8 @@ choose_schedule_test() -> AllSyncAll = choose_schedule(AllSyncAllSchedule), ExpAllSyncAll = lists:map(fun(I) -> {I, all_check} end, lists:seq(1, 100)), ?assertMatch(AllSyncAll, ExpAllSyncAll), - - MixedSyncSchedule = + + MixedSyncSchedule = [{no_check, 6}, {all_check, 1}, {auto_check, 3}, @@ -1521,14 +1521,14 @@ choose_schedule_test() -> HourWorkload = lists:map(SliceForHourFun, lists:filter(IsSyncFun, MixedSync)), ?assertMatch(84, length(lists:usort(HourWorkload))), - FoldFun = + FoldFun = fun(I, Acc) -> true = I > Acc, I end, BiggestI = lists:foldl(FoldFun, 0, HourWorkload), ?assertMatch(true, BiggestI >= 84), - + CountFun = fun({_I, Type}, Acc) -> {Type, CD} = lists:keyfind(Type, 1, Acc), @@ -1543,7 +1543,7 @@ take_first_workitem_test() -> SC = 48, Wants = [{no_check, SC}, - {all_check, 0}, + {all_check, 0}, {auto_check, 0}, {day_check, 0}, {hour_check, 0}, @@ -1562,26 +1562,26 @@ take_first_workitem_test() -> beginning_of_next_period({Mega, Sec, Micro}, SC), % 24 hours on, the new scheudle start time should be the same it would be % if we started now - {no_check, PromptSeconds, SchedRem, ScheduleStartTime} = + {no_check, PromptSeconds, SchedRem, ScheduleStartTime} = take_next_workitem([], Wants, OrigStartTime, {1, 8, 1}, SC), ?assertMatch(true, ScheduleStartTime > {Mega, Sec, Micro}), ?assertMatch(true, PromptSeconds > 0), - {no_check, PromptMoreSeconds, SchedRem, ScheduleStartTime} = + {no_check, PromptMoreSeconds, SchedRem, ScheduleStartTime} = take_next_workitem([], Wants, OrigStartTime, {2, 8, 1}, SC), ?assertMatch(true, PromptMoreSeconds > PromptSeconds), - {no_check, PromptEvenMoreSeconds, SchedRem, ScheduleStartTime} = + {no_check, PromptEvenMoreSeconds, SchedRem, ScheduleStartTime} = take_next_workitem([], Wants, OrigStartTime, {7, 8, 1}, SC), ?assertMatch(true, PromptEvenMoreSeconds > PromptMoreSeconds), - {no_check, PromptYetMoreSeconds, _T0, 
ScheduleStartTime} = + {no_check, PromptYetMoreSeconds, _T0, ScheduleStartTime} = take_next_workitem(SchedRem, Wants, ScheduleStartTime, {1, 8, 1}, SC), ?assertMatch(true, PromptYetMoreSeconds > PromptEvenMoreSeconds), - {no_check, PromptS2YetMoreSeconds, _, ScheduleStartTime} = + {no_check, PromptS2YetMoreSeconds, _, ScheduleStartTime} = take_next_workitem(SchedRem, Wants, ScheduleStartTime, {1, 8, 2}, SC), - {no_check, PromptS3YetMoreSeconds, _, ScheduleStartTime} = + {no_check, PromptS3YetMoreSeconds, _, ScheduleStartTime} = take_next_workitem(SchedRem, Wants, ScheduleStartTime, {1, 8, 3}, SC), - {no_check, PromptS4YetMoreSeconds, _, ScheduleStartTime} = + {no_check, PromptS4YetMoreSeconds, _, ScheduleStartTime} = take_next_workitem(SchedRem, Wants, ScheduleStartTime, {1, 8, 4}, SC), - {no_check, PromptN2YetMoreSeconds, _, ScheduleStartTime} = + {no_check, PromptN2YetMoreSeconds, _, ScheduleStartTime} = take_next_workitem(SchedRem, Wants, ScheduleStartTime, {2, 8, 1}, SC), ?assertMatch(true, PromptS4YetMoreSeconds > PromptS3YetMoreSeconds), ?assertMatch(true, PromptS3YetMoreSeconds > PromptS2YetMoreSeconds), @@ -1599,7 +1599,7 @@ window_test() -> ?assert(in_window(Now0, {0, 0})), ?assertNot(in_window(Now0, {1, 1})), ?assertNot(in_window(Now0, {23, 23})), - + NowSecs1 = calendar:datetime_to_gregorian_seconds( {{2000, 1, 1}, {23, 59, 59}}), From 9bd0b0f248c6d90c8edab1e2cba29dee97bec5e8 Mon Sep 17 00:00:00 2001 From: Andrei Zavada Date: Wed, 23 Nov 2022 03:44:39 +0200 Subject: [PATCH 09/15] Mas i1817 overflowrtq (#1829) Expand on use of riak_kv_overflow_queue so that it is used by the riak_kv_replrtq_src, as well as riak_kv_reaper and riak_kv_eraser. This means that larger queue sizes can be supported for riak_kv_replrtq_src without having to worry about compromising the memory of the node. This should allow for repl_keys_range AAE folds to generate very large replication sets, without clogging the node worker pool by pausing so that real-time replication can keep up. The overflow queues are deleted on shutdown (if there is a queue on disk). The feature is to allow for larger queues without memory exhaustion, persistence is not used to persist queues across restarts. Overflow Queues extended to include a 'reader' queue which may be used for read_repairs. Currently this queue is only used for the repair_keys_range query and the read-repair trigger. # Conflicts: # priv/riak_kv.schema # src/riak_kv_overflow_queue.erl # src/riak_kv_replrtq_src.erl --- priv/riak_kv.schema | 1 - src/riak_kv_replrtq_src.erl | 29 ++++++++++++----------------- 2 files changed, 12 insertions(+), 18 deletions(-) diff --git a/priv/riak_kv.schema b/priv/riak_kv.schema index f758f7f16..8be962113 100644 --- a/priv/riak_kv.schema +++ b/priv/riak_kv.schema @@ -66,7 +66,6 @@ ]}. %% @doc A path under which the repl real-time overload queue will be stored. -%% @doc A path under which the reaper overload queue will be stored. 
{mapping, "replrtq_dataroot", "riak_kv.replrtq_dataroot", [ {default, "$(platform_data_dir)/kv_replrtqsrc"}, {datatype, directory} diff --git a/src/riak_kv_replrtq_src.erl b/src/riak_kv_replrtq_src.erl index 9a8451822..9ec0f6208 100644 --- a/src/riak_kv_replrtq_src.erl +++ b/src/riak_kv_replrtq_src.erl @@ -310,33 +310,28 @@ stop() -> %%% gen_server callbacks %%%============================================================================ -init([]) -> - QueueDefnString = application:get_env(riak_kv, replrtq_srcqueue, ""), +init([FilePath]) -> + QueueDefnString = app_helper:get_env(riak_kv, replrtq_srcqueue, ""), QFM = tokenise_queuedefn(QueueDefnString), - MapToQM = + + {OL, QL} = get_limits(), + + MapToQOverflow = fun({QueueName, _QF, _QA}) -> - {QueueName, riak_core_priority_queue:new()} + {QueueName, empty_overflow_queue(QueueName, FilePath)} end, - MaptoQC = + MaptoQCache = fun({QueueName, _QF, _QA}) -> - {QueueName, {0, 0, 0}} + {QueueName, empty_local_queue()} end, - QM = lists:map(MapToQM, QFM), - QC = lists:map(MaptoQC, QFM), - QL = application:get_env(riak_kv, replrtq_srcqueuelimit, ?QUEUE_LIMIT), - OL = application:get_env(riak_kv, replrtq_srcobjectlimit, ?OBJECT_LIMIT), + QO = lists:map(MapToQOverflow, QFM), + QC = lists:map(MaptoQCache, QFM), LogFreq = - application:get_env( + app_helper:get_env( riak_kv, replrtq_logfrequency, ?LOG_TIMER_SECONDS * 1000), erlang:send_after(LogFreq, self(), log_queue), - {ok, #state{queue_filtermap = QFM, - queue_map = QM, - queue_countmap = QC, - queue_limit = QL, - object_limit = OL, - log_frequency_in_ms = LogFreq}}. {OL, QL} = get_limits(), From e441decbf5bdd02104a316e3e6776c4a0e13d452 Mon Sep 17 00:00:00 2001 From: Andrei Zavada Date: Wed, 23 Nov 2022 03:47:29 +0200 Subject: [PATCH 10/15] Mas i1813 keepamnesia (#1830) Introduces a reader overflowq for doing read repair operations. Initially this is used for: - repair_keys_range aae_fold - avoids the pausing of the fold that would block the worker pool; - repair on key_amnesia - triggers the required repair rather than causing an AAE delta; - repair on finding a corrupted object when folding to rebuild aae_store - previously the fold would crash, and the AAE store would therefore never be rebuilt. [This PR](https://github.com/martinsumner/kv_index_tictactree/pull/106) is required to make this consistent in both AAE solutions. # Conflicts: # priv/riak_kv.schema # rebar.config # src/riak_kv_clusteraae_fsm.erl # src/riak_kv_index_hashtree.erl # src/riak_kv_vnode.erl --- src/riak_kv_clusteraae_fsm.erl | 149 ++++++++++++++++----------------- src/riak_kv_index_hashtree.erl | 16 ++-- src/riak_kv_stat.erl | 21 ----- src/riak_kv_vnode.erl | 6 +- 4 files changed, 85 insertions(+), 107 deletions(-) diff --git a/src/riak_kv_clusteraae_fsm.erl b/src/riak_kv_clusteraae_fsm.erl index 66b9f779e..11b99e8e9 100644 --- a/src/riak_kv_clusteraae_fsm.erl +++ b/src/riak_kv_clusteraae_fsm.erl @@ -18,7 +18,7 @@ %% %% ------------------------------------------------------------------- -%% @doc The AAE fold FSM allows for coverage folds acrosss Tictac AAE +%% @doc The AAE fold FSM allows for coverage folds acrosss Tictac AAE %% Controllers -module(riak_kv_clusteraae_fsm). @@ -42,10 +42,10 @@ -define(EMPTY, <<>>). --define(NVAL_QUERIES, +-define(NVAL_QUERIES, [merge_root_nval, merge_branch_nval, fetch_clocks_nval, list_buckets]). 
--define(RANGE_QUERIES, +-define(RANGE_QUERIES, [merge_tree_range, fetch_clocks_range, repl_keys_range, repair_keys_range, find_keys, object_stats, @@ -90,8 +90,8 @@ %% the cluster and increase parallelistaion of the process. %% The count change_method() will perform no reaps/deletes - but will %% simply count the matching keys - this is cheaper than runnning - %% find_tombs/find_keys to accumulate/sort a large list for counting. --type query_types() :: + %% find_tombs/find_keys to accumulate/sort a large list for counting. +-type query_types() :: merge_root_nval|merge_branch_nval|fetch_clocks_nval| merge_tree_range|fetch_clocks_range|repl_keys_range|repair_keys_range| find_keys|object_stats| @@ -100,7 +100,7 @@ -type query_definition() :: % Use of these folds depends on the Tictac AAE being enabled in either - % native mode, or in parallel mode with key_order being used. + % native mode, or in parallel mode with key_order being used. % N-val AAE (using cached trees) {merge_root_nval, n_val()}| @@ -114,19 +114,19 @@ {fetch_clocks_nval, n_val(), segment_filter()}| {fetch_clocks_nval, n_val(), segment_filter(), modified_range()}| % Scan over all the keys for a given n_val in the tictac AAE key store - % (which for native stores will be the actual key store), skipping + % (which for native stores will be the actual key store), skipping % those blocks of the store not containing keys in the segment filter, % returning a list of keys and clocks for that n_val within the - % cluster. This is a background operation, but will have lower + % cluster. This is a background operation, but will have lower % overheads than traditional store folds, subject to the size of the % segment filter being small - ideally o(10) or smaller % Variant supported with a modified range, which will be converted into % a fetch_clocks_range % Range-based AAE (requiring folds over native/parallel AAE key stores) - {merge_tree_range, + {merge_tree_range, bucket(), - key_range(), + key_range(), tree_size(), {segments, segment_filter(), tree_size()} | all, modified_range() | all, @@ -139,7 +139,7 @@ % Different size trees can be requested. Smaller tree sizes are more % likely to lead to false negative results, but are more efficient % to calculate and have a reduced load on the network - % + % % A segment_filter() may be passed. For example, if a tree comparison % has been done between two clusters, it might be preferable to confirm % the differences before fetching clocks. This can be done by @@ -169,9 +169,9 @@ % which are either: % - pre_hash (use the default pre-calculated hash) % - {rehash, IV} rehash the vector clock concatenated with an integer - {fetch_clocks_range, + {fetch_clocks_range, bucket(), - key_range(), + key_range(), {segments, segment_filter(), tree_size()} | all, modified_range() | all}| % Return the keys and clocks in the given bucket and key range. @@ -197,13 +197,13 @@ % % The leveled backend supports a max_key_count which could be used to % provide a loose_limit on the results returned. However, there are - % issues with this and segment_ordered backends, as well as extra + % issues with this and segment_ordered backends, as well as extra % complexity curtailing the results (and signalling the results are % curtailed). The main downside of large result sets is network over % use. Perhaps compressing the payload may be a better answer? 
- {repl_keys_range, + {repl_keys_range, bucket(), - key_range(), + key_range(), modified_range() | all, riak_kv_replrtq_src:queue_name()}| % Replicate all the objects in a given key and modified range. By @@ -228,7 +228,7 @@ % participating in coverage % Operational support functions - {find_keys, + {find_keys, bucket(), key_range(), modified_range() | all, @@ -244,11 +244,11 @@ % is the pre-calculated size stored in the aae key store as % metadata. % - % The query returns a list of [{Key, SiblingCount}] tuples or - % [{Key, ObjectSize}] tuples depending on the filter requested. The + % The query returns a list of [{Key, SiblingCount}] tuples or + % [{Key, ObjectSize}] tuples depending on the filter requested. The % cost of this operation will increase with the size of the range - % - % It would be beneficial to use the results of object_stats (or + % + % It would be beneficial to use the results of object_stats (or % knowledge of the application) to ensure that the result size of % this query is reasonably bounded (e.g. don't set too low an object % size). If only interested in the outcom of recent modifications, @@ -259,15 +259,15 @@ % - the total count of objects in the key range % - the accumulated total size of all objects in the range % - a list [{Magnitude, ObjectCount}] tuples where Magnitude represents - % the order of magnitude of the size of the object (e.g. 1KB is objects + % the order of magnitude of the size of the object (e.g. 1KB is objects % from 100 bytes to 1KB, 10KB is objects from 1KB to 10KB etc) % - a list of [{SiblingCount, ObjectCount}] tuples where Sibling Count % is the number of siblings the object has. % - sample portion - (n_val * sample_size) / ring_size % e.g. - % [{total_count, 1000}, - % {total_size, 1000000}, - % {sizes, [{1, 800}, {2, 180}, {3, 20}]}, + % [{total_count, 1000}, + % {total_size, 1000000}, + % {sizes, [{1, 800}, {2, 180}, {3, 20}]}, % {siblings, [{1, 1000}]}] % % If only interested in the outcome of recent modifications, @@ -275,7 +275,7 @@ {find_tombs, bucket(), - key_range(), + key_range(), {segments, segment_filter(), tree_size()} | all, modified_range() | all} | % Find all tombstones in the range that match the criteria, and @@ -355,16 +355,16 @@ -endif. -spec init(from(), inbound_api()) -> init_response(). -%% @doc -%% Return a tuple containing the ModFun to call per vnode, the number of -%% primary preflist vnodes the operation should cover, the service to use to -%% check for available nodes,and the registered name to use to access the +%% @doc +%% Return a tuple containing the ModFun to call per vnode, the number of +%% primary preflist vnodes the operation should cover, the service to use to +%% check for available nodes,and the registered name to use to access the %% vnode master process. 
init(From={_, _, _}, [Query, Timeout]) -> % Get the bucket n_val for use in creating a coverage plan QueryType = element(1, Query), - NVal = - case {lists:member(QueryType, ?NVAL_QUERIES), + NVal = + case {lists:member(QueryType, ?NVAL_QUERIES), lists:member(QueryType, ?RANGE_QUERIES)} of {true, false} -> element(2, Query); @@ -382,7 +382,7 @@ init(From={_, _, _}, [Query, Timeout]) -> merge_root_nval -> ?EMPTY; merge_branch_nval -> - lists:map(fun(X) -> {X, ?EMPTY} end, + lists:map(fun(X) -> {X, ?EMPTY} end, element(3, Query)); merge_tree_range -> TreeSize = element(4, Query), @@ -392,7 +392,7 @@ init(From={_, _, _}, [Query, Timeout]) -> repair_keys_range -> {[], 0, element(5, Query), ?REPAIR_BATCH_SIZE}; object_stats -> - [{total_count, 0}, + [{total_count, 0}, {total_size, 0}, {sizes, []}, {siblings, []}]; @@ -415,28 +415,28 @@ init(From={_, _, _}, [Query, Timeout]) -> end end, - Req = riak_kv_requests:new_aaefold_request(Query, InitAcc, NVal), + Req = riak_kv_requests:new_aaefold_request(Query, InitAcc, NVal), - State = #state{from = From, - acc = InitAcc, + State = #state{from = From, + acc = InitAcc, start_time = os:timestamp(), query_type = QueryType}, ?LOG_INFO("AAE fold prompted of type=~w", [QueryType]), - {Req, all, NVal, 1, - riak_kv, riak_kv_vnode_master, - Timeout, + {Req, all, NVal, 1, + riak_kv, riak_kv_vnode_master, + Timeout, State}. - + process_results({error, Reason}, _State) -> ?LOG_WARNING("Failure to process fold results due to ~w", [Reason]), {error, Reason}; process_results(Results, State) -> - % Results are received as a one-off for each vnode in this case, and so + % Results are received as a one-off for each vnode in this case, and so % once results are merged work is always done. Acc = State#state.acc, QueryType = State#state.query_type, - UpdAcc = + UpdAcc = case lists:member(QueryType, ?LIST_ACCUMULATE_QUERIES) of true -> case QueryType of @@ -472,15 +472,15 @@ process_results(Results, State) -> {_EL, AccCount, all, RBS} = Acc, {[], AccCount + Count, all, RBS}; object_stats -> - [{total_count, R_TC}, + [{total_count, R_TC}, {total_size, R_TS}, {sizes, R_SzL}, {siblings, R_SbL}] = Results, - [{total_count, A_TC}, + [{total_count, A_TC}, {total_size, A_TS}, {sizes, A_SzL}, {siblings, A_SbL}] = Acc, - [{total_count, R_TC + A_TC}, + [{total_count, R_TC + A_TC}, {total_size, R_TS + A_TS}, {sizes, merge_countinlists(A_SzL, R_SzL)}, {siblings, merge_countinlists(A_SbL, R_SbL)}]; @@ -503,7 +503,7 @@ process_results(Results, State) -> %% Once the coverage FSM has received done for all vnodes (as an output from %% process_results), then it will call finish(clean, State) and so the results -%% can be sent to the client, and the FSM can be stopped. +%% can be sent to the client, and the FSM can be stopped. finish({error, Error}, State=#state{from={raw, ReqId, ClientPid}}) -> % Notify the requesting client that an error % occurred or the timeout has elapsed. @@ -511,10 +511,10 @@ finish({error, Error}, State=#state{from={raw, ReqId, ClientPid}}) -> ClientPid ! 
{ReqId, {error, Error}}, {stop, normal, State}; finish(clean, State=#state{from={raw, ReqId, ClientPid}}) -> - % The client doesn't expect results in increments only the final result, + % The client doesn't expect results in increments only the final result, % so no need for a seperate send of a 'done' message QueryDuration = timer:now_diff(os:timestamp(), State#state.start_time), - ?LOG_INFO("Finished aaefold of type=~w with fold_time=~w seconds", + ?LOG_INFO("Finished aaefold of type=~w with fold_time=~w seconds", [State#state.query_type, QueryDuration/1000000]), Results = case State#state.query_type of @@ -530,7 +530,7 @@ finish(clean, State=#state{from={raw, ReqId, ClientPid}}) -> end; false -> ok - end, + end, Count; _ -> State#state.acc @@ -614,14 +614,14 @@ pb_encode_results(merge_tree_range, QD, Tree) -> %% TODO: %% Using leveled_tictac:export_tree/1 requires unnecessary base64 encoding %% and decoding. Add a leveled_tictac:export_tree_raw fun to avoid this - {struct, - [{<<"level1">>, EncodedL1}, + {struct, + [{<<"level1">>, EncodedL1}, {<<"level2">>, {struct, EncodedL2}}]} = leveled_tictac:export_tree(Tree), L2 = - lists:map(fun({I, CB}) -> + lists:map(fun({I, CB}) -> CBDecoded = base64:decode(CB), - Iint = binary_to_integer(I), + Iint = binary_to_integer(I), <> end, EncodedL2), @@ -636,32 +636,32 @@ pb_encode_results(fetch_clocks_range, _QD, KeysNClocks) -> keys_value = lists:map(fun pb_encode_bucketkeyclock/1, KeysNClocks)}; pb_encode_results(repl_keys_range, _QD, ReplResult) -> R = element(2, ReplResult), - #rpbaaefoldkeycountresp{response_type = <<"repl_keys">>, + #rpbaaefoldkeycountresp{response_type = <<"repl_keys">>, keys_count = [#rpbkeyscount{tag = <<"dispatched_count">>, count = R}]}; pb_encode_results(repair_keys_range, _QD, ReplResult) -> R = element(2, ReplResult), - #rpbaaefoldkeycountresp{response_type = <<"repair_keys">>, + #rpbaaefoldkeycountresp{response_type = <<"repair_keys">>, keys_count = [#rpbkeyscount{tag = <<"dispatched_count">>, count = R}]}; pb_encode_results(find_keys, _QD, Results) -> - KeyCountMap = + KeyCountMap = fun({_B, K, V}) -> #rpbkeyscount{tag = K, count = V} end, - #rpbaaefoldkeycountresp{response_type = <<"find_keys">>, + #rpbaaefoldkeycountresp{response_type = <<"find_keys">>, keys_count = lists:map(KeyCountMap, Results)}; pb_encode_results(find_tombs, QD, Results) -> pb_encode_results(find_keys, QD, Results); pb_encode_results(reap_tombs, _QD, Count) -> - #rpbaaefoldkeycountresp{response_type = <<"reap_tombs">>, + #rpbaaefoldkeycountresp{response_type = <<"reap_tombs">>, keys_count = [#rpbkeyscount{tag = <<"dispatched_count">>, count = Count}]}; pb_encode_results(erase_keys, _QD, Count) -> - #rpbaaefoldkeycountresp{response_type = <<"erase_keys">>, + #rpbaaefoldkeycountresp{response_type = <<"erase_keys">>, keys_count = [#rpbkeyscount{tag = <<"dispatched_count">>, count = Count}]}; @@ -680,7 +680,7 @@ pb_encode_results(object_stats, _QD, Results) -> end, SzL0 = lists:map(EncodeIdxL(sizes), SzL), SbL0 = lists:map(EncodeIdxL(siblings), SbL), - KeysCount = + KeysCount = [#rpbkeyscount{tag = atom_to_binary(total_count, unicode), count = TC}, #rpbkeyscount{tag = atom_to_binary(total_size, unicode), @@ -732,7 +732,7 @@ encode_find_key(Key, Value) -> {<<"value">>, Value}]. encode_bucket({Type, Bucket}) -> - {struct, + {struct, [{<<"bucket-type">>, Type}, {<<"bucket">>, Bucket}]}; encode_bucket(Bucket) -> {struct, [{<<"bucket">>, Bucket}]}. 
@@ -761,7 +761,7 @@ hash_function({rehash, InitialisationVector}) -> %% @doc -%% Send requests to the reaper, but every batch size get the reaper stats (a +%% Send requests to the reaper, but every batch size get the reaper stats (a %% sync operation) to avoid mailbox overload. -spec handle_in_batches(reap_tombs|erase_keys, list(riak_kv_reaper:reap_reference())| @@ -771,7 +771,7 @@ handle_in_batches(_Type, [], _BatchCount, _Worker) -> ok; handle_in_batches(Type, RefList, BatchCount, Worker) when BatchCount >= ?DELETE_BATCH_SIZE -> - + case Type of reap_tombs -> _ = riak_kv_reaper:reap_stats(Worker); @@ -792,8 +792,8 @@ handle_in_batches(Type, [Ref|RestRefs], BatchCount, Worker) -> %% Internal functions %% =================================================================== --spec merge_countinlists(list({integer(), integer()}), - list({integer(), integer()})) +-spec merge_countinlists(list({integer(), integer()}), + list({integer(), integer()})) -> list({integer(), integer()}). %% @doc %% Take two lists with {IntegerId, Count} tuples and return a list where the @@ -810,7 +810,7 @@ merge_countinlists(ResultList, AccList) -> end end, AccList0 = lists:map(MapFun, AccList), - lists:ukeymerge(1, + lists:ukeymerge(1, lists:ukeysort(1, AccList0), lists:ukeysort(1, ResultList)). @@ -842,7 +842,7 @@ is_nval(_) -> is_segment_list(L) when is_list(L) -> lists:all(fun is_integer/1, L); is_segment_list(_) -> - false. + false. is_segment_filter({segments, SegmentList, TreeSize}) -> IsSegmentList = is_segment_list(SegmentList), @@ -862,12 +862,12 @@ is_modrange(_) -> false. -convert_modrange({date, {LowDate, LowTime}, {HighDate, HighTime}}) +convert_modrange({date, {LowDate, LowTime}, {HighDate, HighTime}}) when is_tuple(LowDate), is_tuple(LowTime), is_tuple(HighDate), is_tuple(HighTime) -> EpochTime = calendar:datetime_to_gregorian_seconds({{1970,1,1},{0,0,0}}), - LowTS = + LowTS = calendar:datetime_to_gregorian_seconds({LowDate, LowTime}) - EpochTime, HighTS = @@ -989,12 +989,12 @@ json_encode_tictac_withentries_test() -> encode_results_ofsize(TreeSize) -> Tree = leveled_tictac:new_tree(tictac_folder_test, TreeSize), ExtractFun = fun(K, V) -> {K, V} end, - FoldFun = + FoldFun = fun({Key, Value}, AccTree) -> leveled_tictac:add_kv(AccTree, Key, Value, ExtractFun) end, - KVList = [{<<"key1">>, <<"value1">>}, - {<<"key2">>, <<"value2">>}, + KVList = [{<<"key1">>, <<"value1">>}, + {<<"key2">>, <<"value2">>}, {<<"key3">>, <<"value3">>}], Tree0 = lists:foldl(FoldFun, Tree, KVList), JsonTree = json_encode_results(merge_tree_range, Tree0), @@ -1027,7 +1027,7 @@ convert_validate_test() -> ?assertMatch(true, is_valid_fold(convert_fold(F))) end, [Q1, Q2, Q3, Q4, Q5, Q6, Q7]), - + IQ1 = {objects_stats, {<<"T">>, <<"B">>}, all, all}, IQ2 = {object_stats, all, all, all}, IQ3 = {object_stats, <<"B">>, {<<"SK">>, <<"EK">>}, all}, @@ -1041,4 +1041,3 @@ convert_validate_test() -> [IQ1, IQ2, IQ3, IQ4, IQ5, IQ6, IQ7]). -endif. - diff --git a/src/riak_kv_index_hashtree.erl b/src/riak_kv_index_hashtree.erl index 7c74af222..ba4f5ec25 100644 --- a/src/riak_kv_index_hashtree.erl +++ b/src/riak_kv_index_hashtree.erl @@ -591,7 +591,7 @@ load_built(#state{trees=Trees}) -> %% Generate a hash value for a `riak_object' -spec hash_object({riak_object:bucket(), riak_object:key()}, - riak_object_t2b() | + riak_object_t2b() | riak_object:riak_object() | riak_object:proxy_object(), version()) -> binary(). 
hash_object({Bucket, Key}, RObj, Version) -> @@ -616,10 +616,10 @@ fold_keys(Partition, HashtreePid, Index, HasIndexTree) -> FoldFun = fold_fun(HashtreePid, HasIndexTree), {Limit, Wait} = get_build_throttle(), ?LOG_INFO("Making fold request to reconstruct AAE tree idx=~p" - ++ " with version ~w", + ++ " with version ~w", [Partition, Version]), - Opts = - case Version of + Opts = + case Version of legacy -> [aae_reconstruction, {iterator_refresh, true}]; _ -> @@ -637,13 +637,13 @@ fold_keys(Partition, HashtreePid, Index, HasIndexTree) -> %% The accumulator in the fold is the number of bytes hashed %% modulo the "build limit" size. If we get an int back, everything is ok -handle_fold_keys_result({Result, {Limit, Delay}}, HashtreePid, Index) +handle_fold_keys_result({Result, {Limit, Delay}}, HashtreePid, Index) when is_integer(Result) -> - ?LOG_INFO("Finished AAE tree build idx=~p limit ~w delay ~w", + ?LOG_INFO("Finished AAE tree build idx=~p limit ~w delay ~w", [Index, Limit, Delay]), gen_server:cast(HashtreePid, build_finished); handle_fold_keys_result(Result, HashtreePid, Index) -> - ?LOG_ERROR("Failed to build hashtree for idx=~p. Result was: ~p", + ?LOG_ERROR("Failed to build hashtree for idx=~p. Result was: ~p", [Index, Result]), gen_server:cast(HashtreePid, build_failed). @@ -866,7 +866,7 @@ expand_items(HasIndex, Items, Version) -> expand_item(Has2ITree, {object, BKey, RObj}, Version, Others) -> IndexN = riak_kv_util:get_index_n(BKey), BinBKey = term_to_binary(BKey), - ObjHash = + ObjHash = try hash_object(BKey, RObj, Version) catch Error:Reason -> diff --git a/src/riak_kv_stat.erl b/src/riak_kv_stat.erl index b5379e80d..4dbb40061 100644 --- a/src/riak_kv_stat.erl +++ b/src/riak_kv_stat.erl @@ -291,27 +291,6 @@ do_update(ngrrepl_error) -> ok = exometer:update([?PFX, ?APP, node, puts, ngrrepl_error], 1); do_update({ngrrepl_srcdiscard, C}) -> ok = exometer:update([?PFX, ?APP, node, puts, ngrrepl_srcdiscard], C); -do_update({ttaaefs, all_check}) -> - ok = exometer:update([?PFX, ?APP, ttaaefs_manager, all_check], 1); -do_update({ttaaefs, day_check}) -> - ok = exometer:update([?PFX, ?APP, ttaaefs_manager, day_check], 1); -do_update({ttaaefs, hour_check}) -> - ok = exometer:update([?PFX, ?APP, ttaaefs_manager, hour_check], 1); -do_update({ttaaefs, range_check}) -> - ok = exometer:update([?PFX, ?APP, ttaaefs_manager, range_check], 1); -do_update({ttaaefs, sync_sync, Microsecs}) -> - ok = exometer:update([?PFX, ?APP, ttaaefs_manager, sync, count], 1), - ok = exometer:update([?PFX, ?APP, ttaaefs_manager, sync, time], Microsecs); -do_update({ttaaefs, sync_nosync, Microsecs}) -> - ok = exometer:update([?PFX, ?APP, ttaaefs_manager, nosync, count], 1), - ok = exometer:update([?PFX, ?APP, ttaaefs_manager, nosync, time], Microsecs); -do_update({ttaaefs, sync_fail, Microsecs}) -> - ok = exometer:update([?PFX, ?APP, ttaaefs_manager, fail, count], 1), - ok = exometer:update([?PFX, ?APP, ttaaefs_manager, fail, time], Microsecs); -do_update({ttaaefs, src_ahead, C}) -> - ok = exometer:update([?PFX, ?APP, ttaaefs_manager, src_ahead], C); -do_update({ttaaefs, snk_ahead, C}) -> - ok = exometer:update([?PFX, ?APP, ttaaefs_manager, snk_ahead], C); do_update(skipped_read_repairs) -> ok = exometer:update([?PFX, ?APP, node, gets, skipped_read_repairs], 1); do_update(coord_redir) -> diff --git a/src/riak_kv_vnode.erl b/src/riak_kv_vnode.erl index 016ed4698..a63bc0b35 100644 --- a/src/riak_kv_vnode.erl +++ b/src/riak_kv_vnode.erl @@ -4490,9 +4490,9 @@ log_key_amnesia(VId, Obj, InEpoch, InCntr, Coord, LocalEpoch, 
LocalCntr) -> % so a read repair is prompted riak_kv_reader:request_read({B, K}) end, - ?LOG_WARNING("Inbound clock entry for ~p in ~p/~p greater than local." ++ - "Epochs: {In:~p Local:~p}. Counters: {In:~p Local:~p}.", - [VId, B, K, InEpoch, LocalEpoch, InCntr, LocalCntr]). + ?LOG_WARNING("Inbound clock entry for ~p in ~p/~p greater than local." + "Epochs: {In:~p Local:~p}. Counters: {In:~p Local:~p}.", + [VId, B, K, InEpoch, LocalEpoch, InCntr, LocalCntr]). %% @private generate an epoch actor, and update the vnode state. -spec new_key_epoch(#state{}) -> {EpochActor :: binary(), #state{}}. From c1271c9b937b1611bfb73f89aaeae7fce64ba9da Mon Sep 17 00:00:00 2001 From: Andrei Zavada Date: Wed, 23 Nov 2022 03:48:29 +0200 Subject: [PATCH 11/15] Merge corrections Merges removed the stat updates for ttaae full-sync (detected by riak_test). A log had been introduced in riak_kv_replrtq_peer what could crash (detected by riak_test). The safety change to avoid coordination in full-sync by setting time for first work item from beginning of next hour, makes sense with 24 slices (one per hour) ... but less sense with different values. riak_test which uses a very high slice_count to avoid delays then failed. # Conflicts: # src/riak_kv_replrtq_peer.erl # src/riak_kv_ttaaefs_manager.erl --- src/riak_kv_replrtq_peer.erl | 4 ++-- src/riak_kv_stat.erl | 21 +++++++++++++++++++++ 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/src/riak_kv_replrtq_peer.erl b/src/riak_kv_replrtq_peer.erl index 7b8b76403..6946271a5 100644 --- a/src/riak_kv_replrtq_peer.erl +++ b/src/riak_kv_replrtq_peer.erl @@ -212,8 +212,8 @@ do_discovery(QueueName, PeerInfo, Type) -> ok; CurrentPeers when is_list(CurrentPeers) -> ?LOG_INFO( - "Type=~w discovery old_peers=~w new_peers=~w", - [Type, length(CurrentPeers), length(DiscoveredPeers)]) + "Type=~w discovery old_peers=~w new_peers=~w", + [Type, length(CurrentPeers), length(DiscoveredPeers)]) end, riak_kv_replrtq_snk:add_snkqueue(QueueName, DiscoveredPeers, diff --git a/src/riak_kv_stat.erl b/src/riak_kv_stat.erl index 4dbb40061..b5379e80d 100644 --- a/src/riak_kv_stat.erl +++ b/src/riak_kv_stat.erl @@ -291,6 +291,27 @@ do_update(ngrrepl_error) -> ok = exometer:update([?PFX, ?APP, node, puts, ngrrepl_error], 1); do_update({ngrrepl_srcdiscard, C}) -> ok = exometer:update([?PFX, ?APP, node, puts, ngrrepl_srcdiscard], C); +do_update({ttaaefs, all_check}) -> + ok = exometer:update([?PFX, ?APP, ttaaefs_manager, all_check], 1); +do_update({ttaaefs, day_check}) -> + ok = exometer:update([?PFX, ?APP, ttaaefs_manager, day_check], 1); +do_update({ttaaefs, hour_check}) -> + ok = exometer:update([?PFX, ?APP, ttaaefs_manager, hour_check], 1); +do_update({ttaaefs, range_check}) -> + ok = exometer:update([?PFX, ?APP, ttaaefs_manager, range_check], 1); +do_update({ttaaefs, sync_sync, Microsecs}) -> + ok = exometer:update([?PFX, ?APP, ttaaefs_manager, sync, count], 1), + ok = exometer:update([?PFX, ?APP, ttaaefs_manager, sync, time], Microsecs); +do_update({ttaaefs, sync_nosync, Microsecs}) -> + ok = exometer:update([?PFX, ?APP, ttaaefs_manager, nosync, count], 1), + ok = exometer:update([?PFX, ?APP, ttaaefs_manager, nosync, time], Microsecs); +do_update({ttaaefs, sync_fail, Microsecs}) -> + ok = exometer:update([?PFX, ?APP, ttaaefs_manager, fail, count], 1), + ok = exometer:update([?PFX, ?APP, ttaaefs_manager, fail, time], Microsecs); +do_update({ttaaefs, src_ahead, C}) -> + ok = exometer:update([?PFX, ?APP, ttaaefs_manager, src_ahead], C); +do_update({ttaaefs, snk_ahead, C}) -> + ok = 
exometer:update([?PFX, ?APP, ttaaefs_manager, snk_ahead], C); do_update(skipped_read_repairs) -> ok = exometer:update([?PFX, ?APP, node, gets, skipped_read_repairs], 1); do_update(coord_redir) -> From 8df351f64d7885ab64a0a52fd33d843b3f2f4e46 Mon Sep 17 00:00:00 2001 From: Andrei Zavada Date: Wed, 23 Nov 2022 03:49:26 +0200 Subject: [PATCH 12/15] Tags for release # Conflicts: # rebar.config --- rebar.config | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rebar.config b/rebar.config index b2953eb88..0b355cae6 100644 --- a/rebar.config +++ b/rebar.config @@ -53,6 +53,6 @@ {riak_pipe, {git, "https://github.com/basho/riak_pipe.git", {branch, "develop"}}}, {riak_dt, {git, "https://github.com/basho/riak_dt.git", {branch, "develop"}}}, {riak_api, {git, "https://github.com/basho/riak_api.git", {branch, "develop"}}}, - {kv_index_tictactree, {git, "https://github.com/martinsumner/kv_index_tictactree.git", {branch, "develop-3.1"}}}, - {rhc, {git, "https://github.com/basho/riak-erlang-http-client", {branch, "develop-3.2-otp24"}}} + {kv_index_tictactree, {git, "https://github.com/martinsumner/kv_index_tictactree.git", {tag, "1.0.2"}}}, + {rhc, {git, "https://github.com/basho/riak-erlang-http-client", {tag, "3.0.10"}}} ]}. From 1d003a4fda28a781103760505f81d4227f4f9fd5 Mon Sep 17 00:00:00 2001 From: Andrei Zavada Date: Wed, 23 Nov 2022 04:19:01 +0200 Subject: [PATCH 13/15] rebase oops --- src/riak_kv_replrtq_src.erl | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/src/riak_kv_replrtq_src.erl b/src/riak_kv_replrtq_src.erl index 9ec0f6208..b029685d5 100644 --- a/src/riak_kv_replrtq_src.erl +++ b/src/riak_kv_replrtq_src.erl @@ -335,23 +335,6 @@ init([FilePath]) -> {OL, QL} = get_limits(), - MapToQOverflow = - fun({QueueName, _QF, _QA}) -> - {QueueName, empty_overflow_queue(QueueName, FilePath)} - end, - MaptoQCache = - fun({QueueName, _QF, _QA}) -> - {QueueName, empty_local_queue()} - end, - QO = lists:map(MapToQOverflow, QFM), - QC = lists:map(MaptoQCache, QFM), - LogFreq = - app_helper:get_env( - riak_kv, - replrtq_logfrequency, - ?LOG_TIMER_SECONDS * 1000), - erlang:send_after(LogFreq, self(), log_queue), - {ok, #state{queue_filtermap = QFM, queue_overflow = QO, queue_local = QC, From d360020bdd076d79b643f8306e08029ea1070476 Mon Sep 17 00:00:00 2001 From: Andrei Zavada Date: Wed, 23 Nov 2022 18:13:23 +0200 Subject: [PATCH 14/15] correctly name and refer to riakhhtpc app --- rebar.config | 2 +- src/riak_kv.app.src | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/rebar.config b/rebar.config index 0b355cae6..251d12678 100644 --- a/rebar.config +++ b/rebar.config @@ -54,5 +54,5 @@ {riak_dt, {git, "https://github.com/basho/riak_dt.git", {branch, "develop"}}}, {riak_api, {git, "https://github.com/basho/riak_api.git", {branch, "develop"}}}, {kv_index_tictactree, {git, "https://github.com/martinsumner/kv_index_tictactree.git", {tag, "1.0.2"}}}, - {rhc, {git, "https://github.com/basho/riak-erlang-http-client", {tag, "3.0.10"}}} + {riakhttpc, {git, "https://github.com/basho/riak-erlang-http-client", {tag, "3.0.10"}}} ]}. 
diff --git a/src/riak_kv.app.src b/src/riak_kv.app.src index 663237879..36deaa90a 100644 --- a/src/riak_kv.app.src +++ b/src/riak_kv.app.src @@ -26,7 +26,7 @@ redbug, recon, riakc, - rhc + riakhttpc ]}, {registered, []}, {mod, {riak_kv_app, []}}, From 7613888417cbb78b5ac0777e4cb0f2112320da7e Mon Sep 17 00:00:00 2001 From: James M <35449344+JMercerGit@users.noreply.github.com> Date: Wed, 6 Mar 2024 12:30:43 +0000 Subject: [PATCH 15/15] Fixing some remaining riak-admin relics Fixing some remaining riak-admin relics that should be `riak admin` --- src/riak_kv_console.erl | 2 +- src/riak_kv_stat_bc.erl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/riak_kv_console.erl b/src/riak_kv_console.erl index e6cb38eea..ad8ea8222 100644 --- a/src/riak_kv_console.erl +++ b/src/riak_kv_console.erl @@ -692,7 +692,7 @@ repair_2i(Args) -> error; {error, Reason} -> io:format("Error: ~p\n", [Reason]), - io:format("Usage: riak-admin repair-2i [--speed [1-100]] ...\n", []), + io:format("Usage: riak admin repair-2i [--speed [1-100]] ...\n", []), io:format("Speed defaults to 100 (full speed)\n", []), io:format("If no partitions are given, all partitions in the\n" "node are repaired\n", []), diff --git a/src/riak_kv_stat_bc.erl b/src/riak_kv_stat_bc.erl index 1353fc054..84c20a16d 100644 --- a/src/riak_kv_stat_bc.erl +++ b/src/riak_kv_stat_bc.erl @@ -23,7 +23,7 @@ %% @doc riak_kv_stat_bc is a module that maps the new riak_kv_stats metrics %% to the old set of stats. It exists to maintain backwards compatibility for -%% those using the `/stats' endpoint and `riak-admin status'. This module +%% those using the `/stats' endpoint and `riak admin status'. This module %% should be considered soon to be deprecated and temporary. %% %% Legacy stats:
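%% Illustrative aside, not part of this patch: a minimal sketch of the kind
%% of new-name-to-legacy-name mapping described in the riak_kv_stat_bc module
%% comment above. The module and metric names below (toy_stat_bc,
%% node_gets_total, vnode_gets) are hypothetical and are not the real
%% riak_kv_stat_bc API or stat names; the real module also reformats and
%% aggregates values rather than only renaming them.
-module(toy_stat_bc).
-export([legacy_stats/1]).

%% Rename new-style stats to the legacy names that `/stats' consumers and
%% `riak admin status' expect, passing unknown names through unchanged.
legacy_stats(NewStats) ->
    Renames = [{node_gets_total, vnode_gets}, {node_puts_total, vnode_puts}],
    [{proplists:get_value(Name, Renames, Name), Value}
        || {Name, Value} <- NewStats].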