7
7
"fmt"
8
8
"math"
9
9
"net/url"
10
- "strings"
11
10
"sync"
12
11
"time"
13
12
@@ -40,7 +39,8 @@ type SeaWatcher struct {
40
39
channels map [string ]* phx.Channel
41
40
42
41
// subscribed slugs/events
43
- subscriptions map [osmodels.EventType ]map [string ]func ()
42
+ // subscriptions map[osmodels.EventType]map[string]func()
43
+ subscriptions map [string ]map [osmodels.EventType ]func ()
44
44
45
45
// redis client
46
46
rdb rueidis.Client
@@ -51,9 +51,7 @@ type SeaWatcher struct {
51
51
}
52
52
53
53
var (
54
- // topX = map[string]*models.Bid{}.
55
-
56
- AvailableEventTypes = []osmodels.EventType {osmodels .ItemListed , osmodels .ItemReceivedBid , osmodels .ItemMetadataUpdated } // , osmodels.CollectionOffer} //, osmodels.ItemMetadataUpdated} // ItemMetadataUpdated, ItemCancelled
54
+ availableEventTypes = []osmodels.EventType {osmodels .ItemListed , osmodels .ItemReceivedBid , osmodels .ItemMetadataUpdated } // , osmodels.CollectionOffer} //, osmodels.ItemMetadataUpdated} // ItemMetadataUpdated, ItemCancelled
57
55
58
56
eventsReceivedTotal = promauto .NewCounter (prometheus.CounterOpts {
59
57
Name : "gloomberg_oswatcher_events_received_total" ,
@@ -80,13 +78,11 @@ func NewSeaWatcher(apiToken string, gb *gloomberg.Gloomberg) *SeaWatcher {
80
78
81
79
client := & SeaWatcher {
82
80
receivedEvents : make (chan map [string ]interface {}, 1024 ),
83
- subscriptions : make (map [osmodels. EventType ]map [string ]func (), 0 ),
81
+ subscriptions : make (map [string ]map [osmodels. EventType ]func (), 0 ),
84
82
85
- // phoenixSocket: socket,
86
83
channels : make (map [string ]* phx.Channel ),
87
84
88
- gb : gb ,
89
- // rdb: rdb,
85
+ gb : gb ,
90
86
rdb : gb .Rdb ,
91
87
92
88
mu : & sync.Mutex {},
@@ -118,11 +114,6 @@ func NewSeaWatcher(apiToken string, gb *gloomberg.Gloomberg) *SeaWatcher {
118
114
}
119
115
})
120
116
121
- // create subscriptions map/registry
122
- for _ , event := range AvailableEventTypes {
123
- client .subscriptions [event ] = make (map [string ]func (), 0 )
124
- }
125
-
126
117
if client .phoenixSocket != nil {
127
118
if err := client .connect (); err != nil {
128
119
socketError := errors .New ("opensea stream socket connection failed: " + err .Error ())
@@ -149,7 +140,7 @@ func (sw *SeaWatcher) EventChannel() chan map[string]interface{} {
149
140
return sw .receivedEvents
150
141
}
151
142
152
- func (sw * SeaWatcher ) ActiveSubscriptions () map [osmodels. EventType ]map [string ]func () {
143
+ func (sw * SeaWatcher ) ActiveSubscriptions () map [string ]map [osmodels. EventType ]func () {
153
144
return sw .subscriptions
154
145
}
155
146
@@ -289,85 +280,65 @@ func (sw *SeaWatcher) DecodeCollectionOfferEvent(itemEvent map[string]interface{
289
280
return collectionOfferEvent , err
290
281
}
291
282
292
- // func printItemReceivedOfferEvent(itemOfferEvent osmodels.ItemReceivedOfferEvent) {
293
- // priceWeiRaw, _, err := big.ParseFloat(itemOfferEvent.Payload.BasePrice, 10, 64, big.ToNearestEven)
294
- // if err != nil {
295
- // log.Infof("⚓️❌ werror parsing price: %+v | %s", itemOfferEvent.Payload.BasePrice, err.Error())
296
-
297
- // return
298
- // }
299
- // priceWei, _ := priceWeiRaw.Int(nil)
300
-
301
- // // nftID is a identification string in the format <chain>/<contract>/<tokenID>
302
- // nftID := strings.Split(itemOfferEvent.Payload.Item.NftID, "/")
303
- // if len(nftID) != 3 {
304
- // log.Infof("⚓️ 🤷♀️ error parsing nftID: %s", itemOfferEvent.Payload.Item.NftID)
305
-
306
- // return
307
- // }
308
- // eventType := osmodels.TxType[itemOfferEvent.StreamEvent]
309
- // collectionPrimaryStyle := lipgloss.NewStyle().Foreground(style.GenerateColorWithSeed(common.HexToAddress(nftID[1]).Hash().Big().Int64())).Bold(true)
310
- // collectionSecondaryStyle := lipgloss.NewStyle().Foreground(style.GenerateColorWithSeed(common.HexToAddress(nftID[1]).Big().Int64() ^ 2)).Bold(true)
311
- // // get tokenID
312
- // tID, _, _ := big.ParseFloat(nftID[2], 10, 64, big.ToNearestEven)
313
- // tokenID, _ := tID.Int(nil)
314
- // fmtTokenID := style.ShortenedTokenIDStyled(tokenID, collectionPrimaryStyle, collectionSecondaryStyle)
315
- // // for erc1155 tokens itemOfferEvent.Payload.Item.Metadata.Name is the item name
316
- // collectionName := strings.Split(itemOfferEvent.Payload.Item.Metadata.Name, " #")[0]
317
- // fmtToken := style.BoldStyle.Render(fmt.Sprintf("%s %s", collectionPrimaryStyle.Render(collectionName), fmtTokenID))
318
- // fmt.Println(itemOfferEvent.StreamEvent)
319
- // log.Infof("⚓️ %s | %sΞ %s ", eventType.Icon(), style.BoldStyle.Render(fmt.Sprintf("%5.3f", price.NewPrice(priceWei).Ether())), style.TerminalLink(itemOfferEvent.Payload.Item.Permalink, fmtToken))
320
- // }
321
-
322
- func (sw * SeaWatcher ) SubscribeForSlug (eventType osmodels.EventType , slug string ) bool {
283
+ func (sw * SeaWatcher ) SubscribeForSlug (slug string ) bool {
323
284
sw .mu .Lock ()
324
- alreadySubscribed := sw.subscriptions [eventType ][ slug ]
285
+ alreadySubscribed := sw .subscriptions [slug ]
325
286
326
287
if alreadySubscribed != nil {
327
288
sw .mu .Unlock ()
328
289
329
- log .Debugf ("⚓️ ☕️ already subscribed to %s for %s" , eventType , slug )
290
+ log .Debugf ("⚓️ ☕️ already subscribed to opensea events for %s" , slug )
330
291
331
292
return false
332
293
}
333
294
334
- sw.subscriptions [eventType ][slug ] = sw .on (eventType , slug , sw .eventHandler )
295
+ sw .subscriptions [slug ] = make (map [osmodels.EventType ]func ())
296
+
297
+ for _ , eventType := range availableEventTypes {
298
+ sw.subscriptions [slug ][eventType ] = sw .on (eventType , slug , sw .eventHandler )
299
+ }
335
300
sw .mu .Unlock ()
336
301
302
+ if collection := sw .gb .CollectionDB .GetCollectionForSlug (slug ); collection != nil {
303
+ log .Debugf ("⏮️ resetting stats for %s" , slug )
304
+
305
+ collection .ResetStats ()
306
+ }
307
+
337
308
return true
338
309
}
339
310
340
- func (sw * SeaWatcher ) UnubscribeForSlug (eventType osmodels. EventType , slug string ) bool {
311
+ func (sw * SeaWatcher ) UnubscribeForSlug (slug string ) bool {
341
312
sw .mu .Lock ()
342
- unsubscribe := sw.subscriptions [ eventType ] [slug ]
313
+ slugSubscriptions := sw .subscriptions [slug ]
343
314
sw .mu .Unlock ()
344
315
345
- if unsubscribe != nil {
316
+ if slugSubscriptions != nil {
346
317
// unsubscribe
347
- unsubscribe ()
318
+ for _ , unsubscribe := range slugSubscriptions {
319
+ unsubscribe ()
320
+ }
348
321
349
322
// remove slug
350
323
sw .mu .Lock ()
351
- sw.subscriptions [eventType ][ slug ] = nil
324
+ sw .subscriptions [slug ] = nil
352
325
sw .mu .Unlock ()
353
326
354
327
return true
355
328
}
356
329
357
- log .Debugf ("☕️ not subscribed to %s for %s (anymore)" , eventType , slug )
330
+ log .Debugf ("unsubscribed %s from opense events" , slug )
358
331
359
332
return false
360
333
}
361
334
362
335
func (sw * SeaWatcher ) IsSubscribed (slug string ) bool {
363
- for _ , eventType := range AvailableEventTypes {
364
- sw .mu .Lock ()
365
- alreadySubscribed , ok := sw.subscriptions [eventType ][slug ]
366
- sw .mu .Unlock ()
336
+ sw .mu .Lock ()
337
+ alreadySubscribed , ok := sw .subscriptions [slug ]
338
+ sw .mu .Unlock ()
367
339
368
- if ok && alreadySubscribed != nil {
369
- return true
370
- }
340
+ if ok && alreadySubscribed != nil {
341
+ return true
371
342
}
372
343
373
344
return false
@@ -466,7 +437,7 @@ func (sw *SeaWatcher) Run() {
466
437
return
467
438
}
468
439
469
- var action func (event osmodels. EventType , slug string ) bool
440
+ var action func (slug string ) bool
470
441
471
442
switch mgmtEvent .Action {
472
443
case models .Subscribe :
@@ -475,20 +446,6 @@ func (sw *SeaWatcher) Run() {
475
446
action = sw .UnubscribeForSlug
476
447
}
477
448
478
- // transform to string
479
- var events []string
480
- for _ , event := range AvailableEventTypes {
481
- events = append (events , string (event ))
482
- }
483
-
484
- // subscribe to which events?
485
- if len (mgmtEvent .Events ) == 0 {
486
- // subscribe to all available events if none are specified
487
- sw .Prf ("no events specified, subscribing to all available events (%+v)" , strings .Join (events , ", " ))
488
-
489
- mgmtEvent .Events = AvailableEventTypes
490
- }
491
-
492
449
newSubscriptions := make (map [string ][]osmodels.EventType , 0 )
493
450
newEventSubscriptions := 0
494
451
@@ -499,26 +456,18 @@ func (sw *SeaWatcher) Run() {
499
456
continue
500
457
}
501
458
502
- for _ , event := range mgmtEvent .Events {
503
- if action (event , slug ) {
504
- newEventSubscriptions ++
505
-
506
- if _ , ok := newSubscriptions [slug ]; ! ok {
507
- newSubscriptions [slug ] = make ([]osmodels.EventType , 0 )
508
- }
509
-
510
- newSubscriptions [slug ] = append (newSubscriptions [slug ], event )
459
+ if action (slug ) {
460
+ newEventSubscriptions ++
511
461
512
- time .Sleep (257 * time .Millisecond )
513
- }
462
+ time .Sleep (337 * time .Millisecond )
514
463
}
515
464
}
516
465
517
466
sw .Prf (
518
- "successfully subscribed to %s new collections/slugs (%d events in total) | total subscriptions : %s" ,
467
+ "successfully subscribed to %s new collections/slugs (%d events in total) | total subscribed collections : %s" ,
519
468
style .AlmostWhiteStyle .Render (fmt .Sprint (len (newSubscriptions ))),
520
469
newEventSubscriptions ,
521
- style .AlmostWhiteStyle .Render (fmt .Sprint (len (sw .ActiveSubscriptions ()[ osmodels . ItemListed ] ))),
470
+ style .AlmostWhiteStyle .Render (fmt .Sprint (len (sw .ActiveSubscriptions ()))),
522
471
)
523
472
524
473
default :
0 commit comments