Skip to content

Commit adec4df

Browse files
Paul Cwolfsoftwaresystemsltd
andcommitted
fix: client TCP fallback when UDP discovery can't find leader
Inside LXC containers, UDP broadcast/unicast discovery often can't reach the host nodes due to network namespace isolation. After 15 seconds of failed UDP discovery, the client now tries direct TCP connections to each configured peer and sends a SyncRequest. If a peer is the leader, it responds with the full index and the client syncs immediately. This means clients in LXC/Docker containers can now find and sync with the leader even when UDP discovery is completely blocked. Co-Authored-By: CodeWolf <paul@wolf.uk.com> Co-Authored-By: Wolf Software Systems Ltd <paul@wolf.uk.com>
1 parent 10dc877 commit adec4df

2 files changed

Lines changed: 82 additions & 2 deletions

File tree

wolfdisk/src/cluster/state.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,24 @@ impl ClusterManager {
115115
self.peers.read().unwrap().values().cloned().collect()
116116
}
117117

118+
/// Get config reference
119+
pub fn config(&self) -> &crate::Config {
120+
&self.config
121+
}
122+
123+
/// Add a peer address as the known leader (used when TCP probe finds leader directly)
124+
pub fn add_peer_as_leader(&self, address: &str) {
125+
let peer_id = format!("leader@{}", address);
126+
*self.leader_id.write().unwrap() = Some(peer_id.clone());
127+
self.peers.write().unwrap().insert(peer_id.clone(), PeerInfo {
128+
node_id: peer_id,
129+
address: address.to_string(),
130+
is_leader: true,
131+
is_client: false,
132+
last_seen: std::time::Instant::now(),
133+
});
134+
}
135+
118136
/// Get current index version
119137
pub fn index_version(&self) -> u64 {
120138
*self.index_version.read().unwrap()

wolfdisk/src/main.rs

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1526,21 +1526,83 @@ fn main() {
15261526

15271527
let mut leader_found = false;
15281528
let mut wait_count = 0u64;
1529+
let sync_peers = sync_cluster.config().cluster.peers.clone();
15291530
loop {
15301531
if wait_count >= max_wait_iterations {
15311532
break;
15321533
}
15331534
std::thread::sleep(std::time::Duration::from_millis(500));
15341535
wait_count += 1;
1535-
1536+
15361537
if let Some(lid) = sync_cluster.leader_id() {
15371538
// Don't try to sync from ourselves
15381539
if lid != sync_node_id {
15391540
leader_found = true;
15401541
break;
15411542
}
15421543
}
1543-
1544+
1545+
// After 15s with no leader from UDP discovery, try direct TCP probe to configured peers
1546+
if wait_count == 30 && !sync_peers.is_empty() {
1547+
info!("UDP discovery hasn't found leader — trying direct TCP probe to {} configured peers", sync_peers.len());
1548+
for peer_addr in &sync_peers {
1549+
info!("Probing peer at {} via TCP...", peer_addr);
1550+
match wolfdisk::network::peer::PeerConnection::connect("probe".to_string(), peer_addr) {
1551+
Ok(conn) => {
1552+
// Try a SyncRequest — if this peer is the leader, it'll respond
1553+
let msg = Message::SyncRequest(SyncRequestMsg { from_version: 0 });
1554+
match conn.request(&msg) {
1555+
Ok(Message::SyncResponse(response)) => {
1556+
info!("Direct TCP probe to {} succeeded! Got {} entries — this peer is the leader", peer_addr, response.entries.len());
1557+
// Apply the sync response directly
1558+
let mut added = 0usize;
1559+
{
1560+
let mut index = sync_file_index.write().unwrap();
1561+
let mut inode_tbl = sync_inode_table.write().unwrap();
1562+
let mut next_ino = sync_next_inode.write().unwrap();
1563+
for entry_msg in &response.entries {
1564+
let path = std::path::PathBuf::from(&entry_msg.path);
1565+
let chunk_refs: Vec<ChunkRef> = entry_msg.chunks.iter()
1566+
.map(|c| ChunkRef { hash: c.hash, offset: c.offset, size: c.size })
1567+
.collect();
1568+
let entry = FileEntry {
1569+
size: entry_msg.size,
1570+
is_dir: entry_msg.is_dir,
1571+
permissions: entry_msg.permissions,
1572+
uid: 0, gid: 0,
1573+
modified: std::time::UNIX_EPOCH + std::time::Duration::from_millis(entry_msg.modified_ms),
1574+
created: std::time::SystemTime::now(),
1575+
accessed: std::time::SystemTime::now(),
1576+
chunks: chunk_refs,
1577+
symlink_target: None,
1578+
};
1579+
if !index.contains(&path) {
1580+
if inode_tbl.get_inode(&path).is_none() {
1581+
let ino = *next_ino;
1582+
*next_ino += 1;
1583+
inode_tbl.insert(ino, path.clone());
1584+
}
1585+
added += 1;
1586+
}
1587+
index.insert(path, entry);
1588+
}
1589+
}
1590+
info!("Direct TCP sync complete: {} entries added from {}", added, peer_addr);
1591+
// Store the peer address as leader so future ops can find it
1592+
sync_cluster.add_peer_as_leader(peer_addr);
1593+
leader_found = true;
1594+
break;
1595+
}
1596+
Ok(_) => info!("Peer {} responded but is not leader", peer_addr),
1597+
Err(e) => info!("Peer {} SyncRequest failed: {} (may not be leader)", peer_addr, e),
1598+
}
1599+
}
1600+
Err(e) => info!("TCP connect to {} failed: {}", peer_addr, e),
1601+
}
1602+
}
1603+
if leader_found { break; }
1604+
}
1605+
15441606
// Log progress for clients waiting a long time
15451607
if sync_is_client && wait_count % 20 == 0 {
15461608
info!("Client waiting for leader... ({}s elapsed)", wait_count / 2);

0 commit comments

Comments
 (0)