commit 6389bcb607f07c2d203e77f9fe8c858a0a95c58f from: murilo ijanc date: Fri Mar 27 03:41:45 2026 UTC Bump version to 0.1.3 - Fix find_value_reply serialization offsets (ID_LEN=20 vs 32) - Chunked find_value replies to avoid IP fragmentation - Increase UDP recv buffer to 65535 - Debug logging for signature failures and send results commit - 08a262b9bd8293d8bae147e81383fb4ed17c728b commit + 6389bcb607f07c2d203e77f9fe8c858a0a95c58f blob - 4d0caf0b8bdb1a3d20697232e100da7d1280dbed blob + 37d5b386e23f12864eb3893d631348affae761dd --- CHANGELOG.md +++ CHANGELOG.md @@ -5,6 +5,25 @@ All notable changes to this project will be documented The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.1.3] - 2026-03-27 + +### Fixed + +- Fix find_value_reply serialization offsets that assumed ID_LEN=20 + instead of 32, corrupting the NodeId field and flag byte. This caused + receivers to reject value replies with "invalid message". +- Increase UDP recv buffer from 4096 to 65535 to prevent truncation of + large value replies (which invalidated Ed25519 signatures). + +### Added + +- Chunked find_value replies: values larger than 1100 bytes are now + split into MTU-safe chunks using the existing index/total fields, + avoiding IP fragmentation. Receiver reassembles chunks before + processing. +- Debug logging for signature verification failures and find_value_reply + send results. 
+ ## [0.1.2] - 2026-03-25 ### Added blob - 78efc00c67547bb906e75862b823f5e80ac2a86c blob + 441a3827bfd482aadc62ecbcfe981a05122eb92f --- Cargo.toml +++ Cargo.toml @@ -1,6 +1,6 @@ [package] name = "tesseras-dht" -version = "0.1.2" +version = "0.1.3" edition = "2024" authors = [ "murilo ijanc' " ] categories = ["network-programming"] blob - 72ec019e6098aeed78af0fd7703d6316d10b5e54 blob + 3928856762d60d6ea12c03baa8c54838ed554481 --- src/dht.rs +++ src/dht.rs @@ -344,6 +344,12 @@ pub enum QueryPhase { /// returning best-effort results. pub const MAX_QUERY_DURATION: Duration = Duration::from_secs(30); +/// Maximum chunk payload size for find_value replies. +/// Keeps UDP datagrams under typical MTU (~1280) to +/// avoid IP fragmentation: +/// HEADER(72) + FIXED(44) + CHUNK + SIG(64) ≤ 1280 +pub const VALUE_CHUNK_SIZE: usize = 1100; + pub struct IterativeQuery { pub target: NodeId, pub closest: Vec, @@ -361,6 +367,11 @@ pub struct IterativeQuery { /// Measures the "depth" of the lookup — useful for /// diagnosing network topology and routing efficiency. pub hops: u32, + + /// Reassembly buffer for chunked value replies. + /// Maps sender NodeId → (total_chunks, received chunks + /// indexed by chunk index). + pub chunks: HashMap<NodeId, (u16, HashMap<u16, Vec<u8>>)>, } impl IterativeQuery { @@ -378,6 +389,7 @@ impl IterativeQuery { nonce, started_at: Instant::now(), hops: 0, + chunks: HashMap::new(), } } @@ -395,6 +407,7 @@ impl IterativeQuery { nonce, started_at: Instant::now(), hops: 0, + chunks: HashMap::new(), } } @@ -461,6 +474,48 @@ impl IterativeQuery { self.phase = QueryPhase::Done; } + /// Process a chunked value reply. Returns the + /// reassembled value when all chunks are received.
+ pub fn process_chunk( + &mut self, + from: &NodeId, + index: u16, + total: u16, + data: Vec<u8>, + ) -> Option<Vec<u8>> { + if total == 0 { + return None; + } + // Single chunk — no reassembly needed + if total == 1 { + return Some(data); + } + let entry = self + .chunks + .entry(*from) + .or_insert_with(|| (total, HashMap::new())); + // Reject mismatched total + if entry.0 != total { + return None; + } + entry.1.insert(index, data); + if entry.1.len() == total as usize { + // Reassemble in order + let mut full = Vec::new(); + for i in 0..total { + if let Some(chunk) = entry.1.remove(&i) { + full.extend_from_slice(&chunk); + } else { + return None; + } + } + self.chunks.remove(from); + Some(full) + } else { + None + } + } + /// Mark a peer as timed out. pub fn timeout(&mut self, id: &NodeId) { self.pending.remove(id); blob - 1ba3270aa01acc77f7f0cf095038ca0391b640c0 blob + e5714bcdc221d0c7313a4744109e285dd4623fe3 --- src/handlers.rs +++ src/handlers.rs @@ -702,41 +702,60 @@ impl Node { let values = self.storage.get(&fv.target, &fv.key); if !values.is_empty() { - log::debug!("Found {} value(s) locally", values.len()); - - // Send value reply using DHT_FIND_VALUE_REPLY - // with DATA_ARE_VALUES flag let val = &values[0]; + let chunk_size = crate::dht::VALUE_CHUNK_SIZE; + let num_chunks = val.value.len().div_ceil(chunk_size); + let num_chunks = num_chunks.max(1) as u16; + + log::debug!( + "Found {} value(s) locally, sending {} chunk(s)", + values.len(), + num_chunks + ); + let fixed = msg::FIND_VALUE_REPLY_FIXED; - let total = HEADER_SIZE + fixed + val.value.len(); - let mut rbuf = vec![0u8; total]; - let rhdr = MsgHeader::new( - MsgType::DhtFindValueReply, - Self::len16(total), - self.id, - hdr.src, - ); - if rhdr.write(&mut rbuf).is_ok() { - let off = HEADER_SIZE; - rbuf[off..off + 4].copy_from_slice(&fv.nonce.to_be_bytes()); - fv.target - .write_to(&mut rbuf[off + 4..off + 4 + crate::id::ID_LEN]); - let f = 4 + crate::id::ID_LEN; // 36 - rbuf[off + f..off + f +
2].copy_from_slice(&0u16.to_be_bytes()); // index - rbuf[off + f + 2..off + f + 4].copy_from_slice(&1u16.to_be_bytes()); // total - rbuf[off + f + 4] = crate::wire::DATA_ARE_VALUES; - rbuf[off + f + 5] = 0; - rbuf[off + f + 6] = 0; - rbuf[off + f + 7] = 0; - rbuf[HEADER_SIZE + fixed..].copy_from_slice(&val.value); - match self.send_signed(&rbuf, from) { - Ok(n) => log::debug!( - "Sent find_value_reply ({} bytes) to {from}", - n - ), - Err(e) => log::warn!( - "Failed to send find_value_reply to {from}: {e}" - ), + let f = 4 + crate::id::ID_LEN; // 36 + + for i in 0..num_chunks { + let start = i as usize * chunk_size; + let end = ((i as usize + 1) * chunk_size).min(val.value.len()); + let chunk = &val.value[start..end]; + + let pkt_total = HEADER_SIZE + fixed + chunk.len(); + let mut rbuf = vec![0u8; pkt_total]; + let rhdr = MsgHeader::new( + MsgType::DhtFindValueReply, + Self::len16(pkt_total), + self.id, + hdr.src, + ); + if rhdr.write(&mut rbuf).is_ok() { + let off = HEADER_SIZE; + rbuf[off..off + 4].copy_from_slice(&fv.nonce.to_be_bytes()); + fv.target.write_to( + &mut rbuf[off + 4..off + 4 + crate::id::ID_LEN], + ); + rbuf[off + f..off + f + 2] + .copy_from_slice(&i.to_be_bytes()); + rbuf[off + f + 2..off + f + 4] + .copy_from_slice(&num_chunks.to_be_bytes()); + rbuf[off + f + 4] = crate::wire::DATA_ARE_VALUES; + rbuf[off + f + 5] = 0; + rbuf[off + f + 6] = 0; + rbuf[off + f + 7] = 0; + rbuf[HEADER_SIZE + fixed..].copy_from_slice(chunk); + match self.send_signed(&rbuf, from) { + Ok(n) => log::debug!( + "Sent chunk {}/{} ({n} bytes) to {from}", + i + 1, + num_chunks + ), + Err(e) => log::warn!( + "Failed to send chunk {}/{} to {from}: {e}", + i + 1, + num_chunks + ), + } } } } else { @@ -823,64 +842,81 @@ impl Node { .map(|(n, _)| *n); match reply.data { - msg::FindValueReplyData::Value { data, .. 
} => { - log::info!( - "Received value from {:?}: {} bytes", + msg::FindValueReplyData::Value { index, total, data } => { + log::debug!( + "Received chunk {}/{} from {:?}: {} bytes", + index + 1, + total, sender_id, data.len() ); - // Cache the value locally and republish - // to the closest queried node without it - // (§2.3: cache on nearest without value) if let Some(nonce) = matching_nonce { - if let Some(q) = self.queries.get_mut(&nonce) { - // Store in local storage for - // subsequent get() calls - let val = crate::dht::StoredValue { - key: q.key.clone(), - value: data.clone(), - id: q.target, - source: sender_id, - ttl: 300, - stored_at: Instant::now(), - is_unique: false, - original: 0, - recvd: std::collections::HashSet::new(), - version: crate::dht::now_version(), - }; - self.storage.store(val); + // Reassemble chunks + let reassembled = + self.queries.get_mut(&nonce).and_then(|q| { + q.process_chunk(&sender_id, index, total, data) + }); - // §2.3: store on nearest queried - // node that didn't have the value - let target = q.target; - let key = q.key.clone(); - let nearest_without: Option = q - .closest - .iter() - .find(|p| { - q.queried.contains(&p.id) && p.id != sender_id - }) - .cloned(); + if let Some(full_value) = reassembled { + log::info!( + "Reassembled value from {:?}: {} bytes", + sender_id, + full_value.len() + ); - q.process_value(data.clone()); - - if let Some(peer) = nearest_without { - let store_msg = crate::msg::StoreMsg { - id: target, - from: self.id, - key, - value: data, + if let Some(q) = self.queries.get_mut(&nonce) { + // Cache locally + let val = crate::dht::StoredValue { + key: q.key.clone(), + value: full_value.clone(), + id: q.target, + source: sender_id, ttl: 300, + stored_at: Instant::now(), is_unique: false, + original: 0, + recvd: std::collections::HashSet::new(), + version: crate::dht::now_version(), }; - if let Err(e) = self.send_store(&peer, &store_msg) { - log::debug!("Republish-on-access failed: {e}"); - } else { - 
log::debug!( - "Republished value to {:?} (nearest without)", - peer.id, - ); + self.storage.store(val); + + // §2.3: store on nearest queried + // node that didn't have the value + let target = q.target; + let key = q.key.clone(); + let nearest_without: Option = q + .closest + .iter() + .find(|p| { + q.queried.contains(&p.id) + && p.id != sender_id + }) + .cloned(); + + q.process_value(full_value.clone()); + + if let Some(peer) = nearest_without { + let store_msg = crate::msg::StoreMsg { + id: target, + from: self.id, + key, + value: full_value, + ttl: 300, + is_unique: false, + }; + if let Err(e) = + self.send_store(&peer, &store_msg) + { + log::debug!( + "Republish-on-access failed: {e}" + ); + } else { + log::debug!( + "Republished value to {:?} (nearest without)", + peer.id, + ); + } } } }