commit - 08a262b9bd8293d8bae147e81383fb4ed17c728b
commit + 6389bcb607f07c2d203e77f9fe8c858a0a95c58f
blob - 4d0caf0b8bdb1a3d20697232e100da7d1280dbed
blob + 37d5b386e23f12864eb3893d631348affae761dd
--- CHANGELOG.md
+++ CHANGELOG.md
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+## [0.1.3] - 2026-03-27
+
+### Fixed
+
+- Fix find_value_reply serialization offsets that assumed ID_LEN=20
+ instead of 32, corrupting the NodeId field and flag byte. This caused
+ receivers to reject value replies with "invalid message".
+- Increase UDP recv buffer from 4096 to 65535 to prevent truncation of
+ large value replies (which invalidated Ed25519 signatures).
+
+### Added
+
+- Chunked find_value replies: values larger than 1100 bytes are now
+ split into MTU-safe chunks using the existing index/total fields,
+ avoiding IP fragmentation. The receiver reassembles the chunks
+ before processing them.
+- Debug logging for signature verification failures and find_value_reply
+ send results.
+
## [0.1.2] - 2026-03-25
### Added
blob - 78efc00c67547bb906e75862b823f5e80ac2a86c
blob + 441a3827bfd482aadc62ecbcfe981a05122eb92f
--- Cargo.toml
+++ Cargo.toml
[package]
name = "tesseras-dht"
-version = "0.1.2"
+version = "0.1.3"
edition = "2024"
authors = [ "murilo ijanc' <murilo@ijanc.org>" ]
categories = ["network-programming"]
blob - 72ec019e6098aeed78af0fd7703d6316d10b5e54
blob + 3928856762d60d6ea12c03baa8c54838ed554481
--- src/dht.rs
+++ src/dht.rs
/// returning best-effort results.
pub const MAX_QUERY_DURATION: Duration = Duration::from_secs(30);
+/// Maximum chunk payload size for find_value replies.
+/// Keeps each UDP payload at or below the IPv6 minimum
+/// MTU (1280 bytes) to avoid IP fragmentation:
+/// HEADER(72) + FIXED(44) + CHUNK + SIG(64) ≤ 1280
+pub const VALUE_CHUNK_SIZE: usize = 1100;
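+
+// A compile-time sketch of the size budget above. The 72, 44 and
+// 64 literals mirror the doc comment; they are assumptions here,
+// so tie them to the crate's real header/fixed/signature
+// constants if those exist.
+const _: () = assert!(72 + 44 + VALUE_CHUNK_SIZE + 64 <= 1280);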
+
pub struct IterativeQuery {
pub target: NodeId,
pub closest: Vec<PeerInfo>,
/// Measures the "depth" of the lookup — useful for
/// diagnosing network topology and routing efficiency.
pub hops: u32,
+
+ /// Reassembly buffer for chunked value replies.
+ /// Maps sender NodeId → (total_chunks, received
+ /// chunks keyed by chunk index).
+ pub chunks: HashMap<NodeId, (u16, HashMap<u16, Vec<u8>>)>,
}
impl IterativeQuery {
nonce,
started_at: Instant::now(),
hops: 0,
+ chunks: HashMap::new(),
}
}
nonce,
started_at: Instant::now(),
hops: 0,
+ chunks: HashMap::new(),
}
}
self.phase = QueryPhase::Done;
}
+ /// Process a chunked value reply. Returns the
+ /// reassembled value when all chunks are received.
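+ ///
+ /// A minimal usage sketch (values are illustrative; `q` is an
+ /// in-flight query and `peer` any sender NodeId):
+ ///
+ /// ```ignore
+ /// assert!(q.process_chunk(&peer, 0, 2, vec![1, 2]).is_none());
+ /// assert_eq!(
+ /// q.process_chunk(&peer, 1, 2, vec![3, 4]),
+ /// Some(vec![1, 2, 3, 4]),
+ /// );
+ /// ```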
+ pub fn process_chunk(
+ &mut self,
+ from: &NodeId,
+ index: u16,
+ total: u16,
+ data: Vec<u8>,
+ ) -> Option<Vec<u8>> {
+ if total == 0 {
+ return None;
+ }
+ // Reject out-of-range indices; otherwise a bogus index
+ // could grow the map to `total` entries while leaving a
+ // gap in 0..total, and reassembly would never complete.
+ if index >= total {
+ return None;
+ }
+ // Single chunk — no reassembly needed
+ if total == 1 {
+ return Some(data);
+ }
+ let entry = self
+ .chunks
+ .entry(*from)
+ .or_insert_with(|| (total, HashMap::new()));
+ // Reject chunks whose total disagrees with the first seen
+ if entry.0 != total {
+ return None;
+ }
+ entry.1.insert(index, data);
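+ // A duplicate index simply overwrites the earlier copy,
+ // so retransmitted chunks are harmless.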
+ if entry.1.len() == total as usize {
+ // Reassemble in order
+ let mut full = Vec::new();
+ for i in 0..total {
+ if let Some(chunk) = entry.1.remove(&i) {
+ full.extend_from_slice(&chunk);
+ } else {
+ return None;
+ }
+ }
+ self.chunks.remove(from);
+ Some(full)
+ } else {
+ None
+ }
+ }
+
/// Mark a peer as timed out.
pub fn timeout(&mut self, id: &NodeId) {
self.pending.remove(id);
blob - 1ba3270aa01acc77f7f0cf095038ca0391b640c0
blob + e5714bcdc221d0c7313a4744109e285dd4623fe3
--- src/handlers.rs
+++ src/handlers.rs
let values = self.storage.get(&fv.target, &fv.key);
if !values.is_empty() {
- log::debug!("Found {} value(s) locally", values.len());
-
- // Send value reply using DHT_FIND_VALUE_REPLY
- // with DATA_ARE_VALUES flag
let val = &values[0];
+ let chunk_size = crate::dht::VALUE_CHUNK_SIZE;
+ let num_chunks = val.value.len().div_ceil(chunk_size);
+ let num_chunks = num_chunks.max(1) as u16;
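+ // e.g. a 2500-byte value splits into 1100 + 1100 + 300;
+ // an empty value still sends one (empty) chunk via max(1).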
+
+ log::debug!(
+ "Found {} value(s) locally, sending {} chunk(s)",
+ values.len(),
+ num_chunks
+ );
+
let fixed = msg::FIND_VALUE_REPLY_FIXED;
- let total = HEADER_SIZE + fixed + val.value.len();
- let mut rbuf = vec![0u8; total];
- let rhdr = MsgHeader::new(
- MsgType::DhtFindValueReply,
- Self::len16(total),
- self.id,
- hdr.src,
- );
- if rhdr.write(&mut rbuf).is_ok() {
- let off = HEADER_SIZE;
- rbuf[off..off + 4].copy_from_slice(&fv.nonce.to_be_bytes());
- fv.target
- .write_to(&mut rbuf[off + 4..off + 4 + crate::id::ID_LEN]);
- let f = 4 + crate::id::ID_LEN; // 36
- rbuf[off + f..off + f + 2].copy_from_slice(&0u16.to_be_bytes()); // index
- rbuf[off + f + 2..off + f + 4].copy_from_slice(&1u16.to_be_bytes()); // total
- rbuf[off + f + 4] = crate::wire::DATA_ARE_VALUES;
- rbuf[off + f + 5] = 0;
- rbuf[off + f + 6] = 0;
- rbuf[off + f + 7] = 0;
- rbuf[HEADER_SIZE + fixed..].copy_from_slice(&val.value);
- match self.send_signed(&rbuf, from) {
- Ok(n) => log::debug!(
- "Sent find_value_reply ({} bytes) to {from}",
- n
- ),
- Err(e) => log::warn!(
- "Failed to send find_value_reply to {from}: {e}"
- ),
+ let f = 4 + crate::id::ID_LEN; // 36
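+ // Fixed region layout (fixed = 44 bytes):
+ // nonce(4) | target NodeId(32) | index(2) | total(2)
+ // | flags(1) | padding(3)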
+
+ for i in 0..num_chunks {
+ let start = i as usize * chunk_size;
+ let end = ((i as usize + 1) * chunk_size).min(val.value.len());
+ let chunk = &val.value[start..end];
+
+ let pkt_total = HEADER_SIZE + fixed + chunk.len();
+ let mut rbuf = vec![0u8; pkt_total];
+ let rhdr = MsgHeader::new(
+ MsgType::DhtFindValueReply,
+ Self::len16(pkt_total),
+ self.id,
+ hdr.src,
+ );
+ if rhdr.write(&mut rbuf).is_ok() {
+ let off = HEADER_SIZE;
+ rbuf[off..off + 4].copy_from_slice(&fv.nonce.to_be_bytes());
+ fv.target.write_to(
+ &mut rbuf[off + 4..off + 4 + crate::id::ID_LEN],
+ );
+ rbuf[off + f..off + f + 2]
+ .copy_from_slice(&i.to_be_bytes());
+ rbuf[off + f + 2..off + f + 4]
+ .copy_from_slice(&num_chunks.to_be_bytes());
+ rbuf[off + f + 4] = crate::wire::DATA_ARE_VALUES;
+ rbuf[off + f + 5] = 0;
+ rbuf[off + f + 6] = 0;
+ rbuf[off + f + 7] = 0;
+ rbuf[HEADER_SIZE + fixed..].copy_from_slice(chunk);
+ match self.send_signed(&rbuf, from) {
+ Ok(n) => log::debug!(
+ "Sent chunk {}/{} ({n} bytes) to {from}",
+ i + 1,
+ num_chunks
+ ),
+ Err(e) => log::warn!(
+ "Failed to send chunk {}/{} to {from}: {e}",
+ i + 1,
+ num_chunks
+ ),
+ }
}
}
} else {
.map(|(n, _)| *n);
match reply.data {
- msg::FindValueReplyData::Value { data, .. } => {
- log::info!(
- "Received value from {:?}: {} bytes",
+ msg::FindValueReplyData::Value { index, total, data } => {
+ log::debug!(
+ "Received chunk {}/{} from {:?}: {} bytes",
+ index + 1,
+ total,
sender_id,
data.len()
);
- // Cache the value locally and republish
- // to the closest queried node without it
- // (§2.3: cache on nearest without value)
if let Some(nonce) = matching_nonce {
- if let Some(q) = self.queries.get_mut(&nonce) {
- // Store in local storage for
- // subsequent get() calls
- let val = crate::dht::StoredValue {
- key: q.key.clone(),
- value: data.clone(),
- id: q.target,
- source: sender_id,
- ttl: 300,
- stored_at: Instant::now(),
- is_unique: false,
- original: 0,
- recvd: std::collections::HashSet::new(),
- version: crate::dht::now_version(),
- };
- self.storage.store(val);
+ // Reassemble chunks
+ let reassembled =
+ self.queries.get_mut(&nonce).and_then(|q| {
+ q.process_chunk(&sender_id, index, total, data)
+ });
- // §2.3: store on nearest queried
- // node that didn't have the value
- let target = q.target;
- let key = q.key.clone();
- let nearest_without: Option<PeerInfo> = q
- .closest
- .iter()
- .find(|p| {
- q.queried.contains(&p.id) && p.id != sender_id
- })
- .cloned();
+ if let Some(full_value) = reassembled {
+ log::info!(
+ "Reassembled value from {:?}: {} bytes",
+ sender_id,
+ full_value.len()
+ );
- q.process_value(data.clone());
-
- if let Some(peer) = nearest_without {
- let store_msg = crate::msg::StoreMsg {
- id: target,
- from: self.id,
- key,
- value: data,
+ if let Some(q) = self.queries.get_mut(&nonce) {
+ // Cache locally
+ let val = crate::dht::StoredValue {
+ key: q.key.clone(),
+ value: full_value.clone(),
+ id: q.target,
+ source: sender_id,
ttl: 300,
+ stored_at: Instant::now(),
is_unique: false,
+ original: 0,
+ recvd: std::collections::HashSet::new(),
+ version: crate::dht::now_version(),
};
- if let Err(e) = self.send_store(&peer, &store_msg) {
- log::debug!("Republish-on-access failed: {e}");
- } else {
- log::debug!(
- "Republished value to {:?} (nearest without)",
- peer.id,
- );
+ self.storage.store(val);
+
+ // §2.3: store on nearest queried
+ // node that didn't have the value
+ let target = q.target;
+ let key = q.key.clone();
+ let nearest_without: Option<PeerInfo> = q
+ .closest
+ .iter()
+ .find(|p| {
+ q.queried.contains(&p.id)
+ && p.id != sender_id
+ })
+ .cloned();
+
+ q.process_value(full_value.clone());
+
+ if let Some(peer) = nearest_without {
+ let store_msg = crate::msg::StoreMsg {
+ id: target,
+ from: self.id,
+ key,
+ value: full_value,
+ ttl: 300,
+ is_unique: false,
+ };
+ if let Err(e) =
+ self.send_store(&peer, &store_msg)
+ {
+ log::debug!(
+ "Republish-on-access failed: {e}"
+ );
+ } else {
+ log::debug!(
+ "Republished value to {:?} (nearest without)",
+ peer.id,
+ );
+ }
}
}
}