*add solr upgrade test which tests upgrade to 4.10.4 from 4.7.0 and a… #799

Open
wants to merge 1 commit into base: master
2 changes: 2 additions & 0 deletions src/rt.erl
@@ -173,6 +173,8 @@
wait_until_bucket_type_status/3,
whats_up/0
]).
-export_type([interfaces/0,
conn_info/0]).

-type strings() :: [string(),...] | [].
-type capability() :: atom() | {atom(), tuple()}.
143 changes: 127 additions & 16 deletions src/yokozuna_rt.erl
@@ -22,14 +22,33 @@
-include_lib("eunit/include/eunit.hrl").
-include("yokozuna_rt.hrl").

-export([expire_trees/1,
-export([check_exists/2,
expire_trees/1,
host_entries/1,
remove_index_dirs/2,
rolling_upgrade/2,
rolling_upgrade/3,
search/4,
search/5,
search_expect/5,
search_expect/6,
search_expect/7,
verify_num_found_query/3,
wait_for_aae/1,
wait_for_full_exchange_round/2,
write_data/5]).

-type host() :: string().
-type portnum() :: integer().
-type count() :: non_neg_integer().
-type json_string() :: atom() | string() | binary().

-define(FMT(S, Args), lists:flatten(io_lib:format(S, Args))).

-spec host_entries(rt:conn_info()) -> [{host(), portnum()}].
host_entries(ClusterConnInfo) ->
[riak_http(I) || {_,I} <- ClusterConnInfo].
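
For orientation, a hedged sketch of how host_entries/1 is typically fed from rt:connection_info/1; this mirrors random_hp/1 in the new yz_solr_upgrade test below, and the port values in the comment are illustrative assumptions:

%% Sketch only: assumes a running riak_test cluster.
ConnInfo = rt:connection_info(Cluster),
%% ConnInfo is shaped like
%%   [{Node, [{http, {"127.0.0.1", 10018}}, {pb, {"127.0.0.1", 10017}}]}, ...]
HostPorts = yokozuna_rt:host_entries(ConnInfo),
{Host, Port} = rt:select_random(HostPorts).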

%% @doc Write `Keys' via the PB interface to a `Bucket' and have them
%% searchable in an `Index'.
-spec write_data([node()], pid(), index_name(), bucket(), [binary()]) -> ok.
@@ -79,21 +79,6 @@ rolling_upgrade(Cluster, Vsn, YZCfgChanges) ->
end || {SolrPort, Node} <- Cluster2],
ok.

-spec config_merge(proplists:proplist(), proplists:proplist()) ->
orddict:orddict() | proplists:proplist().
config_merge(DefaultCfg, NewCfg) when NewCfg /= [] ->
orddict:update(yokozuna,
fun(V) ->
orddict:merge(fun(_, _X, Y) -> Y end,
orddict:from_list(V),
orddict:from_list(
orddict:fetch(
yokozuna, NewCfg)))
end,
DefaultCfg);
config_merge(DefaultCfg, _NewCfg) ->
DefaultCfg.

%% @doc Use AAE status to verify that exchange has occurred for all
%% partitions since the time this function was invoked.
-spec wait_for_aae([node()]) -> ok.
@@ -162,6 +166,27 @@ expire_trees(Cluster) ->
timer:sleep(100),
ok.

%% @doc Remove the index directories, deleting the index data on disk.
-spec remove_index_dirs([node()], index_name()) -> ok.
remove_index_dirs(Nodes, IndexName) ->
IndexDirs = [rpc:call(Node, yz_index, index_dir, [IndexName]) ||
Node <- Nodes],
lager:info("Remove index dirs: ~p, on nodes: ~p~n",
[IndexDirs, Nodes]),
[rt:stop(ANode) || ANode <- Nodes],
[rt:del_dir(binary_to_list(IndexDir)) || IndexDir <- IndexDirs],
[rt:start(ANode) || ANode <- Nodes],
ok.

%% @doc Check that the index/core exists in metadata and on disk, via yz_index:exists.
-spec check_exists([node()], index_name()) -> ok.
check_exists(Nodes, IndexName) ->
rt:wait_until(Nodes,
fun(N) ->
rpc:call(N, yz_index, exists, [IndexName])
end).
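
A hedged usage sketch (index name assumed): rt:wait_until/2 applies the fun to each node until it returns true, so this call blocks until yz_index:exists/1 reports the core everywhere, matching the call sites in the tests below.

%% Sketch only: <<"my_index">> is an assumed index name.
ok = yokozuna_rt:check_exists(Cluster, <<"my_index">>).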

-spec verify_num_found_query([node()], index_name(), count()) -> ok.
verify_num_found_query(Cluster, Index, ExpectedCount) ->
F = fun(Node) ->
Pid = rt:pbc(Node),
@@ -172,3 +197,89 @@ verify_num_found_query(Cluster, Index, ExpectedCount) ->
end,
rt:wait_until(Cluster, F),
ok.

search_expect(HP, Index, Name, Term, Expect) ->
search_expect(yokozuna, HP, Index, Name, Term, Expect).

search_expect(Type, HP, Index, Name, Term, Expect) ->
{ok, "200", _, R} = search(Type, HP, Index, Name, Term),
verify_count_http(Expect, R).

search_expect(solr, {Host, Port}, Index, Name0, Term0, Shards, Expect)
when is_list(Shards), length(Shards) > 0 ->
Name = quote_unicode(Name0),
Term = quote_unicode(Term0),
URL = internal_solr_url(Host, Port, Index, Name, Term, Shards),
lager:info("Run search ~s", [URL]),
Opts = [{response_format, binary}],
{ok, "200", _, R} = ibrowse:send_req(URL, [], get, [], Opts),
verify_count_http(Expect, R).

search(HP, Index, Name, Term) ->
search(yokozuna, HP, Index, Name, Term).

search(Type, {Host, Port}, Index, Name, Term) when is_integer(Port) ->
search(Type, {Host, integer_to_list(Port)}, Index, Name, Term);

search(Type, {Host, Port}, Index, Name0, Term0) ->
Name = quote_unicode(Name0),
Term = quote_unicode(Term0),
FmtStr = case Type of
solr ->
"http://~s:~s/internal_solr/~s/select?q=~s:~s&wt=json";
yokozuna ->
"http://~s:~s/search/query/~s?q=~s:~s&wt=json"
end,
URL = ?FMT(FmtStr, [Host, Port, Index, Name, Term]),
lager:info("Run search ~s", [URL]),
Opts = [{response_format, binary}],
ibrowse:send_req(URL, [], get, [], Opts).
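
As a hedged illustration of what search/5 builds for the yokozuna endpoint; host, port, index, and field are assumed values, not taken from this diff:

%% Illustrative only.
%% search(yokozuna, {"127.0.0.1", 10018}, <<"test_idx">>,
%%        <<"name">>, <<"hello world">>)
%% quotes each side of the q= pair (quote_unicode/1 renders the
%% space as '+') and issues a GET via ibrowse against:
%%   http://127.0.0.1:10018/search/query/test_idx?q=name:hello+world&wt=json
%% returning ibrowse's {ok, Status, Headers, Body} tuple.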

%%%===================================================================
%%% Private
%%%===================================================================

-spec verify_count_http(count(), json_string()) -> boolean().
verify_count_http(Expected, Resp) ->
Count = get_count_http(Resp),
lager:info("Expected: ~p, Actual: ~p", [Expected, Count]),
Expected == Count.

-spec get_count_http(json_string()) -> count().
get_count_http(Resp) ->
Struct = mochijson2:decode(Resp),
kvc:path([<<"response">>, <<"numFound">>], Struct).

-spec riak_http({node(), rt:interfaces()} | rt:interfaces()) ->
{host(), portnum()}.
riak_http({_Node, ConnInfo}) ->
riak_http(ConnInfo);
riak_http(ConnInfo) ->
proplists:get_value(http, ConnInfo).

-spec config_merge(proplists:proplist(), proplists:proplist()) ->
orddict:orddict() | proplists:proplist().
config_merge(DefaultCfg, NewCfg) when NewCfg /= [] ->
orddict:update(yokozuna,
fun(V) ->
orddict:merge(fun(_, _X, Y) -> Y end,
orddict:from_list(V),
orddict:from_list(
orddict:fetch(
yokozuna, NewCfg)))
end,
DefaultCfg);
config_merge(DefaultCfg, _NewCfg) ->
DefaultCfg.
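
To make the merge semantics concrete, a hedged worked example with invented keys: the orddict:merge/3 fun keeps its right-hand (Y) argument, so values under yokozuna in NewCfg win over the defaults. Note the first clause also assumes any non-empty NewCfg carries a yokozuna entry; orddict:fetch/2 would throw otherwise.

%% Hedged example -- inputs invented for illustration.
Default = [{yokozuna, [{enabled, true}, {anti_entropy_tick, 1000}]}],
New     = [{yokozuna, [{anti_entropy_tick, 500}]}],
%% config_merge(Default, New) =>
%%   [{yokozuna, [{anti_entropy_tick, 500}, {enabled, true}]}]
%% (orddict:from_list/1 sorts the inner keys; the new
%%  anti_entropy_tick replaces the default.)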

internal_solr_url(Host, Port, Index) ->
?FMT("http://~s:~B/internal_solr/~s", [Host, Port, Index]).
internal_solr_url(Host, Port, Index, Name, Term, Shards) ->
Ss = [internal_solr_url(Host, ShardPort, Index)
|| {_, ShardPort} <- Shards],
?FMT("http://~s:~B/internal_solr/~s/select?wt=json&q=~s:~s&shards=~s",
[Host, Port, Index, Name, Term, string:join(Ss, ",")]).
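
A hedged sketch of the distributed-search URL internal_solr_url/6 assembles; hosts and ports are assumed, and Name/Term are expected to arrive pre-quoted from the caller:

%% Illustrative only, with assumed hosts/ports.
%% Shards = [{"127.0.0.1", 10014}, {"127.0.0.1", 10024}]
%% internal_solr_url("127.0.0.1", 10014, <<"idx">>, "name", "term", Shards)
%% produces (one line, wrapped here for readability):
%%   http://127.0.0.1:10014/internal_solr/idx/select?wt=json&q=name:term
%%     &shards=http://127.0.0.1:10014/internal_solr/idx,
%%             http://127.0.0.1:10024/internal_solr/idx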

quote_unicode(Value) ->
mochiweb_util:quote_plus(binary_to_list(
unicode:characters_to_binary(Value))).
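
And a hedged note on quote_unicode/1 (example input assumed): it flattens any Unicode input to UTF-8 bytes before percent-encoding, so multi-byte code points survive into the query string.

%% Illustrative only.
%% quote_unicode(<<"hello world">>) => "hello+world"
%% (mochiweb_util:quote_plus/1 renders the space as '+' and
%%  percent-encodes bytes outside its unreserved set.)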
27 changes: 5 additions & 22 deletions tests/yz_core_properties_create_unload.erl
@@ -83,7 +83,7 @@ test_core_props_removal(Cluster, RandNodes, KeyCount, Pid) ->
lager:info("Remove core.properties file in each index data dir"),
remove_core_props(RandNodes, ?INDEX),

check_exists(Cluster, ?INDEX),
yokozuna_rt:check_exists(Cluster, ?INDEX),

lager:info("Write one more piece of data"),
ok = rt:pbc_write(Pid, ?BUCKET, <<"foo">>, <<"foo">>, "text/plain"),
@@ -93,9 +93,9 @@

test_remove_index_dirs(Cluster, RandNodes, KeyCount, Pid) ->
lager:info("Remove index directories on each node and let them recreate/reindex"),
remove_index_dirs(RandNodes, ?INDEX),
yokozuna_rt:remove_index_dirs(RandNodes, ?INDEX),

check_exists(Cluster, ?INDEX),
yokozuna_rt:check_exists(Cluster, ?INDEX),

yokozuna_rt:expire_trees(Cluster),
yokozuna_rt:wait_for_aae(Cluster),
@@ -112,9 +112,9 @@ test_remove_segment_infos_and_rebuild(Cluster, RandNodes, KeyCount, Pid) ->

lager:info("To fix, we remove index directories on each node and let them recreate/reindex"),

remove_index_dirs(RandNodes, ?INDEX),
yokozuna_rt:remove_index_dirs(RandNodes, ?INDEX),

check_exists(Cluster, ?INDEX),
yokozuna_rt:check_exists(Cluster, ?INDEX),

yokozuna_rt:expire_trees(Cluster),
yokozuna_rt:wait_for_aae(Cluster),
@@ -147,23 +147,6 @@ remove_core_props(Nodes, IndexName) ->
[file:delete(PropsFile) || PropsFile <- PropsFiles],
ok.

%% @doc Check if index/core exists in metadata, disk via yz_index:exists.
check_exists(Nodes, IndexName) ->
rt:wait_until(Nodes,
fun(N) ->
rpc:call(N, yz_index, exists, [IndexName])
end).

%% @doc Remove index directories, removing the index.
remove_index_dirs(Nodes, IndexName) ->
IndexDirs = [rpc:call(Node, yz_index, index_dir, [IndexName]) ||
Node <- Nodes],
lager:info("Remove index dirs: ~p, on nodes: ~p~n",
[IndexDirs, Nodes]),
[rt:stop(ANode) || ANode <- Nodes],
[rt:del_dir(binary_to_list(IndexDir)) || IndexDir <- IndexDirs],
[rt:start(ANode) || ANode <- Nodes].

%% @doc Remove lucene segment info files to check if reindexing will occur
%% on re-creation/re-indexing.
remove_segment_infos(Nodes, IndexName) ->
122 changes: 122 additions & 0 deletions tests/yz_solr_upgrade.erl
@@ -0,0 +1,122 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2014 Basho Technologies, Inc.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%%--------------------------------------------------------------------
-module(yz_solr_upgrade).
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("riakc/include/riakc.hrl").

-define(INDEX, <<"test_solr_upgrade_idx">>).
-define(BUCKET, <<"test_solr_upgrade_bucket">>).
-define(SEQMAX, 1000).
-define(CFG,
[{riak_core,
[
{ring_creation_size, 16},
{anti_entropy_build_limit, {100, 1000}},
{anti_entropy_concurrency, 8}
]},
{yokozuna,
[
{anti_entropy_tick, 1000},
{enabled, true}
]}
]).

confirm() ->
TestMetaData = riak_test_runner:metadata(),
OldVsn = proplists:get_value(upgrade_version, TestMetaData, previous),

[Node1, Node2|RestNodes] = Cluster = rt:build_cluster(lists:duplicate(4, {OldVsn, ?CFG})),

%% Generate keys; YZ only supports UTF-8 compatible keys
GenKeys = [<<N:64/integer>> || N <- lists:seq(1, ?SEQMAX),
not lists:any(
fun(E) -> E > 127 end,
binary_to_list(<<N:64/integer>>))],
KeyCount = length(GenKeys),
lager:info("KeyCount ~p", [KeyCount]),

OldPid = rt:pbc(rt:select_random(RestNodes)),

yokozuna_rt:write_data(Cluster, OldPid, ?INDEX, ?BUCKET, GenKeys),
%% wait for solr soft commit
timer:sleep(1100),

riakc_pb_socket:stop(OldPid),

HP1 = random_hp([Node1, Node2]),
yokozuna_rt:search_expect(HP1, ?INDEX, <<"*">>, <<"*">>, KeyCount),

%% Upgrade Node 1 and 2
lager:info("Upgrade to solr version 4.10.4 on Nodes 1 - 2"),
upgrade_to_current([Node1, Node2]),

lager:info("Write one more piece of data"),
Pid2 = rt:pbc(Node2),
ok = rt:pbc_write(Pid2, ?BUCKET, <<"foo">>, <<"foo">>, "text/plain"),
timer:sleep(1100),
riakc_pb_socket:stop(Pid2),

HP2 = random_hp(RestNodes),
yokozuna_rt:search_expect(HP2, ?INDEX, <<"*">>, <<"*">>, KeyCount + 1),

%% Upgrade Rest
lager:info("Upgrade to solr version 4.10.4 on Nodes 3 - 4"),
upgrade_to_current(RestNodes),

lager:info("Write one more piece of data"),
RandPid = rt:pbc(rt:select_random(RestNodes)),
ok = rt:pbc_write(RandPid, ?BUCKET, <<"food">>, <<"food">>, "text/plain"),
timer:sleep(1100),
riakc_pb_socket:stop(RandPid),

yokozuna_rt:expire_trees(Cluster),
yokozuna_rt:wait_for_aae(Cluster),
HP3 = random_hp(Cluster),
yokozuna_rt:search_expect(HP3, ?INDEX, <<"*">>, <<"*">>, KeyCount + 2),

lager:info("Downgrade cluster to previous version. Once upgraded, the
the index format will change, throwing an error, unless you
reindex (& resync the AAE trees) that core/search_index again."),
yokozuna_rt:rolling_upgrade(Cluster, previous),

yokozuna_rt:remove_index_dirs(Cluster, ?INDEX),
yokozuna_rt:check_exists(Cluster, ?INDEX),
yokozuna_rt:expire_trees(Cluster),
yokozuna_rt:wait_for_aae(Cluster),

HP4 = random_hp(Cluster),
yokozuna_rt:search_expect(HP4, ?INDEX, <<"*">>, <<"*">>, KeyCount + 2),

pass.

random_hp(Nodes) ->
rt:select_random(yokozuna_rt:host_entries(
rt:connection_info(Nodes))).

upgrade_to_current(Nodes) ->
[rt:upgrade(ANode, current) || ANode <- Nodes],
[rt:wait_for_service(ANode, riak_kv) || ANode <- Nodes],
[rt:wait_for_service(ANode, yokozuna) || ANode <- Nodes].

downgrade_to_previous(Nodes) ->
[rt:upgrade(ANode, previous) || ANode <- Nodes],
[rt:wait_for_service(ANode, riak_kv) || ANode <- Nodes],
[rt:wait_for_service(ANode, yokozuna) || ANode <- Nodes].