Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for GEORADIUS #3756

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 117 additions & 2 deletions src/server/zset_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2846,7 +2846,7 @@ void GeoSearchStoreGeneric(ConnectionContext* cntx, const GeoShape& shape_ref, s
DCHECK(geo_ops.store == GeoStoreType::kStoreDist || geo_ops.store == GeoStoreType::kStoreHash);
ShardId dest_shard = Shard(geo_ops.store_key, shard_set->size());
DVLOG(1) << "store shard:" << dest_shard << ", key " << geo_ops.store_key;
AddResult add_result;
OpResult<AddResult> add_result;
vector<ScoredMemberView> smvec;
for (const auto& p : ga) {
if (geo_ops.store == GeoStoreType::kStoreDist) {
Expand Down Expand Up @@ -3004,6 +3004,118 @@ void ZSetFamily::GeoSearch(CmdArgList args, ConnectionContext* cntx) {
GeoSearchStoreGeneric(cntx, shape, key, member, geo_ops);
}

void ZSetFamily::GeoRadius(CmdArgList args, ConnectionContext* cntx) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a lot of code! 🙂

Quoting Redis docs:

It can be replaced by GEOSEARCH and GEOSEARCHSTORE with the BYRADIUS and FROMMEMBER arguments when migrating or writing new code.

So they already hint that GeoRadius is just a specification of GeoSearch. Maybe we can unite them somehow? They share count, asc/desc, withcord, withdist, storedist... Not mentioning parsing lon/lat and units

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a lot of code! 🙂

Quoting Redis docs:

It can be replaced by GEOSEARCH and GEOSEARCHSTORE with the BYRADIUS and FROMMEMBER arguments when migrating or writing new code.

So they already hint that GeoRadius is just a specification of GeoSearch. Maybe we can unite them somehow? They share count, asc/desc, withcord, withdist, storedist... Not mentioning parsing lon/lat and units

Thanks for reviewing. Updated as you suggested.

// parse arguments
string_view key = ArgS(args, 0);
GeoShape shape = {};
GeoSearchOpts geo_ops;

if (3 < args.size()) {
string_view longitude_str = ArgS(args, 1);
string_view latitude_str = ArgS(args, 2);
pair<double, double> longlat;
if (!ParseLongLat(longitude_str, latitude_str, &longlat)) {
string err =
absl::StrCat("-ERR invalid longitude,latitude pair ", longitude_str, ",", latitude_str);
return cntx->SendError(err, kSyntaxErrType);
}
shape.xy[0] = longlat.first;
shape.xy[1] = longlat.second;
} else {
return cntx->SendError(kSyntaxErr);
}

if (5 < args.size()) {
if (!ParseDouble(ArgS(args, 3), &shape.t.radius)) {
return cntx->SendError(kInvalidFloatErr);
}
string_view unit = ArgS(args, 4);
shape.conversion = ExtractUnit(unit);
geo_ops.conversion = shape.conversion;
if (shape.conversion == -1) {
return cntx->SendError("unsupported unit provided. please use M, KM, FT, MI");
}
shape.type = CIRCULAR_TYPE;
}

for (size_t i = 5; i < args.size(); ++i) {
ToUpper(&args[i]);

string_view cur_arg = ArgS(args, i);
if (cur_arg == "ASC") {
if (geo_ops.sorting != Sorting::kUnsorted) {
return cntx->SendError(kAscDescErr);
} else {
geo_ops.sorting = Sorting::kAsc;
}
} else if (cur_arg == "DESC") {
if (geo_ops.sorting != Sorting::kUnsorted) {
return cntx->SendError(kAscDescErr);
} else {
geo_ops.sorting = Sorting::kDesc;
}
} else if (cur_arg == "COUNT") {
if (i + 1 < args.size() && absl::SimpleAtoi(ArgS(args, i + 1), &geo_ops.count)) {
i++;
} else {
return cntx->SendError(kSyntaxErr);
}
if (i + 1 < args.size() && ArgS(args, i + 1) == "ANY") {
geo_ops.any = true;
i++;
}
} else if (cur_arg == "WITHCOORD") {
if (geo_ops.store != GeoStoreType::kNoStore) {
return cntx->SendError(kStoreCompatErr);
}
geo_ops.withcoord = true;
} else if (cur_arg == "WITHDIST") {
if (geo_ops.store != GeoStoreType::kNoStore) {
return cntx->SendError(kStoreCompatErr);
}
geo_ops.withdist = true;
} else if (cur_arg == "WITHHASH") {
if (geo_ops.store != GeoStoreType::kNoStore) {
return cntx->SendError(kStoreCompatErr);
}
geo_ops.withhash = true;
} else if (cur_arg == "STORE") {
if (geo_ops.store != GeoStoreType::kNoStore) {
return cntx->SendError(kStoreTypeErr);
} else if (geo_ops.withcoord || geo_ops.withdist || geo_ops.withhash) {
return cntx->SendError(kStoreCompatErr);
}
if (i + 1 < args.size()) {
geo_ops.store_key = ArgS(args, i + 1);
geo_ops.store = GeoStoreType::kStoreHash;
i++;
} else {
return cntx->SendError(kSyntaxErr);
}
} else if (cur_arg == "STOREDIST") {
if (geo_ops.store != GeoStoreType::kNoStore) {
return cntx->SendError(kStoreTypeErr);
} else if (geo_ops.withcoord || geo_ops.withdist || geo_ops.withhash) {
return cntx->SendError(kStoreCompatErr);
}
if (i + 1 < args.size()) {
geo_ops.store_key = ArgS(args, i + 1);
geo_ops.store = GeoStoreType::kStoreDist;
i++;
} else {
return cntx->SendError(kSyntaxErr);
}
} else {
return cntx->SendError(kSyntaxErr);
}
}
// parsing completed

// member must be empty
string_view member;
GeoSearchStoreGeneric(cntx, shape, key, member, geo_ops);
}

void ZSetFamily::GeoRadiusByMember(CmdArgList args, ConnectionContext* cntx) {
GeoShape shape = {};
GeoSearchOpts geo_ops;
Expand Down Expand Up @@ -3139,6 +3251,7 @@ constexpr uint32_t kGeoPos = READ | GEO | SLOW;
constexpr uint32_t kGeoDist = READ | GEO | SLOW;
constexpr uint32_t kGeoSearch = READ | GEO | SLOW;
constexpr uint32_t kGeoRadiusByMember = WRITE | GEO | SLOW;
constexpr uint32_t kGeoRadius = WRITE | GEO | SLOW;
} // namespace acl

void ZSetFamily::Register(CommandRegistry* registry) {
Expand Down Expand Up @@ -3191,7 +3304,9 @@ void ZSetFamily::Register(CommandRegistry* registry) {
<< CI{"GEODIST", CO::READONLY, -4, 1, 1, acl::kGeoDist}.HFUNC(GeoDist)
<< CI{"GEOSEARCH", CO::READONLY, -4, 1, 1, acl::kGeoSearch}.HFUNC(GeoSearch)
<< CI{"GEORADIUSBYMEMBER", CO::WRITE | CO::STORE_LAST_KEY, -4, 1, 1, acl::kGeoRadiusByMember}
.HFUNC(GeoRadiusByMember);
.HFUNC(GeoRadiusByMember)
<< CI{"GEORADIUS", CO::WRITE | CO::STORE_LAST_KEY, -4, 1, 1, acl::kGeoRadius}.HFUNC(
GeoRadius);
}

} // namespace dfly
1 change: 1 addition & 0 deletions src/server/zset_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ class ZSetFamily {
static void GeoDist(CmdArgList args, ConnectionContext* cntx);
static void GeoSearch(CmdArgList args, ConnectionContext* cntx);
static void GeoRadiusByMember(CmdArgList args, ConnectionContext* cntx);
static void GeoRadius(CmdArgList args, ConnectionContext* cntx);
};

} // namespace dfly
55 changes: 55 additions & 0 deletions src/server/zset_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1148,6 +1148,61 @@ TEST_F(ZSetFamilyTest, GeoRadiusByMember) {
"WITHHASH and WITHCOORDS options"));
}

// GEORADIUS key longitude latitude radius <m | km | ft | mi>
// [WITHCOORD] [WITHDIST] [WITHHASH] [COUNT count [ANY]] [ASC | DESC]
// [STORE key | STOREDIST key]

TEST_F(ZSetFamilyTest, GeoRadius) {
EXPECT_EQ(10, CheckedInt({"geoadd", "Europe", "13.4050", "52.5200", "Berlin", "3.7038",
"40.4168", "Madrid", "9.1427", "38.7369", "Lisbon", "2.3522",
"48.8566", "Paris", "16.3738", "48.2082", "Vienna", "4.8952",
"52.3702", "Amsterdam", "10.7522", "59.9139", "Oslo", "23.7275",
"37.9838", "Athens", "19.0402", "47.4979", "Budapest", "6.2603",
"53.3498", "Dublin"}));

auto resp = Run({"GEORADIUS", "invalid_key", "16.3738", "48.2082", "900", "KM"});
EXPECT_THAT(resp.GetVec().empty(), true);

resp = Run({"GEORADIUS", "America", "13.4050", "52.5200", "500", "KM", "WITHCOORD", "WITHDIST"});
EXPECT_THAT(resp.GetVec().empty(), true);

resp = Run({"GEORADIUS", "Europe", "130.4050", "52.5200", "10", "KM", "WITHCOORD", "WITHDIST"});
EXPECT_THAT(resp.GetVec().empty(), true);

resp = Run({"GEORADIUS", "Europe", "13.4050", "52.5200", "500", "KM", "COUNT", "3", "WITHCOORD",
"WITHDIST"});
EXPECT_THAT(
resp,
RespArray(ElementsAre(
RespArray(ElementsAre("Berlin", DoubleArg(0.00017343178521311378),
RespArray(ElementsAre(DoubleArg(13.4050), DoubleArg(52.5200))))),
RespArray(ElementsAre("Dublin", DoubleArg(487.5619030644293),
RespArray(ElementsAre(DoubleArg(6.2603), DoubleArg(53.3498))))))));

resp = Run(
{"GEORADIUS", "Europe", "13.4050", "52.5200", "500", "KM", "DESC", "WITHCOORD", "WITHDIST"});
EXPECT_THAT(
resp,
RespArray(ElementsAre(
RespArray(ElementsAre("Dublin", DoubleArg(487.5619030644293),
RespArray(ElementsAre(DoubleArg(6.2603), DoubleArg(53.3498))))),
RespArray(ElementsAre("Berlin", DoubleArg(0.00017343178521311378),
RespArray(ElementsAre(DoubleArg(13.4050), DoubleArg(52.5200))))))));

EXPECT_EQ(2, CheckedInt({"GEORADIUS", "Europe", "3.7038", "40.4168", "700", "KM", "STORE",
"store_key"}));
resp = Run({"ZRANGE", "store_key", "0", "-1"});
Copy link
Contributor Author

@azuredream azuredream Sep 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test case failed because zrange returned [] while it should be ["Madrid", "Lisbon"].
I used GDB to debug it. During GEORADIUS STORE, the result was found but didn't trigger OpAdd.
That's because only shard0 went through store_cb while shard->size() was 2.

    auto store_cb = [&](Transaction* t, EngineShard* shard) {
      if (shard->shard_id() == dest_shard) {      //0!=1
        ZParams zparams;
        zparams.override = true;
        add_result =
            OpAdd(t->GetOpArgs(shard), zparams, geo_ops.store_key, ScoredMemberSpan{smvec}).value();
      }
      return OpStatus::OK;
    };

GEORADIUSMEMBER has an identity test case at line# 1134 and it passed. GDB showed that both shard0 and shard1 went through store_cb, and data was written to shard1.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are the keys involved in this operation?

Copy link
Contributor Author

@azuredream azuredream Sep 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are the keys involved in this operation?

2 keys. From key was "Europe", writing to store_key. Please see the trimmed test case below.
{"GEORADIUSBYMEMBER", "Europe", "Madrid", "700", "KM", "STORE", "store_key"}
{"GEORADIUS", "Europe", "3.7038", "40.4168", "700", "KM", "STORE", "store_key"}

I added some prints in GeoSearchStoreGeneric().
Based on the test logs below, test case for GeoRadiusByMember was able to see both shard 1 and 0.
But test case for GeoRadius was not able to see shard 1.

Test Logs:

[ RUN      ] ZSetFamilyTest.GeoRadiusByMember

from key: Europe, from shard: 0
 member checking:0
match
 member checking:1
dest key:store_key ,dest shard:1
smvec:
 3.47177e+15 Madrid
 3.47312e+15 Lisbon
checking:0
checking:1
match
[       OK ] ZSetFamilyTest.GeoRadiusByMember (18 ms)
[ RUN      ] ZSetFamilyTest.GeoRadius

from key: Europe, from shard: 0
no member checking:0
match
dest key:store_key ,dest shard:1
smvec:
 3.47177e+15 Madrid
 3.47312e+15 Lisbon
checking:0
.../dragonfly/src/server/zset_family_test.cc:1191: Failure
Value of: resp
Expected: resp array (value: 16-byte object <E4-E0 06-6D D5-55 00-00 CE-E0 06-6D D5-55 00-00>)
  Actual: [] (of type facade::RespExpr), whose given property is Vec: []

Trimmed Test cases:

TEST_F(ZSetFamilyTest, GeoRadiusByMember) {
  EXPECT_EQ(10, CheckedInt({"geoadd",  "Europe",    "13.4050", "52.5200", "Berlin",   "3.7038",
                            "40.4168", "Madrid",    "9.1427",  "38.7369", "Lisbon",   "2.3522",
                            "48.8566", "Paris",     "16.3738", "48.2082", "Vienna",   "4.8952",
                            "52.3702", "Amsterdam", "10.7522", "59.9139", "Oslo",     "23.7275",
                            "37.9838", "Athens",    "19.0402", "47.4979", "Budapest", "6.2603",
                            "53.3498", "Dublin"}));
  EXPECT_EQ(
      2, CheckedInt({"GEORADIUSBYMEMBER", "Europe", "Madrid", "700", "KM", "STORE", "store_key"}));
  auto resp = Run({"ZRANGE", "store_key", "0", "-1"});
  EXPECT_THAT(resp, RespArray(ElementsAre("Madrid", "Lisbon")));
  resp = Run({"ZRANGE", "store_key", "0", "-1", "WITHSCORES"});
  EXPECT_THAT(resp,
              RespArray(ElementsAre("Madrid", "3471766229222696", "Lisbon", "3473121093062745")));
}

TEST_F(ZSetFamilyTest, GeoRadius) {
  EXPECT_EQ(10, CheckedInt({"geoadd",  "Europe",    "13.4050", "52.5200", "Berlin",   "3.7038",
                            "40.4168", "Madrid",    "9.1427",  "38.7369", "Lisbon",   "2.3522",
                            "48.8566", "Paris",     "16.3738", "48.2082", "Vienna",   "4.8952",
                            "52.3702", "Amsterdam", "10.7522", "59.9139", "Oslo",     "23.7275",
                            "37.9838", "Athens",    "19.0402", "47.4979", "Budapest", "6.2603",
                            "53.3498", "Dublin"}));
  EXPECT_EQ(2, CheckedInt({"GEORADIUS", "Europe", "3.7038", "40.4168", "700", "KM", "STORE",
                           "store_key"}));
  auto resp = Run({"ZRANGE", "store_key", "0", "-1"});
  EXPECT_THAT(resp, RespArray(ElementsAre("Madrid", "Lisbon")));
  resp = Run({"ZRANGE", "store_key", "0", "-1", "WITHSCORES"});
  EXPECT_THAT(resp,
              RespArray(ElementsAre("Madrid", "3471766229222696", "Lisbon", "3473121093062745")));
}
void GeoSearchStoreGeneric(ConnectionContext* cntx, const GeoShape& shape_ref, string_view key,
                           string_view member, const GeoSearchOpts& geo_ops) {
  GeoShape* shape = &(const_cast<GeoShape&>(shape_ref));
  auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());

  std::cout<<std::endl;
  ShardId from_shard = Shard(key, shard_set->size());
  std::cout<<"from key: "<<key<<", from shard: "<<from_shard<<std::endl;

  if (!member.empty()) {
    // get shape.xy from member
    OpResult<double> member_score;
    auto cb = [&](Transaction* t, EngineShard* shard) {
      std::cout<<" member checking:"<<shard->shard_id()<<std::endl;
      if (shard->shard_id() == from_shard) {
        std::cout<<"match"<<std::endl;
        member_score = OpScore(t->GetOpArgs(shard), key, member);
      }
      return OpStatus::OK;
    };
    cntx->transaction->Execute(std::move(cb), false);
    auto member_sts = member_score.status();
    if (member_sts != OpStatus::OK) {
      std::cout<<"xxx"<<std::endl;
      cntx->transaction->Conclude();
      switch (member_sts) {
        case OpStatus::WRONG_TYPE:
          return cntx->SendError(kWrongTypeErr);
        case OpStatus::KEY_NOTFOUND:
          return rb->StartArray(0);
        case OpStatus::MEMBER_NOTFOUND:
          return cntx->SendError(kMemberNotFound);
        default:
          return cntx->SendError(member_sts);
      }
    }
    ScoreToLongLat(*member_score, shape->xy);
  } else {
    // verify key is valid
    OpResult<void> result;
    auto cb = [&](Transaction* t, EngineShard* shard) {
      std::cout<<"no member checking:"<<shard->shard_id()<<std::endl;
      if (shard->shard_id() == from_shard) {
        std::cout<<"match"<<std::endl;
        result = OpKeyExisted(t->GetOpArgs(shard), key);
      }
      return OpStatus::OK;
    };
    cntx->transaction->Execute(std::move(cb), false);
    auto result_sts = result.status();
    if (result_sts != OpStatus::OK) {
      std::cout<<"!!!"<<std::endl;
      cntx->transaction->Conclude();
      switch (result_sts) {
        case OpStatus::WRONG_TYPE:
          return cntx->SendError(kWrongTypeErr);
        case OpStatus::KEY_NOTFOUND:
          return rb->StartArray(0);
        default:
          return cntx->SendError(result_sts);
      }
    }
  }
...
  } else {
    // case 3: write mode, !kNoStore
    DCHECK(geo_ops.store == GeoStoreType::kStoreDist || geo_ops.store == GeoStoreType::kStoreHash);
    ShardId dest_shard = Shard(geo_ops.store_key, shard_set->size());
    std::cout<<"dest key:"<<geo_ops.store_key<<" ,dest shard:"<<dest_shard<<std::endl;
    DVLOG(1) << "store shard:" << dest_shard << ", key " << geo_ops.store_key;
    OpResult<AddResult> add_result;
    vector<ScoredMemberView> smvec;
    for (const auto& p : ga) {
      if (geo_ops.store == GeoStoreType::kStoreDist) {
        smvec.emplace_back(p.dist / geo_ops.conversion, p.member);
      } else {
        DCHECK(geo_ops.store == GeoStoreType::kStoreHash);
        smvec.emplace_back(p.score, p.member);
      }
    }
    std::cout<<"smvec:"<<std::endl;
    for (auto& si: smvec) {
      std::cout<<" "<<si.first<<" "<<si.second<<std::endl;
    }

    auto store_cb = [&](Transaction* t, EngineShard* shard) {
      std::cout<<"checking:"<<shard->shard_id()<<std::endl;
      if (shard->shard_id() == dest_shard) {
        std::cout<<"match"<<std::endl;
        ZParams zparams;
        zparams.override = true;
        add_result =
            OpAdd(t->GetOpArgs(shard), zparams, geo_ops.store_key, ScoredMemberSpan{smvec}).value();
      }
      return OpStatus::OK;
    };
    cntx->transaction->Execute(std::move(store_cb), true);

    rb->SendLong(smvec.size());
  }
}

EXPECT_THAT(resp, RespArray(ElementsAre("Madrid", "Lisbon")));
resp = Run({"ZRANGE", "store_key", "0", "-1", "WITHSCORES"});
EXPECT_THAT(resp,
RespArray(ElementsAre("Madrid", "3471766229222696", "Lisbon", "3473121093062745")));

EXPECT_EQ(2, CheckedInt({"GEORADIUS", "Europe", "3.7038", "40.4168", "700", "KM", "STOREDIST",
"store_dist_key"}));
resp = Run({"ZRANGE", "store_dist_key", "0", "-1", "WITHSCORES"});
EXPECT_THAT(resp, RespArray(ElementsAre("Madrid", "0", "Lisbon", "502.20769462704084")));
}

TEST_F(ZSetFamilyTest, RangeLimit) {
auto resp = Run({"ZRANGEBYSCORE", "", "0.0", "0.0", "limit", "0"});
EXPECT_THAT(resp, ErrArg("syntax error"));
Expand Down
Loading