Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -8404,16 +8404,16 @@ func (t *testSubscriptionUpdaterChan) Update(data []byte) {
}

// empty method to satisfy the interface, not used in this tests
func (t *testSubscriptionUpdaterChan) UpdateSubscription(id resolve.SubscriptionIdentifier, data []byte) {
func (t *testSubscriptionUpdaterChan) UpdateSubscription(ctx *resolve.Context, id resolve.SubscriptionIdentifier, data []byte) {
}

// empty method to satisfy the interface, not used in this tests
func (t *testSubscriptionUpdaterChan) CloseSubscription(kind resolve.SubscriptionCloseKind, id resolve.SubscriptionIdentifier) {
}

// empty method to satisfy the interface, not used in this tests
func (t *testSubscriptionUpdaterChan) Subscriptions() map[context.Context]resolve.SubscriptionIdentifier {
return make(map[context.Context]resolve.SubscriptionIdentifier)
func (t *testSubscriptionUpdaterChan) Subscriptions() map[*resolve.Context]resolve.SubscriptionIdentifier {
return make(map[*resolve.Context]resolve.SubscriptionIdentifier)
}

func (t *testSubscriptionUpdaterChan) Complete() {
Expand Down Expand Up @@ -8546,12 +8546,12 @@ func (t *testSubscriptionUpdater) CloseSubscription(kind resolve.SubscriptionClo
}

// empty method to satisfy the interface, not used in this tests
func (t *testSubscriptionUpdater) Subscriptions() map[context.Context]resolve.SubscriptionIdentifier {
return make(map[context.Context]resolve.SubscriptionIdentifier)
func (t *testSubscriptionUpdater) Subscriptions() map[*resolve.Context]resolve.SubscriptionIdentifier {
return make(map[*resolve.Context]resolve.SubscriptionIdentifier)
}

// empty method to satisfy the interface, not used in this tests
func (t *testSubscriptionUpdater) UpdateSubscription(id resolve.SubscriptionIdentifier, data []byte) {
func (t *testSubscriptionUpdater) UpdateSubscription(ctx *resolve.Context, id resolve.SubscriptionIdentifier, data []byte) {
}

func TestSubscriptionSource_Start(t *testing.T) {
Expand Down
34 changes: 17 additions & 17 deletions v2/pkg/engine/resolve/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,10 +445,10 @@ type trigger struct {
updater *subscriptionUpdater
}

func (t *trigger) subscriptionIds() map[context.Context]SubscriptionIdentifier {
subs := make(map[context.Context]SubscriptionIdentifier, len(t.subscriptions))
func (t *trigger) subscriptionIds() map[*Context]SubscriptionIdentifier {
subs := make(map[*Context]SubscriptionIdentifier, len(t.subscriptions))
for ctx, sub := range t.subscriptions {
subs[ctx.Context()] = sub.id
subs[ctx] = sub.id
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
return subs
}
Expand Down Expand Up @@ -668,7 +668,7 @@ func (r *Resolver) handleEvent(event subscriptionEvent) {
case subscriptionEventKindRemoveClient:
r.handleRemoveClient(event.id.ConnectionID)
case subscriptionEventKindUpdateSubscription:
r.handleUpdateSubscription(event.triggerID, event.data, event.id)
r.handleUpdateSubscription(event.triggerID, event.data, event.id, event.subCtx)
case subscriptionEventKindTriggerUpdate:
r.handleTriggerUpdate(event.triggerID, event.data)
case subscriptionEventKindTriggerComplete:
Expand Down Expand Up @@ -756,7 +756,7 @@ func (r *Resolver) executeStartupHooks(add *addSubscription, updater *subscripti
Updater: func(data []byte) {
// Writing on the updater channel is safe but has to happen outside of the event loop
// to respect order and not block the event loop
updater.UpdateSubscription(add.id, data)
updater.UpdateSubscription(add.ctx, add.id, data)
},
}

Expand Down Expand Up @@ -994,7 +994,7 @@ func (r *Resolver) handleTriggerUpdate(id uint64, data []byte) {
}
}

func (r *Resolver) handleUpdateSubscription(id uint64, data []byte, subIdentifier SubscriptionIdentifier) {
func (r *Resolver) handleUpdateSubscription(id uint64, data []byte, subIdentifier SubscriptionIdentifier, subCtx *Context) {
trig, ok := r.triggers[id]
if !ok {
return
Expand All @@ -1004,13 +1004,11 @@ func (r *Resolver) handleUpdateSubscription(id uint64, data []byte, subIdentifie
fmt.Printf("resolver:trigger:subscription:update:%d:%d,%d\n", id, subIdentifier.ConnectionID, subIdentifier.SubscriptionID)
}

for c, s := range trig.subscriptions {
if s.id != subIdentifier {
continue
}
r.sendUpdateToSubscription(data, c, s)
break
s, ok := trig.subscriptions[subCtx]
if !ok {
return
}
r.sendUpdateToSubscription(data, subCtx, s)
}

func (r *Resolver) sendUpdateToSubscription(data []byte, c *Context, s *sub) {
Expand Down Expand Up @@ -1449,7 +1447,7 @@ type subscriptionUpdater struct {
triggerID uint64
ch chan subscriptionEvent
ctx context.Context
subsFn func() map[context.Context]SubscriptionIdentifier
subsFn func() map[*Context]SubscriptionIdentifier
}

func (s *subscriptionUpdater) Update(data []byte) {
Expand All @@ -1469,7 +1467,7 @@ func (s *subscriptionUpdater) Update(data []byte) {
}
}

func (s *subscriptionUpdater) UpdateSubscription(id SubscriptionIdentifier, data []byte) {
func (s *subscriptionUpdater) UpdateSubscription(ctx *Context, id SubscriptionIdentifier, data []byte) {
if s.debug {
fmt.Printf("resolver:subscription_updater:update:%d\n", s.triggerID)
}
Expand All @@ -1483,11 +1481,12 @@ func (s *subscriptionUpdater) UpdateSubscription(id SubscriptionIdentifier, data
kind: subscriptionEventKindUpdateSubscription,
data: data,
id: id,
subCtx: ctx,
}:
}
}

func (s *subscriptionUpdater) Subscriptions() map[context.Context]SubscriptionIdentifier {
func (s *subscriptionUpdater) Subscriptions() map[*Context]SubscriptionIdentifier {
return s.subsFn()
}

Expand Down Expand Up @@ -1567,6 +1566,7 @@ type subscriptionEvent struct {
data []byte
addSubscription *addSubscription
closeKind SubscriptionCloseKind
subCtx *Context
}

type addSubscription struct {
Expand Down Expand Up @@ -1599,13 +1599,13 @@ type SubscriptionUpdater interface {
// Update sends an update to the client. It is not guaranteed that the update is sent immediately.
Update(data []byte)
// UpdateSubscription sends an update to a single subscription. It is not guaranteed that the update is sent immediately.
UpdateSubscription(id SubscriptionIdentifier, data []byte)
UpdateSubscription(ctx *Context, id SubscriptionIdentifier, data []byte)
// Complete also takes care of cleaning up the trigger and all subscriptions. No more updates should be sent after calling Complete.
Complete()
// Close closes the subscription and cleans up the trigger and all subscriptions. No more updates should be sent after calling Close.
Close(kind SubscriptionCloseKind)
// CloseSubscription closes a single subscription. No more updates should be sent to that subscription after calling CloseSubscription.
CloseSubscription(kind SubscriptionCloseKind, id SubscriptionIdentifier)
// Subscriptions return all the subscriptions associated to this Updater
Subscriptions() map[context.Context]SubscriptionIdentifier
Subscriptions() map[*Context]SubscriptionIdentifier
}
Loading