diff --git a/packages/libp2p/src/connection-manager/constants.defaults.ts b/packages/libp2p/src/connection-manager/constants.defaults.ts index 868f17d770..1adc0a319f 100644 --- a/packages/libp2p/src/connection-manager/constants.defaults.ts +++ b/packages/libp2p/src/connection-manager/constants.defaults.ts @@ -3,6 +3,11 @@ */ export const DIAL_TIMEOUT = 10_000 +/** + * @see https://libp2p.github.io/js-libp2p/interfaces/libp2p.index.ConnectionManagerInit.html#addressDialTimeout + */ +export const ADDRESS_DIAL_TIMEOUT = 6_000 + /** * @see https://libp2p.github.io/js-libp2p/interfaces/libp2p.index.ConnectionManagerInit.html#connectionCloseTimeout */ diff --git a/packages/libp2p/src/connection-manager/dial-queue.ts b/packages/libp2p/src/connection-manager/dial-queue.ts index e0ae88d866..dc40c52dbb 100644 --- a/packages/libp2p/src/connection-manager/dial-queue.ts +++ b/packages/libp2p/src/connection-manager/dial-queue.ts @@ -12,6 +12,7 @@ import { DialDeniedError, NoValidAddressesError } from '../errors.js' import { getPeerAddress } from '../get-peer.js' import { defaultAddressSorter } from './address-sorter.js' import { + ADDRESS_DIAL_TIMEOUT, DIAL_TIMEOUT, MAX_PARALLEL_DIALS, MAX_PEER_ADDRS_TO_DIAL, @@ -45,6 +46,7 @@ interface DialerInit { maxDialQueueLength?: number maxPeerAddrsToDial?: number dialTimeout?: number + addressDialTimeout?: number resolvers?: Record connections?: PeerMap } @@ -54,6 +56,7 @@ const defaultOptions = { maxDialQueueLength: MAX_DIAL_QUEUE_LENGTH, maxPeerAddrsToDial: MAX_PEER_ADDRS_TO_DIAL, dialTimeout: DIAL_TIMEOUT, + addressDialTimeout: ADDRESS_DIAL_TIMEOUT, resolvers: { dnsaddr: dnsaddrResolver } @@ -77,6 +80,7 @@ export class DialQueue { private readonly maxPeerAddrsToDial: number private readonly maxDialQueueLength: number private readonly dialTimeout: number + private readonly addressDialTimeout: number private shutDownController: AbortController private readonly connections: PeerMap private readonly log: Logger @@ -87,6 +91,7 @@ export 
class DialQueue { this.maxPeerAddrsToDial = init.maxPeerAddrsToDial ?? defaultOptions.maxPeerAddrsToDial this.maxDialQueueLength = init.maxDialQueueLength ?? defaultOptions.maxDialQueueLength this.dialTimeout = init.dialTimeout ?? defaultOptions.dialTimeout + this.addressDialTimeout = init.addressDialTimeout ?? defaultOptions.addressDialTimeout this.connections = init.connections ?? new PeerMap() this.log = components.logger.forComponent('libp2p:connection-manager:dial-queue') this.components = components @@ -279,11 +284,17 @@ export class DialQueue { dialed++ + // create a per-address signal so a single slow/unreachable address + // cannot consume the entire dialTimeout budget when multiple addresses + // are available - the outer batch signal can still abort everything + const addressSignal = anySignal([signal, AbortSignal.timeout(this.addressDialTimeout)]) + setMaxListeners(Infinity, addressSignal) + try { // try to dial the address const conn = await this.components.transportManager.dial(address.multiaddr, { ...options, - signal + signal: addressSignal }) this.log('dial to %a succeeded', address.multiaddr) @@ -323,12 +334,16 @@ export class DialQueue { } } - // the user/dial timeout/shutdown controller signal aborted + // the user/batch timeout/shutdown controller signal aborted - stop + // trying further addresses for this peer if (signal.aborted) { throw new TimeoutError(err.message) } errors.push(err) + } finally { + // unregister listeners on parent signals immediately to avoid leaks + addressSignal.clear() } } } diff --git a/packages/libp2p/src/connection-manager/index.ts b/packages/libp2p/src/connection-manager/index.ts index e1c32da355..d99d89ee9d 100644 --- a/packages/libp2p/src/connection-manager/index.ts +++ b/packages/libp2p/src/connection-manager/index.ts @@ -6,7 +6,7 @@ import { pEvent } from 'p-event' import { CustomProgressEvent } from 'progress-events' import { getPeerAddress } from '../get-peer.js' import { ConnectionPruner } from 
'./connection-pruner.js' -import { DIAL_TIMEOUT, INBOUND_CONNECTION_THRESHOLD, MAX_CONNECTIONS, MAX_DIAL_QUEUE_LENGTH, MAX_INCOMING_PENDING_CONNECTIONS, MAX_PARALLEL_DIALS, MAX_PEER_ADDRS_TO_DIAL } from './constants.js' +import { ADDRESS_DIAL_TIMEOUT, DIAL_TIMEOUT, INBOUND_CONNECTION_THRESHOLD, MAX_CONNECTIONS, MAX_DIAL_QUEUE_LENGTH, MAX_INCOMING_PENDING_CONNECTIONS, MAX_PARALLEL_DIALS, MAX_PEER_ADDRS_TO_DIAL } from './constants.js' import { DialQueue } from './dial-queue.js' import { ReconnectQueue } from './reconnect-queue.js' import { dnsaddrResolver } from './resolvers/index.ts' @@ -66,6 +66,16 @@ export interface ConnectionManagerInit { */ dialTimeout?: number + /** + * How many ms a single address dial attempt is allowed to take before the + * dialer moves on to the next address. This prevents a single slow or + * unreachable address from consuming the entire `dialTimeout` budget when + * multiple addresses are available for a peer. + * + * @default 6_000 + */ + addressDialTimeout?: number + /** * How many ms to wait when closing a connection if an abort signal is not * passed @@ -261,6 +271,7 @@ export class DefaultConnectionManager implements ConnectionManager, Startable { maxDialQueueLength: init.maxDialQueueLength ?? MAX_DIAL_QUEUE_LENGTH, maxPeerAddrsToDial: init.maxPeerAddrsToDial ?? MAX_PEER_ADDRS_TO_DIAL, dialTimeout: init.dialTimeout ?? DIAL_TIMEOUT, + addressDialTimeout: init.addressDialTimeout ?? ADDRESS_DIAL_TIMEOUT, resolvers: init.resolvers ?? 
{ dnsaddr: dnsaddrResolver }, diff --git a/packages/libp2p/test/connection-manager/dial-queue.spec.ts b/packages/libp2p/test/connection-manager/dial-queue.spec.ts index 09c4d92ca8..a9898cb508 100644 --- a/packages/libp2p/test/connection-manager/dial-queue.spec.ts +++ b/packages/libp2p/test/connection-manager/dial-queue.spec.ts @@ -430,6 +430,168 @@ describe('dial queue', () => { expect(components.transportManager.dial.callCount).to.equal(1, 'should have coalesced multiple dials to same dial') }) + describe('addressDialTimeout', () => { + // helper: returns a transport stub that hangs until the signal fires + function hangUntilAborted (options?: { signal?: AbortSignal }): Promise { + return new Promise((_resolve, reject) => { + options?.signal?.addEventListener('abort', () => { reject(options.signal?.reason) }, { once: true }) + }) + } + + it('should move on to the next address when one address dial hangs', async () => { + const connection = stubInterface() + const addressDialTimeout = 100 + const dialledAddrs: string[] = [] + + components.transportManager.dialTransportForMultiaddr.returns(stubInterface()) + components.transportManager.dial.callsFake(async (ma, options) => { + dialledAddrs.push(ma.toString()) + + if (ma.toString().includes('1231')) { + return hangUntilAborted(options) + } + + return connection + }) + + dialer = new DialQueue(components, { + addressDialTimeout, + dialTimeout: 5000 + }) + + const start = Date.now() + const conn = await dialer.dial([ + multiaddr('/ip4/127.0.0.1/tcp/1231'), + multiaddr('/ip4/127.0.0.1/tcp/1232') + ]) + const elapsed = Date.now() - start + + expect(conn).to.equal(connection) + // both addresses must have been attempted + expect(dialledAddrs).to.include('/ip4/127.0.0.1/tcp/1231') + expect(dialledAddrs).to.include('/ip4/127.0.0.1/tcp/1232') + // elapsed >= addressDialTimeout (first hung) and well under dialTimeout + expect(elapsed).to.be.greaterThanOrEqual(addressDialTimeout) + 
expect(elapsed).to.be.lessThan(addressDialTimeout * 4) + }) + + it('should try all addresses when multiple dials hang sequentially before one succeeds', async () => { + // This mirrors the scenario from the bug report, e.g. + // [/ip4/10.2.0.2, /ip4/127.0.0.1, /ip4/172.20.5.94] where only the last is reachable, using four loopback addresses. + // With the old code the first address consumed the entire dialTimeout. + const connection = stubInterface() + const addressDialTimeout = 100 + const dialledAddrs: string[] = [] + let dialsAborted = 0 + + components.transportManager.dialTransportForMultiaddr.returns(stubInterface()) + components.transportManager.dial.callsFake(async (ma, options) => { + const maStr = ma.toString() + dialledAddrs.push(maStr) + + if (!maStr.includes('1234')) { + // first three addresses hang – will be cut by per-address timeout + options?.signal?.addEventListener('abort', () => { dialsAborted++ }, { once: true }) + return hangUntilAborted(options) + } + + return connection + }) + + dialer = new DialQueue(components, { + addressDialTimeout, + dialTimeout: 10_000 + }) + + const conn = await dialer.dial([ + multiaddr('/ip4/127.0.0.1/tcp/1231'), + multiaddr('/ip4/127.0.0.1/tcp/1232'), + multiaddr('/ip4/127.0.0.1/tcp/1233'), + multiaddr('/ip4/127.0.0.1/tcp/1234') + ]) + + expect(conn).to.equal(connection) + // all four addresses were attempted in order + expect(dialledAddrs).to.deep.equal([ + '/ip4/127.0.0.1/tcp/1231', + '/ip4/127.0.0.1/tcp/1232', + '/ip4/127.0.0.1/tcp/1233', + '/ip4/127.0.0.1/tcp/1234' + ]) + // per-address signal was aborted for each of the three hung dials + expect(dialsAborted).to.equal(3) + }) + + it('should throw an AggregateError when every address times out per-address before the batch timeout', async () => { + const addressDialTimeout = 100 + + components.transportManager.dialTransportForMultiaddr.returns(stubInterface()) + components.transportManager.dial.callsFake(async (ma, options) => hangUntilAborted(options)) + + dialer = new DialQueue(components, { + 
addressDialTimeout, + dialTimeout: 10_000 // batch timeout is far away + }) + + const err = await dialer.dial([ + multiaddr('/ip4/127.0.0.1/tcp/1231'), + multiaddr('/ip4/127.0.0.1/tcp/1232') + ]).catch(e => e) + + // each address timed out individually → AggregateError, not TimeoutError + expect(err).to.have.property('name', 'AggregateError') + }) + + it('should throw a TimeoutError when the batch timeout fires before the per-address timeout', async () => { + const dialTimeout = 100 + const addressDialTimeout = 5000 // much longer than dialTimeout + + components.transportManager.dialTransportForMultiaddr.returns(stubInterface()) + components.transportManager.dial.callsFake(async (ma, options) => hangUntilAborted(options)) + + dialer = new DialQueue(components, { + addressDialTimeout, + dialTimeout + }) + + const err = await dialer.dial([ + multiaddr('/ip4/127.0.0.1/tcp/1231'), + multiaddr('/ip4/127.0.0.1/tcp/1232') + ]).catch(e => e) + + // batch timeout fires → name is 'TimeoutError' (not AggregateError) + // Note: the error is the native DOMException from AbortSignal.timeout(), + // not our custom TimeoutError class, because JobRecipient rejects with + // the raw signal reason before our callback's throw can propagate. 
+ expect(err).to.have.property('name', 'TimeoutError') + }) + + it('should not delay dials that succeed within the addressDialTimeout', async () => { + const connection = stubInterface() + const addressDialTimeout = 500 + + components.transportManager.dialTransportForMultiaddr.returns(stubInterface()) + components.transportManager.dial.callsFake(async () => { + await delay(10) // quick success, well within addressDialTimeout + return connection + }) + + dialer = new DialQueue(components, { + addressDialTimeout, + dialTimeout: 5000 + }) + + const conn = await dialer.dial([ + multiaddr('/ip4/127.0.0.1/tcp/1231'), + multiaddr('/ip4/127.0.0.1/tcp/1232') + ]) + + expect(conn).to.equal(connection) + // first address succeeded – second address should never have been dialled + expect(components.transportManager.dial.callCount).to.equal(1) + }) + }) + it('should continue dial when new addresses are discovered', async () => { const remotePeer = peerIdFromPrivateKey(await generateKeyPair('Ed25519')) const ma1 = multiaddr(`/ip6/2001:db8:1:2:3:4:5:6/tcp/123/p2p/${remotePeer}`)