diff --git a/README.md b/README.md index 8211eb5..7783a4f 100644 --- a/README.md +++ b/README.md @@ -29,16 +29,15 @@ and [h3](https://h3geo.org/) (a hexagonal hierarchical geospatial indexing syste ## Project structure -| Directory | Description | -|-----------------------------------------------------|-------------------------------------------| -| [`./cmd`](./cmd) | CLI for making gRPC requests | -| [`./idl`](./idl/coop/drivers/dispatch/v1beta1) | Protobufs (Interface Definition Language) | -| [`./internal/app`](./internal/app) | App dependency injection / initialization | -| [`./internal/distance`](internal/service/distance) | Google Maps Distance Matrix logic | -| [`./internal/idl`](./internal/idl) | Auto-generated protobufs | -| [`./internal/models`](./internal/models) | Auto-generated ORM / models | -| [`./internal/service`](./internal/service) | Service layer / Business logic | -| [`./schema`](./schema) | SQL migration scripts | +| Directory | Description | +|--------------------------------------------------|-------------------------------------------| +| [`./cmd`](./cmd) | CLI for making gRPC requests | +| [`./idl`](./idl/coop/drivers/dispatch/v1beta1) | Protobufs (Interface Definition Language) | +| [`./internal/app`](./internal/app) | App dependency injection / initialization | +| [`./internal/idl`](./internal/idl) | Auto-generated protobufs | +| [`./internal/models`](./internal/models) | Auto-generated ORM / models | +| [`./internal/service`](./internal/service) | Service layer / Business logic | +| [`./schema`](./schema) | SQL migration scripts | ## How does it work diff --git a/go.mod b/go.mod index 1e21728..0f72fd7 100644 --- a/go.mod +++ b/go.mod @@ -4,13 +4,17 @@ go 1.18 require ( github.com/XSAM/otelsql v0.14.1 + github.com/codingsince1985/geo-golang v1.8.1 github.com/envoyproxy/protoc-gen-validate v0.1.0 github.com/friendsofgo/errors v0.9.2 github.com/go-ozzo/ozzo-validation/v4 v4.3.0 + github.com/gojuno/go.osrm v0.1.0 github.com/google/go-cmp v0.5.8 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/kat-co/vala v0.0.0-20170210184112-42e1d8b61f12 + github.com/kr/pretty v0.2.0 github.com/lib/pq v1.10.5 + github.com/paulmach/go.geo v0.0.0-20180829195134-22b514266d33 github.com/rs/xid v1.4.0 github.com/sethvargo/go-envconfig v0.6.0 github.com/spf13/cobra v1.4.0 @@ -39,6 +43,7 @@ require ( ) require ( + github.com/benbjohnson/clock v1.3.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect @@ -52,9 +57,11 @@ require ( github.com/google/uuid v1.3.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect + github.com/kr/text v0.1.0 // indirect github.com/magiconair/properties v1.8.5 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect github.com/mitchellh/mapstructure v1.4.2 // indirect + github.com/paulmach/go.geojson v1.4.0 // indirect github.com/pelletier/go-toml v1.9.4 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v1.12.1 // indirect diff --git a/go.sum b/go.sum index e3ae13d..4f176c0 100644 --- a/go.sum +++ b/go.sum @@ -95,6 +95,8 @@ github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWH github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= +github.com/codingsince1985/geo-golang v1.8.1 h1:6B+Ce5QkbSglCtesiNRkSYMRDDQsYrv4XKM3jJVdyTw= +github.com/codingsince1985/geo-golang v1.8.1/go.mod h1:Ue7HAjKwwCAbqB5Q0YskqqnIX8XjMHL5Jq2fsSrI2T8= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= @@ -151,6 +153,8 @@ github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRx github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/gojuno/go.osrm v0.1.0 h1:M9RH1raBe7D72preWxFOw9WafGftGBQyoGNkG6upAT0= +github.com/gojuno/go.osrm v0.1.0/go.mod h1:XPCHB/Ir2/vHnqhKlfUxIiUGHFtTzgrRxD89JdkJhrs= github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -335,6 +339,11 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= +github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= +github.com/paulmach/go.geo v0.0.0-20180829195134-22b514266d33 h1:doG/0aLlWE6E4ndyQlkAQrPwaojghwz1IlmH0kjTdyk= +github.com/paulmach/go.geo v0.0.0-20180829195134-22b514266d33/go.mod h1:btFYk/ltlMU7ZKguHS7zQrwHYCtLoXGTaa44OsPbEVw= +github.com/paulmach/go.geojson v1.4.0 h1:5x5moCkCtDo5x8af62P9IOAYGQcYHtxz2QJ3x1DoCgY= +github.com/paulmach/go.geojson v1.4.0/go.mod h1:YaKx1hKpWF+T2oj2lFJPsW/t1Q5e1jQI61eoQSTwpIs= github.com/pelletier/go-toml v1.9.3/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= github.com/pelletier/go-toml v1.9.4 h1:tjENF6MfZAg8e4ZmZTeWaWiT2vXtsoO6+iuOjFhECwM= github.com/pelletier/go-toml v1.9.4/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= diff --git a/internal/app/service/service.go b/internal/app/service/service.go index 160bd42..4aa45ba 100644 --- a/internal/app/service/service.go +++ b/internal/app/service/service.go @@ -3,33 +3,32 @@ package service import ( "database/sql" "fmt" - "github.com/friendsofgo/errors" "github.com/kevinmichaelchen/api-dispatch/internal/service" "github.com/kevinmichaelchen/api-dispatch/internal/service/db" - "github.com/kevinmichaelchen/api-dispatch/internal/service/distance" + "github.com/kevinmichaelchen/api-dispatch/internal/service/geo" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "go.uber.org/fx" "go.uber.org/zap" - "googlemaps.github.io/maps" + gmaps "googlemaps.github.io/maps" "os" ) var Module = fx.Module("service", fx.Provide( NewService, - NewDistanceService, + NewGeoService, NewMapsClient, NewDataStore, ), ) -type Params struct { +type ServiceParams struct { fx.In DataStore *db.Store - DistanceService *distance.Service `optional:"true"` + DistanceService *geo.Service } -func NewService(p Params) *service.Service { +func NewService(p ServiceParams) *service.Service { return service.NewService(p.DataStore, p.DistanceService) } @@ -37,14 +36,15 @@ func NewDataStore(sqlDB *sql.DB) *db.Store { return db.NewStore(sqlDB) } -func NewMapsClient() (*maps.Client, error) { +func NewMapsClient(logger *zap.Logger) (*gmaps.Client, error) { apiKey := os.Getenv("API_KEY") if apiKey == "" { - return nil, errors.New("missing API_KEY for Google Maps") + logger.Warn("Missing API Key for Google Maps... Initializing in degraded state...") + return nil, nil } - c, err := maps.NewClient( - maps.WithAPIKey(apiKey), - maps.WithHTTPClient(otelhttp.DefaultClient), + c, err := gmaps.NewClient( + gmaps.WithAPIKey(apiKey), + gmaps.WithHTTPClient(otelhttp.DefaultClient), ) if err != nil { return nil, fmt.Errorf("failed to build Google Maps client: %w", err) @@ -52,9 +52,11 @@ func NewMapsClient() (*maps.Client, error) { return c, nil } -func NewDistanceService(logger *zap.Logger, client *maps.Client) (*distance.Service, error) { - if client == nil { - return nil, errors.New("no maps client") - } - return distance.NewService(client), nil +type GeoServiceParams struct { + fx.In + GoogleClient *gmaps.Client `optional:"true"` +} + +func NewGeoService(logger *zap.Logger, p GeoServiceParams) (*geo.Service, error) { + return geo.NewService(p.GoogleClient, otelhttp.DefaultClient), nil } diff --git a/internal/service/dispatch.go b/internal/service/dispatch.go index 2abcb00..6e2846b 100644 --- a/internal/service/dispatch.go +++ b/internal/service/dispatch.go @@ -4,9 +4,10 @@ import ( "context" "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" "github.com/kevinmichaelchen/api-dispatch/internal/idl/coop/drivers/dispatch/v1beta1" - "github.com/kevinmichaelchen/api-dispatch/internal/service/distance" "github.com/kevinmichaelchen/api-dispatch/internal/service/money" "github.com/kevinmichaelchen/api-dispatch/internal/service/ranking" + "github.com/kevinmichaelchen/api-dispatch/pkg/maps" + "github.com/kevinmichaelchen/api-dispatch/pkg/maps/distance" "go.uber.org/zap" "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" @@ -23,15 +24,11 @@ func (s *Service) GetNearestDrivers( ctx context.Context, req *v1beta1.GetNearestDriversRequest, ) (*v1beta1.GetNearestDriversResponse, error) { - logger := ctxzap.Extract(ctx) - err := validate(req, req) if err != nil { return nil, err } - trafficAware := s.distanceSvc != nil - // Query database nearby, err := s.dataStore.GetNearbyDriverLocations(ctx, req.GetPickupLocation()) if err != nil { @@ -55,32 +52,17 @@ func (s *Service) GetNearestDrivers( results = results[:maxResults] } - // In the event we have no Google Maps client and are operating in a - // degraded state, k-ring-sorting is still pretty good. + // The initial sort will be based on H3 resolutions and k-rings results = ranking.SortResultsByKRing(results) - // Enrich results with distance/duration info from Google Maps API + // Enrich results (e.g., with distance/duration info, among other things) var driverLocations []*v1beta1.LatLng for _, result := range results { driverLocations = append(driverLocations, result.GetLocation()) } - var pickupAddress string - if trafficAware { - out, err := s.distanceSvc.BetweenPoints(ctx, distance.BetweenPointsInput{ - PickupLocations: []*v1beta1.LatLng{req.GetPickupLocation()}, - DriverLocations: driverLocations, - }) - if err != nil { - return nil, err - } - for i, info := range out.Info { - logger.Info("received distance matrix info", zap.Any("info", info)) - results[i].Duration = durationpb.New(info.Duration) - results[i].DistanceMeters = float64(info.DistanceMeters) - // the driver is always the origin - results[i].Address = info.OriginAddress - pickupAddress = info.DestinationAddress - } + matrixOut, err := s.enrichNearbyDrivers(ctx, results, driverLocations, req.GetPickupLocation()) + if err != nil { + return nil, err } // Final ranking/sorting pass @@ -94,7 +76,7 @@ func (s *Service) GetNearestDrivers( return &v1beta1.GetNearestDriversResponse{ Results: results, - PickupAddress: pickupAddress, + PickupAddress: matrixOut.DestinationAddresses[0], }, nil } @@ -108,8 +90,6 @@ func (s *Service) GetNearestTrips( return nil, err } - trafficAware := s.distanceSvc != nil - // Query database nearby, err := s.dataStore.GetNearbyTrips(ctx, req.GetDriverLocation()) if err != nil { @@ -134,34 +114,19 @@ func (s *Service) GetNearestTrips( results = results[:maxResults] } - // In the event we have no Google Maps client and are operating in a - // degraded state, k-ring-sorting is still pretty good. + // The initial sort will be based on H3 resolutions and k-rings results = ranking.SortResultsByKRing(results) - // Enrich results with distance/duration info from Google Maps API - var locations []*v1beta1.LatLng + // Enrich results (e.g., with distance/duration info, among other things) + var pickupLocations []*v1beta1.LatLng for _, result := range results { - locations = append(locations, result.GetLocation()) + pickupLocations = append(pickupLocations, result.GetLocation()) } - if trafficAware { - out, err := s.distanceSvc.BetweenPoints(ctx, distance.BetweenPointsInput{ - PickupLocations: locations, - DriverLocations: []*v1beta1.LatLng{req.GetDriverLocation()}, - }) - if err != nil { - return nil, err - } - for i, info := range out.Info { - results[i].Duration = durationpb.New(info.Duration) - results[i].DistanceMeters = float64(info.DistanceMeters) - // the driver is always the origin, the pickup is the destination - results[i].Address = info.DestinationAddress - } + _, err = s.enrichNearbyTrips(ctx, results, req.GetDriverLocation(), pickupLocations) + if err != nil { + return nil, err } - // Enrich results - enrichTripsWithFakeData(results) - // Final ranking/sorting pass results = ranking.RankTrips(results) @@ -176,13 +141,69 @@ func (s *Service) GetNearestTrips( }, nil } -func enrichTripsWithFakeData(in []*v1beta1.SearchResult) { - for idx := range in { - e := in[idx] +func (s *Service) enrichNearbyDrivers( + ctx context.Context, + results []*v1beta1.SearchResult, + driverLocations []*v1beta1.LatLng, + pickupLocation *v1beta1.LatLng, +) (*distance.MatrixResponse, error) { + logger := ctxzap.Extract(ctx) + + out, err := s.distanceSvc.BetweenPoints(ctx, distance.BetweenPointsInput{ + // the driver location(s) is/are always the origin(s) + Origins: toLatLngs(driverLocations), + Destinations: toLatLngs([]*v1beta1.LatLng{pickupLocation}), + }) + if err != nil { + return nil, err + } + + for i, row := range out.Rows { + for _, elem := range row.Elements { + logger.Info("Got Distance Matrix element", zap.Any("elem", elem)) + results[i].Duration = durationpb.New(elem.Duration) + results[i].DistanceMeters = float64(elem.Distance) + results[i].Address = out.OriginAddresses[i] + } + } + + return out, nil +} + +func (s *Service) enrichNearbyTrips( + ctx context.Context, + results []*v1beta1.SearchResult, + driverLocation *v1beta1.LatLng, + pickupLocations []*v1beta1.LatLng, +) (*distance.MatrixResponse, error) { + logger := ctxzap.Extract(ctx) + + out, err := s.distanceSvc.BetweenPoints(ctx, distance.BetweenPointsInput{ + // the driver location(s) is/are always the origin(s) + Origins: toLatLngs([]*v1beta1.LatLng{driverLocation}), + Destinations: toLatLngs(pickupLocations), + }) + if err != nil { + return nil, err + } + + for idx := range results { + e := results[idx] t := e.GetTrip() t.ScheduledFor = timestamppb.New(randomTime()) t.ExpectedPayment = randomMoney() } + + for _, row := range out.Rows { + for i, elem := range row.Elements { + logger.Info("Got Distance Matrix element", zap.Any("elem", elem)) + results[i].Duration = durationpb.New(elem.Duration) + results[i].DistanceMeters = float64(elem.Distance) + results[i].Address = out.DestinationAddresses[i] + } + } + + return out, nil } func randomTime() time.Time { @@ -197,3 +218,14 @@ func randomMoney() *v1beta1.Money { f := float64(randomUnits) + (float64(randomCents) / float64(100)) return money.ConvertFloatToMoney(f) } + +func toLatLngs(in []*v1beta1.LatLng) []maps.LatLng { + var out []maps.LatLng + for _, e := range in { + out = append(out, maps.LatLng{ + Lat: e.GetLatitude(), + Lng: e.GetLongitude(), + }) + } + return out +} diff --git a/internal/service/distance/distance.go b/internal/service/distance/distance.go deleted file mode 100644 index 4a6ab59..0000000 --- a/internal/service/distance/distance.go +++ /dev/null @@ -1,99 +0,0 @@ -package distance - -import ( - "context" - "github.com/kevinmichaelchen/api-dispatch/internal/idl/coop/drivers/dispatch/v1beta1" - "googlemaps.github.io/maps" - "time" -) - -type Service struct { - client *maps.Client -} - -func NewService(client *maps.Client) *Service { - return &Service{ - client: client, - } -} - -type BetweenPointsInput struct { - PickupLocations []*v1beta1.LatLng - DriverLocations []*v1beta1.LatLng -} - -type BetweenPointsOutput struct { - Info []Info -} - -type Info struct { - DistanceMeters int - Duration time.Duration - OriginAddress string - DestinationAddress string -} - -func (s *Service) BetweenPoints(ctx context.Context, in BetweenPointsInput) (*BetweenPointsOutput, error) { - driverPlaceIDs, err := locationsToPlaceIDs(ctx, s.client, in.DriverLocations) - if err != nil { - return nil, err - } - - pickupPlaceIDs, err := locationsToPlaceIDs(ctx, s.client, in.PickupLocations) - if err != nil { - return nil, err - } - - res, err := betweenPlaces(ctx, s.client, betweenPlacesInput{ - originPlaceIDs: driverPlaceIDs, - destinationPlaceIDs: pickupPlaceIDs, - }) - if err != nil { - return nil, err - } - - var out []Info - for i, fromOrigin := range res.Rows { - for j, toDestination := range fromOrigin.Elements { - out = append(out, Info{ - DistanceMeters: toDestination.Distance.Meters, - Duration: toDestination.Duration, - OriginAddress: res.OriginAddresses[i], - DestinationAddress: res.DestinationAddresses[j], - }) - } - } - - return &BetweenPointsOutput{ - Info: out, - }, nil -} - -type betweenPlacesInput struct { - originPlaceIDs []string - destinationPlaceIDs []string -} - -func betweenPlaces(ctx context.Context, c *maps.Client, in betweenPlacesInput) (*maps.DistanceMatrixResponse, error) { - var origins []string - for _, placeID := range in.originPlaceIDs { - origins = append(origins, "place_id:"+placeID) - } - var destinations []string - for _, placeID := range in.destinationPlaceIDs { - destinations = append(destinations, "place_id:"+placeID) - } - return c.DistanceMatrix(ctx, &maps.DistanceMatrixRequest{ - Origins: origins, - Destinations: destinations, - Mode: "", - Language: "", - Avoid: "", - Units: "", - DepartureTime: "", - ArrivalTime: "", - TrafficModel: "", - TransitMode: nil, - TransitRoutingPreference: "", - }) -} diff --git a/internal/service/distance/reverse_geocode.go b/internal/service/distance/reverse_geocode.go deleted file mode 100644 index 9595128..0000000 --- a/internal/service/distance/reverse_geocode.go +++ /dev/null @@ -1,163 +0,0 @@ -package distance - -import ( - "context" - "errors" - "fmt" - "github.com/kevinmichaelchen/api-dispatch/internal/idl/coop/drivers/dispatch/v1beta1" - "golang.org/x/sync/errgroup" - "googlemaps.github.io/maps" - "strconv" - "sync" - "sync/atomic" -) - -const ( - parallelizationFactor = 10 -) - -var errNoResults = errors.New("no results found for coordinates") - -type Place struct { - ID string - Address string - Types []string -} - -type LatLng struct { - Lat float64 - Lng float64 -} - -// locationsToPlaceIDs reverse geocodes a list of geographic coordinates. -// It uses parallelization with the errgroup package, since Google Maps does not -// offer a way to reverse geocode in bulk, and since each individual request -// would take roughly 150 ms. -// Inspired by: -// https://www.fullstory.com/blog/why-errgroup-withcontext-in-golang-server-handlers/ -func locationsToPlaceIDs(ctx context.Context, c *maps.Client, locations []*v1beta1.LatLng) ([]string, error) { - g, ctx := errgroup.WithContext(ctx) - - locationsChan := make(chan *v1beta1.LatLng) - - // Step 1: Produce - g.Go(func() error { - defer close(locationsChan) - for _, location := range locations { - select { - case <-ctx.Done(): - return ctx.Err() - case locationsChan <- location: - } - } - return nil - }) - - type Result struct { - LatLng LatLng - GeocodingResults []maps.GeocodingResult - } - results := make(chan Result) - - // Step 2: Map - nWorkers := parallelizationFactor - workers := int32(nWorkers) - for i := 0; i < nWorkers; i++ { - g.Go(func() error { - defer func() { - // Last one out closes shop - if atomic.AddInt32(&workers, -1) == 0 { - close(results) - } - }() - - for location := range locationsChan { - geocodingResults, err := reverseGeocode(ctx, c, location) - if err != nil { - return fmt.Errorf("failed to reverse geocode location: %w", err) - } else { - result := Result{ - LatLng: LatLng{ - Lat: location.GetLatitude(), - Lng: location.GetLongitude(), - }, - GeocodingResults: geocodingResults, - } - select { - case <-ctx.Done(): - return ctx.Err() - case results <- result: - } - } - } - return nil - }) - } - - // Step 3: Reduce - // A normal Go map isn't thread-safe, so we use sync.Map - ret := new(sync.Map) - g.Go(func() error { - for result := range results { - ret.Store(result.LatLng, result.GeocodingResults) - } - return nil - }) - - // Wait blocks until all function calls from the Go method have returned, then - // returns the first non-nil error (if any) from them. - err := g.Wait() - if err != nil { - return nil, err - } - - // Step 4: Convert sync.Map into a list - // The order of outputs has to correspond with the order of inputs - // e.g., if the input was [point1, point2] then the output should be - // [place1ID, place2ID] - var out []string - for _, location := range locations { - lat := location.GetLatitude() - lng := location.GetLongitude() - val, ok := ret.Load(LatLng{Lat: lat, Lng: lng}) - if !ok { - return nil, fmt.Errorf("could not find lat/lng for (%s, %s)", - strconv.FormatFloat(lat, 'f', -1, 64), - strconv.FormatFloat(lng, 'f', -1, 64), - ) - } - geocodingResults, ok := val.([]maps.GeocodingResult) - if !ok { - return nil, errors.New("expected sync.Map values to be []maps.GeocodingResult") - } - if len(geocodingResults) == 0 { - return nil, errors.New("sync.Map value was empty slice of []maps.GeocodingResult") - } - - // Assuming Google Maps API returns places ordered in such a way that - // the first element is the most salient/relevant. - out = append(out, geocodingResults[0].PlaceID) - } - return out, nil -} - -func reverseGeocode(ctx context.Context, c *maps.Client, location *v1beta1.LatLng) ([]maps.GeocodingResult, error) { - results, err := c.ReverseGeocode(ctx, &maps.GeocodingRequest{ - LatLng: &maps.LatLng{ - Lat: location.GetLatitude(), - Lng: location.GetLongitude(), - }, - ResultType: nil, - LocationType: nil, - PlaceID: "", - Language: "", - Custom: nil, - }) - if err != nil { - return nil, err - } - if len(results) == 0 { - return nil, errNoResults - } - return results, err -} diff --git a/internal/service/geo/geo.go b/internal/service/geo/geo.go new file mode 100644 index 0000000..befd5bb --- /dev/null +++ b/internal/service/geo/geo.go @@ -0,0 +1,47 @@ +package geo + +import ( + "context" + validation "github.com/go-ozzo/ozzo-validation/v4" + "github.com/kevinmichaelchen/api-dispatch/pkg/maps/distance" + "github.com/kevinmichaelchen/api-dispatch/pkg/maps/distance/google" + "github.com/kevinmichaelchen/api-dispatch/pkg/maps/distance/osrm" + gMaps "googlemaps.github.io/maps" + "net/http" +) + +type Service struct { + googleClient *gMaps.Client + httpClient *http.Client +} + +func NewService(client *gMaps.Client, httpClient *http.Client) *Service { + return &Service{ + googleClient: client, + httpClient: httpClient, + } +} + +func (s *Service) BetweenPoints(ctx context.Context, in distance.BetweenPointsInput) (*distance.MatrixResponse, error) { + err := validate(in) + if err != nil { + return nil, err + } + + // TODO if len(origins) > 25 || len(destinations) > 25, we need to partition/batch + + // Use Google Maps if there's an API key available + if s.googleClient != nil { + return google.BetweenPoints(ctx, s.googleClient, in) + } + + // Otherwise we'll back to using Open Source Routing Machine (OSRM) + return osrm.BetweenPoints(ctx, s.httpClient, in) +} + +func validate(i distance.BetweenPointsInput) error { + return validation.ValidateStruct(&i, + validation.Field(&i.Destinations, validation.Required, validation.Length(1, 25)), + validation.Field(&i.Origins, validation.Required, validation.Length(1, 25)), + ) +} diff --git a/internal/service/service.go b/internal/service/service.go index 99c85c0..d33e46c 100644 --- a/internal/service/service.go +++ b/internal/service/service.go @@ -4,7 +4,7 @@ import ( "context" "github.com/kevinmichaelchen/api-dispatch/internal/idl/coop/drivers/dispatch/v1beta1" "github.com/kevinmichaelchen/api-dispatch/internal/service/db" - "github.com/kevinmichaelchen/api-dispatch/internal/service/distance" + "github.com/kevinmichaelchen/api-dispatch/internal/service/geo" "github.com/kevinmichaelchen/api-dispatch/internal/service/health" "google.golang.org/grpc/codes" healthV1 "google.golang.org/grpc/health/grpc_health_v1" @@ -14,10 +14,10 @@ import ( type Service struct { dataStore *db.Store - distanceSvc *distance.Service + distanceSvc *geo.Service } -func NewService(dataStore *db.Store, distanceSvc *distance.Service) *Service { +func NewService(dataStore *db.Store, distanceSvc *geo.Service) *Service { return &Service{dataStore: dataStore, distanceSvc: distanceSvc} } diff --git a/pkg/maps/distance/distance.go b/pkg/maps/distance/distance.go new file mode 100644 index 0000000..4f3cd7e --- /dev/null +++ b/pkg/maps/distance/distance.go @@ -0,0 +1,46 @@ +package distance + +import ( + "github.com/kevinmichaelchen/api-dispatch/pkg/maps" + "time" +) + +type BetweenPointsInput struct { + Destinations []maps.LatLng + Origins []maps.LatLng +} + +type BetweenPlacesInput struct { + Origins []string + Destinations []string +} + +// MatrixResponse represents a Distance Matrix API response. +type MatrixResponse struct { + // OriginAddresses contains an array of addresses as returned by the API from + // your original request. + OriginAddresses []string `json:"origin_addresses"` + // DestinationAddresses contains an array of addresses as returned by the API + // from your original request. + DestinationAddresses []string `json:"destination_addresses"` + // Rows contains an array of elements. + Rows []MatrixElementsRow `json:"rows"` +} + +// MatrixElementsRow is a row of distance elements. +type MatrixElementsRow struct { + Elements []MatrixElement `json:"elements"` +} + +// MatrixElement is the travel distance and time for a pair of origin and +// destination. +type MatrixElement struct { + Status string `json:"status"` + // Duration is the length of time it takes to travel this route. + Duration time.Duration `json:"duration"` + // DurationInTraffic is the length of time it takes to travel this route + // considering traffic. + DurationInTraffic time.Duration `json:"duration_in_traffic"` + // Distance is the total distance (in meters) of this route. + Distance int `json:"distance"` +} diff --git a/pkg/maps/distance/google/google.go b/pkg/maps/distance/google/google.go new file mode 100644 index 0000000..3ae4180 --- /dev/null +++ b/pkg/maps/distance/google/google.go @@ -0,0 +1,103 @@ +package google + +import ( + "context" + "github.com/kevinmichaelchen/api-dispatch/pkg/maps/distance" + "github.com/kevinmichaelchen/api-dispatch/pkg/maps/geocode" + "github.com/kevinmichaelchen/api-dispatch/pkg/maps/geocode/google" + "googlemaps.github.io/maps" +) + +func BetweenPoints( + ctx context.Context, + client *maps.Client, + in distance.BetweenPointsInput) (*distance.MatrixResponse, error) { + + // Batch reverse-geocode the origins + geocoder := google.NewGeocoder(client) + + parallelizationFactor := 10 + + originsOut, err := geocode.BatchReverseGeocode(ctx, geocoder, in.Origins, parallelizationFactor) + if err != nil { + return nil, err + } + + destinationsOut, err := geocode.BatchReverseGeocode(ctx, geocoder, in.Destinations, parallelizationFactor) + if err != nil { + return nil, err + } + + var origins []string + for _, e := range originsOut { + origins = append(origins, e.PlaceID) + } + + var destinations []string + for _, e := range destinationsOut { + destinations = append(destinations, e.PlaceID) + } + + res, err := BetweenPlaces(ctx, client, distance.BetweenPlacesInput{ + Origins: origins, + Destinations: destinations, + }) + if err != nil { + return nil, err + } + + // TODO throw in some reverse-geocoding for origins+destination addresses + + return toRes(res), nil +} + +func BetweenPlaces(ctx context.Context, c *maps.Client, in distance.BetweenPlacesInput) (*maps.DistanceMatrixResponse, error) { + var origins []string + for _, placeID := range in.Origins { + origins = append(origins, "place_id:"+placeID) + } + var destinations []string + for _, placeID := range in.Destinations { + destinations = append(destinations, "place_id:"+placeID) + } + return c.DistanceMatrix(ctx, &maps.DistanceMatrixRequest{ + Origins: origins, + Destinations: destinations, + Mode: "", + Language: "", + Avoid: "", + Units: "", + DepartureTime: "", + ArrivalTime: "", + TrafficModel: "", + TransitMode: nil, + TransitRoutingPreference: "", + }) +} + +func toRes(res *maps.DistanceMatrixResponse) *distance.MatrixResponse { + var rows []distance.MatrixElementsRow + for i := range res.Rows { + row := res.Rows[i] + var elements []distance.MatrixElement + for j := range row.Elements { + elem := row.Elements[j] + elements = append(elements, toElem(elem)) + } + rows = append(rows, distance.MatrixElementsRow{Elements: elements}) + } + return &distance.MatrixResponse{ + OriginAddresses: nil, + DestinationAddresses: nil, + Rows: rows, + } +} + +func toElem(res *maps.DistanceMatrixElement) distance.MatrixElement { + return distance.MatrixElement{ + Status: res.Status, + Duration: res.Duration, + DurationInTraffic: res.DurationInTraffic, + Distance: res.Distance.Meters, + } +} diff --git a/pkg/maps/distance/osrm/distance.go b/pkg/maps/distance/osrm/distance.go new file mode 100644 index 0000000..eaacfe5 --- /dev/null +++ b/pkg/maps/distance/osrm/distance.go @@ -0,0 +1,123 @@ +package osrm + +import ( + "context" + "errors" + "fmt" + osrm "github.com/gojuno/go.osrm" + "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" + "github.com/kevinmichaelchen/api-dispatch/pkg/maps/distance" + "github.com/kevinmichaelchen/api-dispatch/pkg/maps/geocode" + osrm2 "github.com/kevinmichaelchen/api-dispatch/pkg/maps/geocode/osrm" + geo "github.com/paulmach/go.geo" + "go.uber.org/zap" + "net/http" + "time" +) + +var ( + errFailedRequest = errors.New("failed OSRM request") +) + +func BetweenPoints( + ctx context.Context, + httpClient *http.Client, + in distance.BetweenPointsInput) (*distance.MatrixResponse, error) { + logger := ctxzap.Extract(ctx) + + // Batch reverse-geocode all locations + geocoder := osrm2.NewGeocoder(httpClient) + parallelizationFactor := 10 + geocodeOut, err := geocode.BatchReverseGeocode( + ctx, + geocoder, + append(in.Origins, in.Destinations...), + parallelizationFactor) + if err != nil { + return nil, err + } + + serverURL := "https://router.project-osrm.org" + client := osrm.NewWithConfig(osrm.Config{ + ServerURL: serverURL, + Client: httpClient, + }) + + res, err := client.Table(ctx, toTableReq(in)) + + if err != nil { + return nil, fmt.Errorf("failed OSRM request: %w", err) + } + + if res.Code != "Ok" { + logger.Error("received non-ok OSRM response", + zap.String("error.code", res.Code), + zap.String("error.msg", res.Message), + ) + return nil, errFailedRequest + } + + return fromTableRes(res, geocodeOut[:len(in.Origins)], geocodeOut[len(in.Origins):]), nil +} + +func toTableReq(in distance.BetweenPointsInput) osrm.TableRequest { + var pointSet geo.PointSet + for _, p := range in.Origins { + pointSet = append(pointSet, geo.Point{p.Lng, p.Lat}) + } + for _, p := range in.Destinations { + pointSet = append(pointSet, geo.Point{p.Lng, p.Lat}) + } + return osrm.TableRequest{ + Profile: "car", + Coordinates: osrm.NewGeometryFromPointSet(pointSet), + Sources: makeRange(0, len(in.Origins)-1), + Destinations: makeRange(len(in.Origins), len(in.Origins)+len(in.Destinations)-1), + } +} + +func makeRange(min, max int) []int { + a := make([]int, max-min+1) + for i := range a { + a[i] = min + i + } + return a +} + +func fromTableRes( + res *osrm.TableResponse, + originsOut []*geocode.ReverseGeocodeOutput, + destinationsOut []*geocode.ReverseGeocodeOutput, +) *distance.MatrixResponse { + var rows []distance.MatrixElementsRow + for i := range res.Durations { + origin := res.Durations[i] + var elements []distance.MatrixElement + for j := range origin { + destination := origin[j] + duration := time.Duration(destination) * time.Second + elements = append(elements, distance.MatrixElement{ + Status: "", + Duration: duration, + DurationInTraffic: 0, + Distance: 0, + }) + } + rows = append(rows, distance.MatrixElementsRow{Elements: elements}) + } + var originAddresses []string + for idx := range originsOut { + geoResults := originsOut[idx] + originAddresses = append(originAddresses, geoResults.FormattedAddress) + } + var destinationAddresses []string + for idx := range destinationsOut { + geoResults := destinationsOut[idx] + destinationAddresses = append(destinationAddresses, geoResults.FormattedAddress) + } + return &distance.MatrixResponse{ + OriginAddresses: originAddresses, + DestinationAddresses: destinationAddresses, + Rows: rows, + } +} diff --git a/pkg/maps/distance/osrm/distance_test.go b/pkg/maps/distance/osrm/distance_test.go new file mode 100644 index 0000000..0a4a3ec --- /dev/null +++ b/pkg/maps/distance/osrm/distance_test.go @@ -0,0 +1,34 @@ +package osrm + +import ( + "context" + "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" + "github.com/kevinmichaelchen/api-dispatch/pkg/maps" + "github.com/kevinmichaelchen/api-dispatch/pkg/maps/distance" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" + "net/http" + "testing" + "time" +) + +func TestBetweenPoints(t *testing.T) { + a := maps.LatLng{ + Lat: 40.791680675548136, + Lng: -73.9650115649754, + } + b := maps.LatLng{ + Lat: 40.76866089218841, + Lng: -73.98145413365043, + } + + ctx := ctxzap.ToContext(context.Background(), zaptest.NewLogger(t)) + res, err := BetweenPoints(ctx, new(http.Client), distance.BetweenPointsInput{ + Destinations: []maps.LatLng{a}, + Origins: []maps.LatLng{b}, + }) + require.NoError(t, err) + require.Len(t, res.Rows, 1) + require.Len(t, res.Rows[0].Elements, 1) + require.Greater(t, res.Rows[0].Elements[0].Duration, time.Duration(0)) +} diff --git a/pkg/maps/distance/osrm/doc.go b/pkg/maps/distance/osrm/doc.go new file mode 100644 index 0000000..46bcbbb --- /dev/null +++ b/pkg/maps/distance/osrm/doc.go @@ -0,0 +1,13 @@ +/* +Package osrm provides functions over the Open Source Routing Machine project. + +We use OSRM's Table service when we want to quickly determine the duration +and/or distance between two points (without traffic awareness). This could be +useful for saving money/requests to Google Maps API every time an anonymous end +user sees an estimate quote for a potential trip. +https://github.com/Project-OSRM/osrm-backend/blob/master/docs/http.md#table-service + +We use OpenStreetMap's /reverse endpoint for reverse geocoding. +https://nominatim.org/release-docs/develop/api/Reverse/ +*/ +package osrm diff --git a/pkg/maps/geocode/geocode.go b/pkg/maps/geocode/geocode.go new file mode 100644 index 0000000..0226d91 --- /dev/null +++ b/pkg/maps/geocode/geocode.go @@ -0,0 +1,137 @@ +package geocode + +import ( + "context" + "fmt" + "github.com/codingsince1985/geo-golang" + "github.com/kevinmichaelchen/api-dispatch/pkg/maps" + "golang.org/x/sync/errgroup" + "strconv" + "sync" + "sync/atomic" +) + +// ReverseGeocodeOutput is a representation of a reverse-geocode request. +// It should be generic enough to work for various APIs: +// https://github.com/codingsince1985/geo-golang +type ReverseGeocodeOutput struct { + PlaceID string + + // Embedded Address struct from codingsince1985/geo-golang + geo.Address +} + +type ReverseGeocoder interface { + ReverseGeocode(ctx context.Context, location maps.LatLng) (*ReverseGeocodeOutput, error) +} + +// BatchReverseGeocode reverse-geocodes a list of geographic coordinates. +// +// It uses parallelization with the errgroup package, since some APIs (e.g., +// Google Maps) do not offer a way to reverse geocode in bulk, and since each +// individual request would take roughly 150 ms. +// Inspired by: +// https://www.fullstory.com/blog/why-errgroup-withcontext-in-golang-server-handlers/ +func BatchReverseGeocode( + ctx context.Context, + reverseGeocoder ReverseGeocoder, + locations []maps.LatLng, + parallelizationFactor int, +) ([]*ReverseGeocodeOutput, error) { + g, ctx := errgroup.WithContext(ctx) + + locationsChan := make(chan maps.LatLng) + + // Step 1: Produce + g.Go(func() error { + defer close(locationsChan) + for _, location := range locations { + select { + case <-ctx.Done(): + return ctx.Err() + case locationsChan <- location: + } + } + return nil + }) + + type Result struct { + LatLng maps.LatLng + GeocodingResults *ReverseGeocodeOutput + } + results := make(chan Result) + + // Step 2: Map + nWorkers := parallelizationFactor + workers := int32(nWorkers) + for i := 0; i < nWorkers; i++ { + g.Go(func() error { + defer func() { + // Last one out closes shop + if atomic.AddInt32(&workers, -1) == 0 { + close(results) + } + }() + + for location := range locationsChan { + geocodingResults, err := reverseGeocoder.ReverseGeocode(ctx, location) + if err != nil { + return fmt.Errorf("failed to reverse geocode location: %w", err) + } else { + result := Result{ + LatLng: maps.LatLng{ + Lat: location.Lat, + Lng: location.Lng, + }, + GeocodingResults: geocodingResults, + } + select { + case <-ctx.Done(): + return ctx.Err() + case results <- result: + } + } + } + return nil + }) + } + + // Step 3: Reduce + // A normal Go map isn't thread-safe, so we use sync.Map + ret := new(sync.Map) + g.Go(func() error { + for result := range results { + ret.Store(result.LatLng, result.GeocodingResults) + } + return nil + }) + + // Wait blocks until all function calls from the Go method have returned, then + // returns the first non-nil error (if any) from them. + err := g.Wait() + if err != nil { + return nil, err + } + + // Step 4: Convert sync.Map into a list + // The order of outputs has to correspond with the order of inputs + // e.g., if the input was [point1, point2] then the output should be + // [place1ID, place2ID] + var out []*ReverseGeocodeOutput + for _, location := range locations { + val, ok := ret.Load(location) + if !ok { + return nil, fmt.Errorf("could not find lat/lng for (%s, %s)", + strconv.FormatFloat(location.Lat, 'f', -1, 64), + strconv.FormatFloat(location.Lng, 'f', -1, 64), + ) + } + geocodingResults, ok := val.(*ReverseGeocodeOutput) + if !ok { + return nil, fmt.Errorf("expected sync.Map values to be *ReverseGeocodeOutput, not %T", val) + } + + out = append(out, geocodingResults) + } + return out, nil +} diff --git a/pkg/maps/geocode/google/google.go b/pkg/maps/geocode/google/google.go new file mode 100644 index 0000000..04a2ca3 --- /dev/null +++ b/pkg/maps/geocode/google/google.go @@ -0,0 +1,65 @@ +package google + +import ( + "context" + "errors" + "github.com/codingsince1985/geo-golang" + maps "github.com/kevinmichaelchen/api-dispatch/pkg/maps" + "github.com/kevinmichaelchen/api-dispatch/pkg/maps/geocode" + gMaps "googlemaps.github.io/maps" +) + +var errNoResults = errors.New("no results found for coordinates") + +type Geocoder struct { + client *gMaps.Client +} + +func NewGeocoder(client *gMaps.Client) *Geocoder { + return &Geocoder{client: client} +} + +func (g *Geocoder) ReverseGeocode(ctx context.Context, location maps.LatLng) (*geocode.ReverseGeocodeOutput, error) { + results, err := g.client.ReverseGeocode(ctx, &gMaps.GeocodingRequest{ + LatLng: &gMaps.LatLng{ + Lat: location.Lat, + Lng: location.Lng, + }, + ResultType: nil, + LocationType: nil, + PlaceID: "", + Language: "", + Custom: nil, + }) + if err != nil { + return nil, err + } + if len(results) == 0 { + return nil, errNoResults + } + return toReverseGeocodeOutput(results), err +} + +func toReverseGeocodeOutput(in []gMaps.GeocodingResult) *geocode.ReverseGeocodeOutput { + // Assuming Google Maps API returns places ordered in such a way that + // the first element is the most salient/relevant. + bestResult := in[0] + return &geocode.ReverseGeocodeOutput{ + PlaceID: bestResult.PlaceID, + Address: geo.Address{ + FormattedAddress: bestResult.FormattedAddress, + // TODO set other components + Street: "", + HouseNumber: "", + Suburb: "", + Postcode: "", + State: "", + StateCode: "", + StateDistrict: "", + County: "", + Country: "", + CountryCode: "", + City: "", + }, + } +} diff --git a/pkg/maps/geocode/osrm/reverse_geocode.go b/pkg/maps/geocode/osrm/reverse_geocode.go new file mode 100644 index 0000000..41c6b62 --- /dev/null +++ b/pkg/maps/geocode/osrm/reverse_geocode.go @@ -0,0 +1,30 @@ +package osrm + +import ( + "context" + "github.com/codingsince1985/geo-golang/openstreetmap" + "github.com/kevinmichaelchen/api-dispatch/pkg/maps" + "github.com/kevinmichaelchen/api-dispatch/pkg/maps/geocode" + "net/http" +) + +type Geocoder struct { + client *http.Client +} + +func NewGeocoder(client *http.Client) *Geocoder { + return &Geocoder{client: client} +} + +func (g *Geocoder) ReverseGeocode(ctx context.Context, location maps.LatLng) (*geocode.ReverseGeocodeOutput, error) { + geocoder := openstreetmap.Geocoder() + + address, err := geocoder.ReverseGeocode(location.Lat, location.Lng) + if err != nil { + return nil, err + } + + return &geocode.ReverseGeocodeOutput{ + Address: *address, + }, nil +} diff --git a/pkg/maps/geocode/osrm/reverse_geocode_test.go b/pkg/maps/geocode/osrm/reverse_geocode_test.go new file mode 100644 index 0000000..f27fa06 --- /dev/null +++ b/pkg/maps/geocode/osrm/reverse_geocode_test.go @@ -0,0 +1,20 @@ +package osrm + +import ( + "context" + "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap" + "github.com/kevinmichaelchen/api-dispatch/pkg/maps" + "github.com/kr/pretty" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" + "net/http" + "testing" +) + +func TestReverseGeocode(t *testing.T) { + g := NewGeocoder(new(http.Client)) + ctx := ctxzap.ToContext(context.Background(), zaptest.NewLogger(t)) + out, err := g.ReverseGeocode(ctx, maps.LatLng{Lat: -37.813611, Lng: 144.963056}) + require.NoError(t, err) + pretty.Println(out) +} diff --git a/pkg/maps/maps.go b/pkg/maps/maps.go new file mode 100644 index 0000000..b8f9245 --- /dev/null +++ b/pkg/maps/maps.go @@ -0,0 +1,6 @@ +package maps + +type LatLng struct { + Lat float64 + Lng float64 +}