Skip to content

Commit b345aee

Browse files
committed
fix: use direct map lookup to find subscription
1 parent 5396c66 commit b345aee

1 file changed

Lines changed: 17 additions & 17 deletions

File tree

v2/pkg/engine/resolve/resolve.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -445,10 +445,10 @@ type trigger struct {
445445
updater *subscriptionUpdater
446446
}
447447

448-
func (t *trigger) subscriptionIds() map[context.Context]SubscriptionIdentifier {
449-
subs := make(map[context.Context]SubscriptionIdentifier, len(t.subscriptions))
448+
func (t *trigger) subscriptionIds() map[*Context]SubscriptionIdentifier {
449+
subs := make(map[*Context]SubscriptionIdentifier, len(t.subscriptions))
450450
for ctx, sub := range t.subscriptions {
451-
subs[ctx.Context()] = sub.id
451+
subs[ctx] = sub.id
452452
}
453453
return subs
454454
}
@@ -668,7 +668,7 @@ func (r *Resolver) handleEvent(event subscriptionEvent) {
668668
case subscriptionEventKindRemoveClient:
669669
r.handleRemoveClient(event.id.ConnectionID)
670670
case subscriptionEventKindUpdateSubscription:
671-
r.handleUpdateSubscription(event.triggerID, event.data, event.id)
671+
r.handleUpdateSubscription(event.triggerID, event.data, event.id, event.subCtx)
672672
case subscriptionEventKindTriggerUpdate:
673673
r.handleTriggerUpdate(event.triggerID, event.data)
674674
case subscriptionEventKindTriggerComplete:
@@ -756,7 +756,7 @@ func (r *Resolver) executeStartupHooks(add *addSubscription, updater *subscripti
756756
Updater: func(data []byte) {
757757
// Writing on the updater channel is safe but has to happen outside of the event loop
758758
// to respect order and not block the event loop
759-
updater.UpdateSubscription(add.id, data)
759+
updater.UpdateSubscription(add.ctx, add.id, data)
760760
},
761761
}
762762

@@ -994,7 +994,7 @@ func (r *Resolver) handleTriggerUpdate(id uint64, data []byte) {
994994
}
995995
}
996996

997-
func (r *Resolver) handleUpdateSubscription(id uint64, data []byte, subIdentifier SubscriptionIdentifier) {
997+
func (r *Resolver) handleUpdateSubscription(id uint64, data []byte, subIdentifier SubscriptionIdentifier, subCtx *Context) {
998998
trig, ok := r.triggers[id]
999999
if !ok {
10001000
return
@@ -1004,13 +1004,11 @@ func (r *Resolver) handleUpdateSubscription(id uint64, data []byte, subIdentifie
10041004
fmt.Printf("resolver:trigger:subscription:update:%d:%d,%d\n", id, subIdentifier.ConnectionID, subIdentifier.SubscriptionID)
10051005
}
10061006

1007-
for c, s := range trig.subscriptions {
1008-
if s.id != subIdentifier {
1009-
continue
1010-
}
1011-
r.sendUpdateToSubscription(data, c, s)
1012-
break
1007+
s, ok := trig.subscriptions[subCtx]
1008+
if !ok {
1009+
return
10131010
}
1011+
r.sendUpdateToSubscription(data, subCtx, s)
10141012
}
10151013

10161014
func (r *Resolver) sendUpdateToSubscription(data []byte, c *Context, s *sub) {
@@ -1449,7 +1447,7 @@ type subscriptionUpdater struct {
14491447
triggerID uint64
14501448
ch chan subscriptionEvent
14511449
ctx context.Context
1452-
subsFn func() map[context.Context]SubscriptionIdentifier
1450+
subsFn func() map[*Context]SubscriptionIdentifier
14531451
}
14541452

14551453
func (s *subscriptionUpdater) Update(data []byte) {
@@ -1469,7 +1467,7 @@ func (s *subscriptionUpdater) Update(data []byte) {
14691467
}
14701468
}
14711469

1472-
func (s *subscriptionUpdater) UpdateSubscription(id SubscriptionIdentifier, data []byte) {
1470+
func (s *subscriptionUpdater) UpdateSubscription(ctx *Context, id SubscriptionIdentifier, data []byte) {
14731471
if s.debug {
14741472
fmt.Printf("resolver:subscription_updater:update:%d\n", s.triggerID)
14751473
}
@@ -1483,11 +1481,12 @@ func (s *subscriptionUpdater) UpdateSubscription(id SubscriptionIdentifier, data
14831481
kind: subscriptionEventKindUpdateSubscription,
14841482
data: data,
14851483
id: id,
1484+
subCtx: ctx,
14861485
}:
14871486
}
14881487
}
14891488

1490-
func (s *subscriptionUpdater) Subscriptions() map[context.Context]SubscriptionIdentifier {
1489+
func (s *subscriptionUpdater) Subscriptions() map[*Context]SubscriptionIdentifier {
14911490
return s.subsFn()
14921491
}
14931492

@@ -1567,6 +1566,7 @@ type subscriptionEvent struct {
15671566
data []byte
15681567
addSubscription *addSubscription
15691568
closeKind SubscriptionCloseKind
1569+
subCtx *Context
15701570
}
15711571

15721572
type addSubscription struct {
@@ -1599,13 +1599,13 @@ type SubscriptionUpdater interface {
15991599
// Update sends an update to the client. It is not guaranteed that the update is sent immediately.
16001600
Update(data []byte)
16011601
// UpdateSubscription sends an update to a single subscription. It is not guaranteed that the update is sent immediately.
1602-
UpdateSubscription(id SubscriptionIdentifier, data []byte)
1602+
UpdateSubscription(ctx *Context, id SubscriptionIdentifier, data []byte)
16031603
// Complete also takes care of cleaning up the trigger and all subscriptions. No more updates should be sent after calling Complete.
16041604
Complete()
16051605
// Close closes the subscription and cleans up the trigger and all subscriptions. No more updates should be sent after calling Close.
16061606
Close(kind SubscriptionCloseKind)
16071607
// CloseSubscription closes a single subscription. No more updates should be sent to that subscription after calling CloseSubscription.
16081608
CloseSubscription(kind SubscriptionCloseKind, id SubscriptionIdentifier)
16091609
// Subscriptions return all the subscriptions associated to this Updater
1610-
Subscriptions() map[context.Context]SubscriptionIdentifier
1610+
Subscriptions() map[*Context]SubscriptionIdentifier
16111611
}

0 commit comments

Comments
 (0)