Commit Diff


commit - 08a262b9bd8293d8bae147e81383fb4ed17c728b
commit + 6389bcb607f07c2d203e77f9fe8c858a0a95c58f
blob - 4d0caf0b8bdb1a3d20697232e100da7d1280dbed
blob + 37d5b386e23f12864eb3893d631348affae761dd
--- CHANGELOG.md
+++ CHANGELOG.md
@@ -5,6 +5,25 @@ All notable changes to this project will be documented
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [0.1.3] - 2026-03-27
+
+### Fixed
+
+- Fix find_value_reply serialization offsets that assumed ID_LEN=20
+  instead of 32, corrupting the NodeId field and flag byte. This caused
+  receivers to reject value replies with "invalid message".
+- Increase UDP recv buffer from 4096 to 65535 to prevent truncation of
+  large value replies (which invalidated Ed25519 signatures).
+
+### Added
+
+- Chunked find_value replies: values larger than 1100 bytes are now
+  split into MTU-safe chunks using the existing index/total fields,
+  avoiding IP fragmentation. Receiver reassembles chunks before
+  processing.
+- Debug logging for signature verification failures and find_value_reply
+  send results.
+
 ## [0.1.2] - 2026-03-25
 
 ### Added
blob - 78efc00c67547bb906e75862b823f5e80ac2a86c
blob + 441a3827bfd482aadc62ecbcfe981a05122eb92f
--- Cargo.toml
+++ Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "tesseras-dht"
-version = "0.1.2"
+version = "0.1.3"
 edition = "2024"
 authors = [ "murilo ijanc' <murilo@ijanc.org>" ]
 categories = ["network-programming"]
blob - 72ec019e6098aeed78af0fd7703d6316d10b5e54
blob + 3928856762d60d6ea12c03baa8c54838ed554481
--- src/dht.rs
+++ src/dht.rs
@@ -344,6 +344,12 @@ pub enum QueryPhase {
 /// returning best-effort results.
 pub const MAX_QUERY_DURATION: Duration = Duration::from_secs(30);
 
+/// Maximum chunk payload size for find_value replies.
+/// Keeps UDP datagrams under typical MTU (~1280) to
+/// avoid IP fragmentation:
+/// HEADER(72) + FIXED(44) + CHUNK + SIG(64) ≤ 1280
+pub const VALUE_CHUNK_SIZE: usize = 1100;
+
 pub struct IterativeQuery {
     pub target: NodeId,
     pub closest: Vec<PeerInfo>,
@@ -361,6 +367,11 @@ pub struct IterativeQuery {
     /// Measures the "depth" of the lookup — useful for
     /// diagnosing network topology and routing efficiency.
     pub hops: u32,
+
+    /// Reassembly buffer for chunked value replies.
+    /// Maps sender NodeId → (total_chunks, received chunks
+    /// indexed by chunk index).
+    pub chunks: HashMap<NodeId, (u16, HashMap<u16, Vec<u8>>)>,
 }
 
 impl IterativeQuery {
@@ -378,6 +389,7 @@ impl IterativeQuery {
             nonce,
             started_at: Instant::now(),
             hops: 0,
+            chunks: HashMap::new(),
         }
     }
 
@@ -395,6 +407,7 @@ impl IterativeQuery {
             nonce,
             started_at: Instant::now(),
             hops: 0,
+            chunks: HashMap::new(),
         }
     }
 
@@ -461,6 +474,48 @@ impl IterativeQuery {
         self.phase = QueryPhase::Done;
     }
 
+    /// Process a chunked value reply. Returns the
+    /// reassembled value when all chunks are received.
+    pub fn process_chunk(
+        &mut self,
+        from: &NodeId,
+        index: u16,
+        total: u16,
+        data: Vec<u8>,
+    ) -> Option<Vec<u8>> {
+        if total == 0 {
+            return None;
+        }
+        // Single chunk — no reassembly needed
+        if total == 1 {
+            return Some(data);
+        }
+        let entry = self
+            .chunks
+            .entry(*from)
+            .or_insert_with(|| (total, HashMap::new()));
+        // Reject mismatched total
+        if entry.0 != total {
+            return None;
+        }
+        entry.1.insert(index, data);
+        if entry.1.len() == total as usize {
+            // Reassemble in order
+            let mut full = Vec::new();
+            for i in 0..total {
+                if let Some(chunk) = entry.1.remove(&i) {
+                    full.extend_from_slice(&chunk);
+                } else {
+                    return None;
+                }
+            }
+            self.chunks.remove(from);
+            Some(full)
+        } else {
+            None
+        }
+    }
+
     /// Mark a peer as timed out.
     pub fn timeout(&mut self, id: &NodeId) {
         self.pending.remove(id);
blob - 1ba3270aa01acc77f7f0cf095038ca0391b640c0
blob + e5714bcdc221d0c7313a4744109e285dd4623fe3
--- src/handlers.rs
+++ src/handlers.rs
@@ -702,41 +702,60 @@ impl Node {
         let values = self.storage.get(&fv.target, &fv.key);
 
         if !values.is_empty() {
-            log::debug!("Found {} value(s) locally", values.len());
-
-            // Send value reply using DHT_FIND_VALUE_REPLY
-            // with DATA_ARE_VALUES flag
             let val = &values[0];
+            let chunk_size = crate::dht::VALUE_CHUNK_SIZE;
+            let num_chunks = val.value.len().div_ceil(chunk_size);
+            let num_chunks = num_chunks.max(1) as u16;
+
+            log::debug!(
+                "Found {} value(s) locally, sending {} chunk(s)",
+                values.len(),
+                num_chunks
+            );
+
             let fixed = msg::FIND_VALUE_REPLY_FIXED;
-            let total = HEADER_SIZE + fixed + val.value.len();
-            let mut rbuf = vec![0u8; total];
-            let rhdr = MsgHeader::new(
-                MsgType::DhtFindValueReply,
-                Self::len16(total),
-                self.id,
-                hdr.src,
-            );
-            if rhdr.write(&mut rbuf).is_ok() {
-                let off = HEADER_SIZE;
-                rbuf[off..off + 4].copy_from_slice(&fv.nonce.to_be_bytes());
-                fv.target
-                    .write_to(&mut rbuf[off + 4..off + 4 + crate::id::ID_LEN]);
-                let f = 4 + crate::id::ID_LEN; // 36
-                rbuf[off + f..off + f + 2].copy_from_slice(&0u16.to_be_bytes()); // index
-                rbuf[off + f + 2..off + f + 4].copy_from_slice(&1u16.to_be_bytes()); // total
-                rbuf[off + f + 4] = crate::wire::DATA_ARE_VALUES;
-                rbuf[off + f + 5] = 0;
-                rbuf[off + f + 6] = 0;
-                rbuf[off + f + 7] = 0;
-                rbuf[HEADER_SIZE + fixed..].copy_from_slice(&val.value);
-                match self.send_signed(&rbuf, from) {
-                    Ok(n) => log::debug!(
-                        "Sent find_value_reply ({} bytes) to {from}",
-                        n
-                    ),
-                    Err(e) => log::warn!(
-                        "Failed to send find_value_reply to {from}: {e}"
-                    ),
+            let f = 4 + crate::id::ID_LEN; // 36
+
+            for i in 0..num_chunks {
+                let start = i as usize * chunk_size;
+                let end = ((i as usize + 1) * chunk_size).min(val.value.len());
+                let chunk = &val.value[start..end];
+
+                let pkt_total = HEADER_SIZE + fixed + chunk.len();
+                let mut rbuf = vec![0u8; pkt_total];
+                let rhdr = MsgHeader::new(
+                    MsgType::DhtFindValueReply,
+                    Self::len16(pkt_total),
+                    self.id,
+                    hdr.src,
+                );
+                if rhdr.write(&mut rbuf).is_ok() {
+                    let off = HEADER_SIZE;
+                    rbuf[off..off + 4].copy_from_slice(&fv.nonce.to_be_bytes());
+                    fv.target.write_to(
+                        &mut rbuf[off + 4..off + 4 + crate::id::ID_LEN],
+                    );
+                    rbuf[off + f..off + f + 2]
+                        .copy_from_slice(&i.to_be_bytes());
+                    rbuf[off + f + 2..off + f + 4]
+                        .copy_from_slice(&num_chunks.to_be_bytes());
+                    rbuf[off + f + 4] = crate::wire::DATA_ARE_VALUES;
+                    rbuf[off + f + 5] = 0;
+                    rbuf[off + f + 6] = 0;
+                    rbuf[off + f + 7] = 0;
+                    rbuf[HEADER_SIZE + fixed..].copy_from_slice(chunk);
+                    match self.send_signed(&rbuf, from) {
+                        Ok(n) => log::debug!(
+                            "Sent chunk {}/{} ({n} bytes) to {from}",
+                            i + 1,
+                            num_chunks
+                        ),
+                        Err(e) => log::warn!(
+                            "Failed to send chunk {}/{} to {from}: {e}",
+                            i + 1,
+                            num_chunks
+                        ),
+                    }
                 }
             }
         } else {
@@ -823,64 +842,81 @@ impl Node {
             .map(|(n, _)| *n);
 
         match reply.data {
-            msg::FindValueReplyData::Value { data, .. } => {
-                log::info!(
-                    "Received value from {:?}: {} bytes",
+            msg::FindValueReplyData::Value { index, total, data } => {
+                log::debug!(
+                    "Received chunk {}/{} from {:?}: {} bytes",
+                    index + 1,
+                    total,
                     sender_id,
                     data.len()
                 );
 
-                // Cache the value locally and republish
-                // to the closest queried node without it
-                // (§2.3: cache on nearest without value)
                 if let Some(nonce) = matching_nonce {
-                    if let Some(q) = self.queries.get_mut(&nonce) {
-                        // Store in local storage for
-                        // subsequent get() calls
-                        let val = crate::dht::StoredValue {
-                            key: q.key.clone(),
-                            value: data.clone(),
-                            id: q.target,
-                            source: sender_id,
-                            ttl: 300,
-                            stored_at: Instant::now(),
-                            is_unique: false,
-                            original: 0,
-                            recvd: std::collections::HashSet::new(),
-                            version: crate::dht::now_version(),
-                        };
-                        self.storage.store(val);
+                    // Reassemble chunks
+                    let reassembled =
+                        self.queries.get_mut(&nonce).and_then(|q| {
+                            q.process_chunk(&sender_id, index, total, data)
+                        });
 
-                        // §2.3: store on nearest queried
-                        // node that didn't have the value
-                        let target = q.target;
-                        let key = q.key.clone();
-                        let nearest_without: Option<PeerInfo> = q
-                            .closest
-                            .iter()
-                            .find(|p| {
-                                q.queried.contains(&p.id) && p.id != sender_id
-                            })
-                            .cloned();
+                    if let Some(full_value) = reassembled {
+                        log::info!(
+                            "Reassembled value from {:?}: {} bytes",
+                            sender_id,
+                            full_value.len()
+                        );
 
-                        q.process_value(data.clone());
-
-                        if let Some(peer) = nearest_without {
-                            let store_msg = crate::msg::StoreMsg {
-                                id: target,
-                                from: self.id,
-                                key,
-                                value: data,
+                        if let Some(q) = self.queries.get_mut(&nonce) {
+                            // Cache locally
+                            let val = crate::dht::StoredValue {
+                                key: q.key.clone(),
+                                value: full_value.clone(),
+                                id: q.target,
+                                source: sender_id,
                                 ttl: 300,
+                                stored_at: Instant::now(),
                                 is_unique: false,
+                                original: 0,
+                                recvd: std::collections::HashSet::new(),
+                                version: crate::dht::now_version(),
                             };
-                            if let Err(e) = self.send_store(&peer, &store_msg) {
-                                log::debug!("Republish-on-access failed: {e}");
-                            } else {
-                                log::debug!(
-                                    "Republished value to {:?} (nearest without)",
-                                    peer.id,
-                                );
+                            self.storage.store(val);
+
+                            // §2.3: store on nearest queried
+                            // node that didn't have the value
+                            let target = q.target;
+                            let key = q.key.clone();
+                            let nearest_without: Option<PeerInfo> = q
+                                .closest
+                                .iter()
+                                .find(|p| {
+                                    q.queried.contains(&p.id)
+                                        && p.id != sender_id
+                                })
+                                .cloned();
+
+                            q.process_value(full_value.clone());
+
+                            if let Some(peer) = nearest_without {
+                                let store_msg = crate::msg::StoreMsg {
+                                    id: target,
+                                    from: self.id,
+                                    key,
+                                    value: full_value,
+                                    ttl: 300,
+                                    is_unique: false,
+                                };
+                                if let Err(e) =
+                                    self.send_store(&peer, &store_msg)
+                                {
+                                    log::debug!(
+                                        "Republish-on-access failed: {e}"
+                                    );
+                                } else {
+                                    log::debug!(
+                                        "Republished value to {:?} (nearest without)",
+                                        peer.id,
+                                    );
+                                }
                             }
                         }
                     }