Skip to content

Commit 7571e5f

Browse files
committed
fix: propagte job progress events and support synchronous events
Fixes two bugs with the Queue class: 1. When joining an existing job, progress events were not received by the second context 2. If a progress event was emitted before any async work was done in a queue job under the concurrency threshold, it was missed because the job result recipient had not been registered yet
1 parent 2ccd234 commit 7571e5f

5 files changed

Lines changed: 96 additions & 11 deletions

File tree

packages/utils/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
"netmask": "^2.0.2",
6464
"p-defer": "^4.0.1",
6565
"p-event": "^7.0.0",
66+
"progress-events": "^1.1.0",
6667
"race-signal": "^2.0.0",
6768
"uint8-varint": "^2.0.4",
6869
"uint8arraylist": "^2.4.8",

packages/utils/src/queue/index.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { debounce } from '../debounce.js'
66
import { QueueFullError } from '../errors.js'
77
import { Job } from './job.js'
88
import type { AbortOptions, Metrics } from '@libp2p/interface'
9+
import type { ProgressOptions } from 'progress-events'
910

1011
export type { Job, JobTimeline } from './job.js'
1112
export type { JobRecipient } from './recipient.js'
@@ -118,7 +119,7 @@ export interface QueueEvents<JobReturnType, JobOptions extends AbortOptions = Ab
118119
* 1. Items remain at the head of the queue while they are running so `queue.size` includes `queue.pending` items - this is so interested parties can join the results of a queue item while it is running
119120
* 2. The options for a job are stored separately to the job in order for them to be modified while they are still in the queue
120121
*/
121-
export class Queue<JobReturnType = unknown, JobOptions extends AbortOptions = AbortOptions> extends TypedEventEmitter<QueueEvents<JobReturnType, JobOptions>> {
122+
export class Queue<JobReturnType = unknown, JobOptions extends AbortOptions & ProgressOptions = AbortOptions> extends TypedEventEmitter<QueueEvents<JobReturnType, JobOptions>> {
122123
public concurrency: number
123124
public maxSize: number
124125
public queue: Array<Job<JobOptions, JobReturnType>>
@@ -257,9 +258,8 @@ export class Queue<JobReturnType = unknown, JobOptions extends AbortOptions = Ab
257258
const job = new Job<JobOptions, JobReturnType>(fn, options)
258259
this.enqueue(job)
259260
this.safeDispatchEvent('add')
260-
this.tryToStartAnother()
261261

262-
return job.join(options)
262+
const result = job.join(options)
263263
.then(result => {
264264
this.safeDispatchEvent('completed', { detail: result })
265265
this.safeDispatchEvent('success', { detail: { job, result } })
@@ -281,6 +281,10 @@ export class Queue<JobReturnType = unknown, JobOptions extends AbortOptions = Ab
281281

282282
throw err
283283
})
284+
285+
this.tryToStartAnother()
286+
287+
return result
284288
}
285289

286290
/**

packages/utils/src/queue/job.ts

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { raceSignal } from 'race-signal'
44
import { JobRecipient } from './recipient.js'
55
import type { JobStatus } from './index.js'
66
import type { AbortOptions } from '@libp2p/interface'
7+
import type { ProgressOptions } from 'progress-events'
78

89
/**
910
* Returns a random string
@@ -18,7 +19,7 @@ export interface JobTimeline {
1819
finished?: number
1920
}
2021

21-
export class Job <JobOptions extends AbortOptions = AbortOptions, JobReturnType = unknown> {
22+
export class Job <JobOptions extends AbortOptions & ProgressOptions = AbortOptions, JobReturnType = unknown> {
2223
public id: string
2324
public fn: (options: JobOptions) => Promise<JobReturnType>
2425
public options: JobOptions
@@ -59,11 +60,11 @@ export class Job <JobOptions extends AbortOptions = AbortOptions, JobReturnType
5960
}
6061
}
6162

62-
async join (options: AbortOptions = {}): Promise<JobReturnType> {
63-
const recipient = new JobRecipient<JobReturnType>(options.signal)
63+
async join (options?: Partial<Pick<JobOptions, 'signal' | 'onProgress'>>): Promise<JobReturnType> {
64+
const recipient = new JobRecipient<JobReturnType>(options)
6465
this.recipients.push(recipient)
6566

66-
options.signal?.addEventListener('abort', this.onAbort)
67+
options?.signal?.addEventListener('abort', this.onAbort)
6768

6869
return recipient.deferred.promise
6970
}
@@ -77,7 +78,12 @@ export class Job <JobOptions extends AbortOptions = AbortOptions, JobReturnType
7778

7879
const result = await raceSignal(this.fn({
7980
...(this.options ?? {}),
80-
signal: this.controller.signal
81+
signal: this.controller.signal,
82+
onProgress: (evt: any): void => {
83+
this.recipients.forEach(recipient => {
84+
recipient.onProgress?.(evt)
85+
})
86+
}
8187
}), this.controller.signal)
8288

8389
this.recipients.forEach(recipient => {

packages/utils/src/queue/recipient.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
import { AbortError } from '@libp2p/interface'
22
import pDefer from 'p-defer'
3+
import type { AbortOptions } from '@libp2p/interface'
34
import type { DeferredPromise } from 'p-defer'
5+
import type { ProgressOptions, ProgressEventListener } from 'progress-events'
46

5-
export class JobRecipient<JobReturnType> {
7+
export class JobRecipient<JobReturnType, JobOptions extends AbortOptions & ProgressOptions = any> {
68
public deferred: DeferredPromise<JobReturnType>
79
public signal?: AbortSignal
10+
public onProgress?: ProgressEventListener
811

9-
constructor (signal?: AbortSignal) {
10-
this.signal = signal
12+
constructor (options?: Partial<Pick<JobOptions, 'signal' | 'onProgress'>>) {
13+
this.signal = options?.signal
14+
this.onProgress = options?.onProgress
1115
this.deferred = pDefer()
1216

1317
this.onAbort = this.onAbort.bind(this)

packages/utils/test/queue.spec.ts

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@ import { expect } from 'aegir/chai'
22
import delay from 'delay'
33
import all from 'it-all'
44
import pDefer from 'p-defer'
5+
import { CustomProgressEvent } from 'progress-events'
56
import { Queue } from '../src/queue/index.js'
67
import { TestSignal } from './fixtures/test-signal.js'
78
import type { AbortOptions } from '@libp2p/interface'
9+
import type { ProgressOptions } from 'progress-events'
810

911
const fixture = Symbol('fixture')
1012

@@ -864,4 +866,72 @@ describe('queue', () => {
864866
await expect(p).to.eventually.equal('hello')
865867
await expect(queue.add(job)).to.eventually.equal('hello')
866868
})
869+
870+
it('should forward progress events to multiple consumers', async () => {
871+
interface ProgressJobOptions extends AbortOptions, ProgressOptions {
872+
873+
}
874+
const queue = new Queue<string, ProgressJobOptions>({
875+
concurrency: 1,
876+
maxSize: 1
877+
})
878+
queue.pause()
879+
880+
const job = async (options: ProgressJobOptions): Promise<string> => {
881+
options.onProgress?.(new CustomProgressEvent('before'))
882+
await delay(100)
883+
options.onProgress?.(new CustomProgressEvent('after'))
884+
return 'hello'
885+
}
886+
887+
const events: any[] = []
888+
889+
const p1 = queue.add(job, {
890+
onProgress: (evt) => {
891+
events.push(evt)
892+
}
893+
})
894+
895+
const p2 = queue.queue[0].join({
896+
onProgress: (evt) => {
897+
events.push(evt)
898+
}
899+
})
900+
901+
queue.resume()
902+
903+
await Promise.all([
904+
p1,
905+
p2
906+
])
907+
908+
expect(events).to.have.lengthOf(4)
909+
})
910+
911+
it('should consume synchronous progress events', async () => {
912+
interface ProgressJobOptions extends AbortOptions, ProgressOptions {
913+
914+
}
915+
const queue = new Queue<string, ProgressJobOptions>({
916+
concurrency: 1,
917+
maxSize: 1
918+
})
919+
920+
const events: any[] = []
921+
922+
const job = async (options: ProgressJobOptions): Promise<string> => {
923+
options.onProgress?.(new CustomProgressEvent('before'))
924+
await delay(100)
925+
options.onProgress?.(new CustomProgressEvent('after'))
926+
return 'hello'
927+
}
928+
929+
await queue.add(job, {
930+
onProgress: (evt) => {
931+
events.push(evt)
932+
}
933+
})
934+
935+
expect(events).to.have.lengthOf(2)
936+
})
867937
})

0 commit comments

Comments
 (0)