Skip to content

Commit

Permalink
enhance grpc server, preparation to use general "Event" for all opens…
Browse files Browse the repository at this point in the history
…ea events
  • Loading branch information
benleb committed Jul 28, 2023
1 parent ab6f066 commit 98a6406
Show file tree
Hide file tree
Showing 6 changed files with 760 additions and 442 deletions.
156 changes: 103 additions & 53 deletions cmd/live.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,11 @@ import (
"github.com/spf13/viper"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/encoding/protojson"
)

var manualParser = true

// liveCmd represents the live command.
var liveCmd = &cobra.Command{
Use: "live",
Expand Down Expand Up @@ -520,69 +523,116 @@ func testGRPC() {

gb.Prf("subscribing via grpc to: %s", style.BoldAlmostWhite(degendb.Listing.OpenseaEventName()))

subsriptionRequest := &seawatcher.SubscriptionRequest{EventTypes: []seawatcher.EventType{seawatcher.EventType_ITEM_LISTED}, Collections: gb.CollectionDB.OpenseaSlugs()} //nolint:nosnakecase
stream, err := client.GetItemListedEvents(context.Background(), subsriptionRequest)
if err != nil {
log.Errorf("client.GetEvents failed: %v", err)

return
}

for {
event, err := stream.Recv()
subsriptionRequest := &seawatcher.SubscriptionRequest{EventTypes: []seawatcher.EventType{seawatcher.EventType_ITEM_LISTED}, Collections: gb.CollectionDB.OpenseaSlugs()} //nolint:nosnakecase
stream, err := client.GetItemListedEvents(context.Background(), subsriptionRequest)
if err != nil {
if errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, io.EOF) {
log.Errorf("io.EOF error: %v", err)
log.Errorf("client.GetEvents failed: %v", err)

break
}
return
}

log.Errorf("receiving event failed: %v", err)
for {
event, err := stream.Recv()
if err != nil {
if errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, io.EOF) || errors.Is(err, io.ErrClosedPipe) {
log.Errorf("io.EOF error: %v", err)

time.Sleep(time.Second * 1)
}
log.Errorf("lets fucking reconnect! %v", stream.CloseSend())

basePrice, ok := new(big.Int).SetString(event.Payload.BasePrice, 10)
if !ok {
log.Errorf("error parsing base price: %v", err)
// //
// // reconnect?!
// conn.Close()

continue
}
// conn, err := grpc.Dial(grpcAddress, opts...)
// if err != nil {
// log.Errorf("reco fail to dial: %v", err)
// }
// defer conn.Close()
// client = seawatcher.NewSeaWatcherClient(conn)

// subsriptionRequest := &seawatcher.SubscriptionRequest{EventTypes: []seawatcher.EventType{seawatcher.EventType_ITEM_LISTED}, Collections: gb.CollectionDB.OpenseaSlugs()} //nolint:nosnakecase
// stream, err = client.GetItemListedEvents(context.Background(), subsriptionRequest)
// if err != nil {
// log.Errorf("reco client.GetEvents failed: %v", err)

// continue
// }
// //
// //

break
}

// transform event back to seawaModel.ItemListed
itemListedEvent := seawaModels.ItemListed{
EventType: strings.ToLower(event.EventType.String()),
SentAt: event.SentAt.AsTime(),
Payload: seawaModels.ItemListedPayload{
Item: seawaModels.Item{
NftID: *seawaModels.ParseNftID(event.Payload.Item.NftId),
Chain: seawaModels.Chain{Name: event.Payload.Item.Chain.Name},
Permalink: event.Payload.Item.Permalink,
Metadata: seawaModels.Metadata{
Name: event.Payload.Item.Metadata.Name,
ImageURL: event.Payload.Item.Metadata.ImageUrl,
AnimationURL: event.Payload.Item.Metadata.AnimationUrl,
MetadataURL: event.Payload.Item.Metadata.MetadataUrl,
log.Errorf("receiving event failed: %v", err)

time.Sleep(time.Second * 1)
}

basePrice, ok := new(big.Int).SetString(event.Payload.GetItemListed().Payload.BasePrice, 10)
if !ok {
log.Errorf("error parsing base price: %v", err)

continue
}

var itemListed seawaModels.ItemListed

log.Debugf("🐔 client received: %+v", protojson.Format(event))

if manualParser {
// transform event back to seawaModel.ItemListed
itemListed = seawaModels.ItemListed{
EventType: strings.ToLower(event.EventType.String()),
SentAt: event.Payload.GetItemListed().SentAt.AsTime(),
Payload: seawaModels.ItemListedPayload{
Item: seawaModels.Item{
NftID: *seawaModels.ParseNftID(event.Payload.GetItemListed().Payload.Item.NftId),
Chain: seawaModels.Chain{Name: event.Payload.GetItemListed().Payload.Item.Chain.Name},
Permalink: event.Payload.GetItemListed().Payload.Item.Permalink,
Metadata: seawaModels.Metadata{
Name: event.Payload.GetItemListed().Payload.Item.Metadata.Name,
ImageURL: event.Payload.GetItemListed().Payload.Item.Metadata.ImageUrl,
AnimationURL: event.Payload.GetItemListed().Payload.Item.Metadata.AnimationUrl,
MetadataURL: event.Payload.GetItemListed().Payload.Item.Metadata.MetadataUrl,
},
},
IsPrivate: event.Payload.GetItemListed().Payload.IsPrivate,
ListingDate: event.Payload.GetItemListed().Payload.ListingDate.AsTime(),
EventPayload: seawaModels.EventPayload{
EventTimestamp: event.Payload.GetItemListed().Payload.EventTimestamp.AsTime(),
BasePrice: basePrice,
Maker: seawaModels.Account{Address: common.HexToAddress(event.Payload.GetItemListed().Payload.Maker.Address)},
Taker: seawaModels.Account{Address: common.HexToAddress(event.Payload.GetItemListed().Payload.Taker.Address)},
Quantity: int(event.Payload.GetItemListed().Payload.Quantity),
OrderHash: common.HexToHash(event.Payload.GetItemListed().Payload.OrderHash),
ExpirationDate: event.Payload.GetItemListed().Payload.ExpirationDate.AsTime(),
CollectionCriteria: seawaModels.CollectionCriteria{Slug: event.Payload.GetItemListed().Payload.Collection.Slug},
PaymentToken: seawaModels.PaymentToken{Address: common.HexToAddress(event.Payload.GetItemListed().Payload.PaymentToken.Address), Symbol: event.Payload.GetItemListed().Payload.PaymentToken.Symbol, Decimals: int(event.Payload.GetItemListed().Payload.PaymentToken.Decimals)},
},
},
},
IsPrivate: event.Payload.IsPrivate,
ListingDate: event.Payload.ListingDate.AsTime(),
EventPayload: seawaModels.EventPayload{
EventTimestamp: event.Payload.EventTimestamp.AsTime(),
BasePrice: basePrice,
Maker: seawaModels.Account{Address: common.HexToAddress(event.Payload.Maker.Address)},
Taker: seawaModels.Account{Address: common.HexToAddress(event.Payload.Taker.Address)},
Quantity: int(event.Payload.Quantity),
OrderHash: common.HexToHash(event.Payload.OrderHash),
ExpirationDate: event.Payload.ExpirationDate.AsTime(),
CollectionCriteria: seawaModels.CollectionCriteria{Slug: event.Payload.Collection.Slug},
PaymentToken: seawaModels.PaymentToken{Address: common.HexToAddress(event.Payload.PaymentToken.Address), Symbol: event.Payload.PaymentToken.Symbol, Decimals: int(event.Payload.PaymentToken.Decimals)},
},
},
}
}
} // else {
// log.Printf("🐔 client received: %+v", protojson.Format(event))

// // transform event back to seawaModel.ItemListed
// rawEvent, err := protojson.Marshal(event)
// if err != nil {
// log.Errorf("error parsing base price: %v", err)

// continue
// }

// send event to the eventhub
gb.In.ItemListed <- &itemListedEvent
// if err := json.Unmarshal(rawEvent, &itemListed); err != nil {
// log.Errorf("error unmarshaling : %v", err)

// continue
// }
// }

// send event to the eventhub
gb.In.ItemListed <- &itemListed
}
}
}

Expand Down
6 changes: 3 additions & 3 deletions internal/degendb/event_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ type GBEventType struct {
openseaEventName string
}

func GetEventType(name string) *EventType {
func GetEventType(name string) *GBEventType {
if eventType, ok := eventTypes[name]; ok {
return &eventType
return eventType
}

return nil
Expand Down Expand Up @@ -68,7 +68,7 @@ var (
SaleTypes = mapset.NewSet[EventType](Sale, Purchase)

// map of lowercase_with_underscores openseaEventName to event type.
eventTypes = map[string]EventType{
eventTypes = map[string]*GBEventType{
"item_transferred": Transfer,
"item_sold": Sale,
"item_listed": Listing,
Expand Down
Loading

0 comments on commit 98a6406

Please sign in to comment.