diff --git a/src/main/java/io/lettuce/core/cluster/RedisAdvancedClusterReactiveCommandsImpl.java b/src/main/java/io/lettuce/core/cluster/RedisAdvancedClusterReactiveCommandsImpl.java index 0ebb1475e..85c56b2dc 100644 --- a/src/main/java/io/lettuce/core/cluster/RedisAdvancedClusterReactiveCommandsImpl.java +++ b/src/main/java/io/lettuce/core/cluster/RedisAdvancedClusterReactiveCommandsImpl.java @@ -256,7 +256,6 @@ public Flux> mget(K... keys) { @SuppressWarnings({ "unchecked", "rawtypes" }) public Flux> mget(Iterable keys) { - List keyList = LettuceLists.newList(keys); Map> partitioned = SlotHash.partition(codec, keyList); @@ -264,37 +263,25 @@ public Flux> mget(Iterable keys) { return super.mget(keyList); } - List>> publishers = new ArrayList<>(); - - for (Map.Entry> entry : partitioned.entrySet()) { - publishers.add(super.mget(entry.getValue())); - } - - Flux> fluxes = Flux.mergeSequential(publishers); + List>> publishers = partitioned.values().stream().map(super::mget) + .collect(Collectors.toList()); - Mono>> map = fluxes.collectList().map(vs -> { - - KeyValue[] values = new KeyValue[vs.size()]; + return Flux.mergeSequential(publishers).collectList().map(results -> { + KeyValue[] values = new KeyValue[keyList.size()]; int offset = 0; - for (Map.Entry> entry : partitioned.entrySet()) { + for (List partitionKeys : partitioned.values()) { for (int i = 0; i < keyList.size(); i++) { - - int index = entry.getValue().indexOf(keyList.get(i)); - if (index == -1) { - continue; + int index = partitionKeys.indexOf(keyList.get(i)); + if (index != -1) { + values[i] = results.get(offset + index); } - - values[i] = vs.get(offset + index); } - - offset += entry.getValue().size(); + offset += partitionKeys.size(); } return Arrays.asList(values); - }); - - return map.flatMapIterable(keyValues -> keyValues); + }).flatMapMany(Flux::fromIterable); } @Override