Skip to content

Commit 2151144

Browse files
paschal533dozyio
andauthored
fix(connection-manager): add per-address timeout (#3412)
* fix(connection-manager): add per-address timeout to prevent slow addresses exhausting the dial budget --------- Co-authored-by: dozyio <37986489+dozyio@users.noreply.github.com>
1 parent f21efd1 commit 2151144

4 files changed

Lines changed: 196 additions & 3 deletions

File tree

packages/libp2p/src/connection-manager/constants.defaults.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,11 @@
33
*/
44
export const DIAL_TIMEOUT = 10_000
55

6+
/**
7+
* @see https://libp2p.github.io/js-libp2p/interfaces/libp2p.index.ConnectionManagerInit.html#addressDialTimeout
8+
*/
9+
export const ADDRESS_DIAL_TIMEOUT = 6_000
10+
611
/**
712
* @see https://libp2p.github.io/js-libp2p/interfaces/libp2p.index.ConnectionManagerInit.html#connectionCloseTimeout
813
*/

packages/libp2p/src/connection-manager/dial-queue.ts

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import { DialDeniedError, NoValidAddressesError } from '../errors.js'
1212
import { getPeerAddress } from '../get-peer.js'
1313
import { defaultAddressSorter } from './address-sorter.js'
1414
import {
15+
ADDRESS_DIAL_TIMEOUT,
1516
DIAL_TIMEOUT,
1617
MAX_PARALLEL_DIALS,
1718
MAX_PEER_ADDRS_TO_DIAL,
@@ -45,6 +46,7 @@ interface DialerInit {
4546
maxDialQueueLength?: number
4647
maxPeerAddrsToDial?: number
4748
dialTimeout?: number
49+
addressDialTimeout?: number
4850
resolvers?: Record<string, MultiaddrResolver>
4951
connections?: PeerMap<Connection[]>
5052
}
@@ -54,6 +56,7 @@ const defaultOptions = {
5456
maxDialQueueLength: MAX_DIAL_QUEUE_LENGTH,
5557
maxPeerAddrsToDial: MAX_PEER_ADDRS_TO_DIAL,
5658
dialTimeout: DIAL_TIMEOUT,
59+
addressDialTimeout: ADDRESS_DIAL_TIMEOUT,
5760
resolvers: {
5861
dnsaddr: dnsaddrResolver
5962
}
@@ -77,6 +80,7 @@ export class DialQueue {
7780
private readonly maxPeerAddrsToDial: number
7881
private readonly maxDialQueueLength: number
7982
private readonly dialTimeout: number
83+
private readonly addressDialTimeout: number
8084
private shutDownController: AbortController
8185
private readonly connections: PeerMap<Connection[]>
8286
private readonly log: Logger
@@ -87,6 +91,7 @@ export class DialQueue {
8791
this.maxPeerAddrsToDial = init.maxPeerAddrsToDial ?? defaultOptions.maxPeerAddrsToDial
8892
this.maxDialQueueLength = init.maxDialQueueLength ?? defaultOptions.maxDialQueueLength
8993
this.dialTimeout = init.dialTimeout ?? defaultOptions.dialTimeout
94+
this.addressDialTimeout = init.addressDialTimeout ?? defaultOptions.addressDialTimeout
9095
this.connections = init.connections ?? new PeerMap()
9196
this.log = components.logger.forComponent('libp2p:connection-manager:dial-queue')
9297
this.components = components
@@ -279,11 +284,17 @@ export class DialQueue {
279284

280285
dialed++
281286

287+
// create a per-address signal so a single slow/unreachable address
288+
// cannot consume the entire dialTimeout budget when multiple addresses
289+
// are available - the outer batch signal can still abort everything
290+
const addressSignal = anySignal([signal, AbortSignal.timeout(this.addressDialTimeout)])
291+
setMaxListeners(Infinity, addressSignal)
292+
282293
try {
283294
// try to dial the address
284295
const conn = await this.components.transportManager.dial(address.multiaddr, {
285296
...options,
286-
signal
297+
signal: addressSignal
287298
})
288299

289300
this.log('dial to %a succeeded', address.multiaddr)
@@ -323,12 +334,16 @@ export class DialQueue {
323334
}
324335
}
325336

326-
// the user/dial timeout/shutdown controller signal aborted
337+
// the user/batch timeout/shutdown controller signal aborted - stop
338+
// trying further addresses for this peer
327339
if (signal.aborted) {
328340
throw new TimeoutError(err.message)
329341
}
330342

331343
errors.push(err)
344+
} finally {
345+
// unregister listeners on parent signals immediately to avoid leaks
346+
addressSignal.clear()
332347
}
333348
}
334349
}

packages/libp2p/src/connection-manager/index.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import { pEvent } from 'p-event'
66
import { CustomProgressEvent } from 'progress-events'
77
import { getPeerAddress } from '../get-peer.js'
88
import { ConnectionPruner } from './connection-pruner.js'
9-
import { DIAL_TIMEOUT, INBOUND_CONNECTION_THRESHOLD, MAX_CONNECTIONS, MAX_DIAL_QUEUE_LENGTH, MAX_INCOMING_PENDING_CONNECTIONS, MAX_PARALLEL_DIALS, MAX_PEER_ADDRS_TO_DIAL } from './constants.js'
9+
import { ADDRESS_DIAL_TIMEOUT, DIAL_TIMEOUT, INBOUND_CONNECTION_THRESHOLD, MAX_CONNECTIONS, MAX_DIAL_QUEUE_LENGTH, MAX_INCOMING_PENDING_CONNECTIONS, MAX_PARALLEL_DIALS, MAX_PEER_ADDRS_TO_DIAL } from './constants.js'
1010
import { DialQueue } from './dial-queue.js'
1111
import { ReconnectQueue } from './reconnect-queue.js'
1212
import { dnsaddrResolver } from './resolvers/index.ts'
@@ -66,6 +66,16 @@ export interface ConnectionManagerInit {
6666
*/
6767
dialTimeout?: number
6868

69+
/**
70+
* How long a single address dial attempt is allowed to take before the
71+
* dialer moves on to the next address. This prevents a single slow or
72+
* unreachable address from consuming the entire `dialTimeout` budget when
73+
* multiple addresses are available for a peer.
74+
*
75+
* @default 6_000
76+
*/
77+
addressDialTimeout?: number
78+
6979
/**
7080
* How many ms to wait when closing a connection if an abort signal is not
7181
* passed
@@ -261,6 +271,7 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
261271
maxDialQueueLength: init.maxDialQueueLength ?? MAX_DIAL_QUEUE_LENGTH,
262272
maxPeerAddrsToDial: init.maxPeerAddrsToDial ?? MAX_PEER_ADDRS_TO_DIAL,
263273
dialTimeout: init.dialTimeout ?? DIAL_TIMEOUT,
274+
addressDialTimeout: init.addressDialTimeout ?? ADDRESS_DIAL_TIMEOUT,
264275
resolvers: init.resolvers ?? {
265276
dnsaddr: dnsaddrResolver
266277
},

packages/libp2p/test/connection-manager/dial-queue.spec.ts

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,168 @@ describe('dial queue', () => {
430430
expect(components.transportManager.dial.callCount).to.equal(1, 'should have coalesced multiple dials to same dial')
431431
})
432432

433+
describe('addressDialTimeout', () => {
434+
// helper: returns a transport stub that hangs until the signal fires
435+
function hangUntilAborted (options?: { signal?: AbortSignal }): Promise<Connection> {
436+
return new Promise<Connection>((_resolve, reject) => {
437+
options?.signal?.addEventListener('abort', () => { reject(options.signal?.reason) }, { once: true })
438+
})
439+
}
440+
441+
it('should move on to the next address when one address dial hangs', async () => {
442+
const connection = stubInterface<Connection>()
443+
const addressDialTimeout = 100
444+
const dialledAddrs: string[] = []
445+
446+
components.transportManager.dialTransportForMultiaddr.returns(stubInterface<Transport>())
447+
components.transportManager.dial.callsFake(async (ma, options) => {
448+
dialledAddrs.push(ma.toString())
449+
450+
if (ma.toString().includes('1231')) {
451+
return hangUntilAborted(options)
452+
}
453+
454+
return connection
455+
})
456+
457+
dialer = new DialQueue(components, {
458+
addressDialTimeout,
459+
dialTimeout: 5000
460+
})
461+
462+
const start = Date.now()
463+
const conn = await dialer.dial([
464+
multiaddr('/ip4/127.0.0.1/tcp/1231'),
465+
multiaddr('/ip4/127.0.0.1/tcp/1232')
466+
])
467+
const elapsed = Date.now() - start
468+
469+
expect(conn).to.equal(connection)
470+
// both addresses must have been attempted
471+
expect(dialledAddrs).to.include('/ip4/127.0.0.1/tcp/1231')
472+
expect(dialledAddrs).to.include('/ip4/127.0.0.1/tcp/1232')
473+
// elapsed >= addressDialTimeout (first hung) and well under dialTimeout
474+
expect(elapsed).to.be.greaterThanOrEqual(addressDialTimeout)
475+
expect(elapsed).to.be.lessThan(addressDialTimeout * 4)
476+
})
477+
478+
it('should try all addresses when multiple dials hang sequentially before one succeeds', async () => {
479+
// This is the exact scenario from the bug report:
480+
// [/ip4/10.2.0.2, /ip4/127.0.0.1, /ip4/172.20.5.94] where only the last is reachable.
481+
// With the old code the first address consumed the entire dialTimeout.
482+
const connection = stubInterface<Connection>()
483+
const addressDialTimeout = 100
484+
const dialledAddrs: string[] = []
485+
let dialsAborted = 0
486+
487+
components.transportManager.dialTransportForMultiaddr.returns(stubInterface<Transport>())
488+
components.transportManager.dial.callsFake(async (ma, options) => {
489+
const maStr = ma.toString()
490+
dialledAddrs.push(maStr)
491+
492+
if (!maStr.includes('1234')) {
493+
// first three addresses hang – will be cut by per-address timeout
494+
options?.signal?.addEventListener('abort', () => { dialsAborted++ }, { once: true })
495+
return hangUntilAborted(options)
496+
}
497+
498+
return connection
499+
})
500+
501+
dialer = new DialQueue(components, {
502+
addressDialTimeout,
503+
dialTimeout: 10_000
504+
})
505+
506+
const conn = await dialer.dial([
507+
multiaddr('/ip4/127.0.0.1/tcp/1231'),
508+
multiaddr('/ip4/127.0.0.1/tcp/1232'),
509+
multiaddr('/ip4/127.0.0.1/tcp/1233'),
510+
multiaddr('/ip4/127.0.0.1/tcp/1234')
511+
])
512+
513+
expect(conn).to.equal(connection)
514+
// all four addresses were attempted in order
515+
expect(dialledAddrs).to.deep.equal([
516+
'/ip4/127.0.0.1/tcp/1231',
517+
'/ip4/127.0.0.1/tcp/1232',
518+
'/ip4/127.0.0.1/tcp/1233',
519+
'/ip4/127.0.0.1/tcp/1234'
520+
])
521+
// per-address signal was aborted for each of the three hung dials
522+
expect(dialsAborted).to.equal(3)
523+
})
524+
525+
it('should throw an AggregateError when every address times out per-address before the batch timeout', async () => {
526+
const addressDialTimeout = 100
527+
528+
components.transportManager.dialTransportForMultiaddr.returns(stubInterface<Transport>())
529+
components.transportManager.dial.callsFake(async (ma, options) => hangUntilAborted(options))
530+
531+
dialer = new DialQueue(components, {
532+
addressDialTimeout,
533+
dialTimeout: 10_000 // batch timeout is far away
534+
})
535+
536+
const err = await dialer.dial([
537+
multiaddr('/ip4/127.0.0.1/tcp/1231'),
538+
multiaddr('/ip4/127.0.0.1/tcp/1232')
539+
]).catch(e => e)
540+
541+
// each address timed out individually → AggregateError, not TimeoutError
542+
expect(err).to.have.property('name', 'AggregateError')
543+
})
544+
545+
it('should throw a TimeoutError when the batch timeout fires before the per-address timeout', async () => {
546+
const dialTimeout = 100
547+
const addressDialTimeout = 5000 // much longer than dialTimeout
548+
549+
components.transportManager.dialTransportForMultiaddr.returns(stubInterface<Transport>())
550+
components.transportManager.dial.callsFake(async (ma, options) => hangUntilAborted(options))
551+
552+
dialer = new DialQueue(components, {
553+
addressDialTimeout,
554+
dialTimeout
555+
})
556+
557+
const err = await dialer.dial([
558+
multiaddr('/ip4/127.0.0.1/tcp/1231'),
559+
multiaddr('/ip4/127.0.0.1/tcp/1232')
560+
]).catch(e => e)
561+
562+
// batch timeout fires → name is 'TimeoutError' (not AggregateError)
563+
// Note: the error is the native DOMException from AbortSignal.timeout(),
564+
// not our custom TimeoutError class, because JobRecipient rejects with
565+
// the raw signal reason before our callback's throw can propagate.
566+
expect(err).to.have.property('name', 'TimeoutError')
567+
})
568+
569+
it('should not delay dials that succeed within the addressDialTimeout', async () => {
570+
const connection = stubInterface<Connection>()
571+
const addressDialTimeout = 500
572+
573+
components.transportManager.dialTransportForMultiaddr.returns(stubInterface<Transport>())
574+
components.transportManager.dial.callsFake(async () => {
575+
await delay(10) // quick success, well within addressDialTimeout
576+
return connection
577+
})
578+
579+
dialer = new DialQueue(components, {
580+
addressDialTimeout,
581+
dialTimeout: 5000
582+
})
583+
584+
const conn = await dialer.dial([
585+
multiaddr('/ip4/127.0.0.1/tcp/1231'),
586+
multiaddr('/ip4/127.0.0.1/tcp/1232')
587+
])
588+
589+
expect(conn).to.equal(connection)
590+
// first address succeeded – second address should never have been dialled
591+
expect(components.transportManager.dial.callCount).to.equal(1)
592+
})
593+
})
594+
433595
it('should continue dial when new addresses are discovered', async () => {
434596
const remotePeer = peerIdFromPrivateKey(await generateKeyPair('Ed25519'))
435597
const ma1 = multiaddr(`/ip6/2001:db8:1:2:3:4:5:6/tcp/123/p2p/${remotePeer}`)

0 commit comments

Comments
 (0)