Commit Diff


commit - 15c4eb2a119a29e3d1fc610bbd76e0e1a3f3db46
commit + 08f93dead1349baf90feec44722032cee6e6fc42
blob - 37d5b386e23f12864eb3893d631348affae761dd
blob + 2c4dc3cd750579f5a57d5353a107bab6ac3c6b4c
--- CHANGELOG.md
+++ CHANGELOG.md
@@ -5,6 +5,31 @@ All notable changes to this project will be documented
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [0.1.4] - 2026-03-27
+
+### Fixed
+
+- Fix iterative query flood: `timeout()` now marks timed-out peers as
+  queried, preventing `next_to_query()` from re-selecting them on every
+  poll cycle. The repeated re-selection caused find_node/find_value
+  storms that led to spurious bans.
+- Fix ping flood: `probe_liveness()` and `check_node_activity()` now
+  skip peers that already have a pending ping in flight.
+- Increase ban threshold from 3 to 10 failures to tolerate NAT rebinds
+  and UDP packet loss without banning legitimate peers.
+- Handle NAT rebind gracefully: when a known peer sends from a new port,
+  clear failures and bans on the old address before updating.
+- Fix `lookups_started` metric: now incremented when a lookup starts.
+
+### Added
+
+- `set_delete_callback()`: registers an application-level callback,
+  invoked when an authorized remote delete (store TTL=0) is received.
+  Only the original source (owner) can delete, via `remove_by_owner()`.
+- `remove_by_owner()` on `DhtStorage`: only removes values where the
+  requester matches the stored source, preventing unauthorized deletes.
+- TTL protection: non-owner stores can only extend TTL, never reduce it.
+
 ## [0.1.3] - 2026-03-27
 
 ### Fixed
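
The flood fix in the first Fixed entry boils down to one invariant: a
timed-out peer must count as queried. A minimal self-contained sketch of
that invariant, with u64 standing in for NodeId and a hypothetical Query
type in place of the real IterativeQuery (the actual one-line change is
in the src/dht.rs hunk below):

    use std::collections::HashSet;

    struct Query {
        candidates: Vec<u64>,
        pending: HashSet<u64>,
        queried: HashSet<u64>,
    }

    impl Query {
        fn timeout(&mut self, id: u64) {
            self.pending.remove(&id);
            // The fix: without this insert, next_to_query() keeps
            // returning the same unreachable peer on every poll.
            self.queried.insert(id);
        }

        fn next_to_query(&self) -> Vec<u64> {
            self.candidates
                .iter()
                .copied()
                .filter(|id| {
                    !self.pending.contains(id) && !self.queried.contains(id)
                })
                .collect()
        }
    }

    fn main() {
        let mut q = Query {
            candidates: vec![7],
            pending: HashSet::new(),
            queried: HashSet::new(),
        };
        q.timeout(7);
        assert!(q.next_to_query().is_empty()); // no re-selection
    }
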
blob - 104f8eba06ec37ff6c7989231959d66af32a7f5c
blob + ba17db8fdaf17961ebeebefb6a245ad81aff8d89
--- Cargo.lock
+++ Cargo.lock
@@ -474,7 +474,7 @@ dependencies = [
 
 [[package]]
 name = "tesseras-dht"
-version = "0.1.3"
+version = "0.1.4"
 dependencies = [
  "ed25519-dalek",
  "env_logger",
blob - 441a3827bfd482aadc62ecbcfe981a05122eb92f
blob + d438b981b1f3e99540ac4d3a493eb6e580ff187b
--- Cargo.toml
+++ Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "tesseras-dht"
-version = "0.1.3"
+version = "0.1.4"
 edition = "2024"
 authors = [ "murilo ijanc' <murilo@ijanc.org>" ]
 categories = ["network-programming"]
blob - 482724296d5585e96ad451f1537d45e7c8b1baf9
blob + 1ee3a613bc03eaf2186e29eb7494f4fd2417eec5
--- src/banlist.rs
+++ src/banlist.rs
@@ -9,7 +9,8 @@ use std::net::SocketAddr;
 use std::time::{Duration, Instant};
 
 /// Default number of failures before banning a peer.
-const DEFAULT_BAN_THRESHOLD: u32 = 3;
+/// High enough to tolerate NAT rebinds and UDP loss.
+const DEFAULT_BAN_THRESHOLD: u32 = 10;
 
 /// Default ban duration (3 hours, matching gonode).
 const DEFAULT_BAN_DURATION: Duration = Duration::from_secs(3 * 3600);
@@ -145,6 +146,7 @@ mod tests {
     #[test]
     fn ban_after_threshold() {
         let mut bl = BanList::new();
+        bl.set_threshold(3);
         let a = addr(1000);
         assert!(!bl.record_failure(a));
         assert!(!bl.record_failure(a));
@@ -167,6 +169,7 @@ mod tests {
     #[test]
     fn ban_expires() {
         let mut bl = BanList::new();
+        bl.set_threshold(3);
         bl.set_ban_duration(Duration::from_millis(1));
         let a = addr(1000);
         bl.record_failure(a);
@@ -180,6 +183,7 @@ mod tests {
     #[test]
     fn cleanup_removes_expired() {
         let mut bl = BanList::new();
+        bl.set_threshold(3);
         bl.set_ban_duration(Duration::from_millis(1));
         let a = addr(1000);
         bl.record_failure(a);
@@ -202,6 +206,7 @@ mod tests {
     #[test]
     fn independent_peers() {
         let mut bl = BanList::new();
+        bl.set_threshold(3);
         let a = addr(1000);
         let b = addr(2000);
         bl.record_failure(a);
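
With the default threshold raised to 10, the tests above pin the old
value explicitly via set_threshold(3). Under the new default the
accounting looks like this (test-style sketch; that record_failure()
returns whether the peer just became banned is inferred from the
assertions in the existing tests):

    use tesseras_dht::banlist::BanList;

    fn main() {
        let mut bl = BanList::new();
        let a: std::net::SocketAddr = "127.0.0.1:4000".parse().unwrap();
        // Nine failures: still below the new default threshold of 10,
        // so transient NAT rebinds and UDP loss don't trip a ban.
        for _ in 0..9 {
            assert!(!bl.record_failure(a));
        }
        assert!(!bl.is_banned(&a));
        // The tenth failure crosses the threshold.
        assert!(bl.record_failure(a));
        assert!(bl.is_banned(&a));
    }
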
blob - 3928856762d60d6ea12c03baa8c54838ed554481
blob + 793eb9c14ab0703818fdfa293e7d7f6e1e56a6cf
--- src/dht.rs
+++ src/dht.rs
@@ -196,9 +196,13 @@ impl DhtStorage {
                 );
                 return;
             }
-            entry[pos].ttl = val.ttl;
-            entry[pos].stored_at = val.stored_at;
-            entry[pos].version = val.version;
+            // Only the original source can reduce TTL;
+            // other nodes can only extend it.
+            if val.source == entry[pos].source || val.ttl >= entry[pos].ttl {
+                entry[pos].ttl = val.ttl;
+                entry[pos].stored_at = val.stored_at;
+                entry[pos].version = val.version;
+            }
         } else {
             // Don't add if existing data is unique
             if entry.len() == 1 && entry[0].is_unique {
@@ -221,6 +225,34 @@ impl DhtStorage {
         }
     }
 
+    /// Remove values only if the requester is the
+    /// original source. Returns true if any were removed.
+    pub fn remove_by_owner(
+        &mut self,
+        id: &NodeId,
+        raw_key: &[u8],
+        owner: &NodeId,
+    ) -> bool {
+        let key = StorageKey {
+            raw: raw_key.to_vec(),
+        };
+        let inner = match self.data.get_mut(id) {
+            Some(i) => i,
+            None => return false,
+        };
+        let mut removed = false;
+        if let Some(vals) = inner.get_mut(&key) {
+            let before = vals.len();
+            vals.retain(|v| v.source != *owner);
+            removed = vals.len() != before;
+            if vals.is_empty() {
+                inner.remove(&key);
+            }
+        }
+        if inner.is_empty() { self.data.remove(id); }
+        removed
+    }
+
     /// Get all values for a target ID and key.
     pub fn get(&self, id: &NodeId, raw_key: &[u8]) -> Vec<StoredValue> {
         let key = StorageKey {
@@ -519,6 +551,7 @@ impl IterativeQuery {
     /// Mark a peer as timed out.
     pub fn timeout(&mut self, id: &NodeId) {
         self.pending.remove(id);
+        self.queried.insert(*id);
         if self.pending.is_empty() && self.next_to_query().is_empty() {
             self.phase = QueryPhase::Done;
         }
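
The TTL guard added above reduces to a two-case predicate: the owner may
set any TTL (including reducing it), while anyone else may only extend.
In isolation, with plain types standing in for StoredValue and NodeId
(names hypothetical):

    struct Val {
        source: u64, // stands in for NodeId
        ttl: u16,
    }

    /// A store may overwrite TTL metadata only when it comes from the
    /// original source, or when it does not reduce the remaining TTL.
    fn may_update(existing: &Val, incoming: &Val) -> bool {
        incoming.source == existing.source || incoming.ttl >= existing.ttl
    }

    fn main() {
        let owned = Val { source: 1, ttl: 60 };
        assert!(may_update(&owned, &Val { source: 1, ttl: 5 })); // owner may reduce
        assert!(may_update(&owned, &Val { source: 2, ttl: 90 })); // non-owner extends
        assert!(!may_update(&owned, &Val { source: 2, ttl: 10 })); // reduce rejected
    }
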
blob - e5714bcdc221d0c7313a4744109e285dd4623fe3
blob + 133a7a192b029ab729c22d2795822515267af1c5
--- src/handlers.rs
+++ src/handlers.rs
@@ -66,6 +66,15 @@ impl Node {
             hdr.src
         );
 
+        // If we know this peer at a different address
+        // (NAT rebind), clear failures/ban on the old
+        // address before updating.
+        if let Some(known) = self.peers.get(&hdr.src) {
+            if known.addr != from {
+                self.ban_list.unban(&known.addr);
+            }
+        }
+
         // Register sender as peer and record successful
         // communication (clears ban list failures)
         self.peers.add(PeerInfo::new(hdr.src, from));
@@ -657,7 +666,23 @@ impl Node {
         self.dht_table.add(PeerInfo::new(hdr.src, from));
 
         if store.ttl == 0 {
-            self.storage.remove(&store.id, &store.key);
+            if self
+                .storage
+                .remove_by_owner(&store.id, &store.key, &store.from)
+            {
+                log::debug!(
+                    "Delete from {:?}: authorized (owner match)",
+                    store.from
+                );
+                if let Some(cb) = &self.delete_callback {
+                    cb(&store.key);
+                }
+            } else {
+                log::debug!(
+                    "Delete from {:?}: rejected (not owner)",
+                    store.from
+                );
+            }
         } else {
             let val = crate::dht::StoredValue {
                 key: store.key,
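
The rebind handling at the top of this file is a lookup/compare/unban
step keyed by node ID rather than address. The same pattern in
isolation, with plain types (all names hypothetical):

    use std::collections::{HashMap, HashSet};
    use std::net::SocketAddr;

    /// When a known node ID shows up from a new address, lift any ban
    /// on the old address first: its recorded failures are stale.
    fn on_message(
        peers: &mut HashMap<u64, SocketAddr>, // node id -> last known addr
        banned: &mut HashSet<SocketAddr>,
        src_id: u64,
        from: SocketAddr,
    ) {
        if let Some(&old) = peers.get(&src_id) {
            if old != from {
                banned.remove(&old);
            }
        }
        peers.insert(src_id, from);
    }
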
blob - 2f43775cdc914a772bfcefab4c32fa24b04f2ccf
blob + f360ffdd416ceadadd8cfffd8dca7c0f7f640719
--- src/net.rs
+++ src/net.rs
@@ -141,6 +141,9 @@ impl Node {
         self.send_query_batch(nonce)?;
 
         self.queries.insert(nonce, query);
+        self.metrics
+            .lookups_started
+            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
         Ok(nonce)
     }
 
@@ -325,11 +328,19 @@ impl Node {
             return;
         }
 
+        // Collect IDs that already have a pending ping
+        let pending_ids: std::collections::HashSet<crate::id::NodeId> =
+            self.pending_pings.values().map(|(id, _)| *id).collect();
+
         for peer in &lru_peers {
             // Skip if we've seen them recently
             if peer.last_seen.elapsed() < Duration::from_secs(30) {
                 continue;
             }
+            // Skip if a ping is already in flight
+            if pending_ids.contains(&peer.id) {
+                continue;
+            }
             let nonce = self.alloc_nonce();
             let size = msg::PING_MSG_SIZE;
             let mut buf = [0u8; msg::PING_MSG_SIZE];
@@ -583,6 +594,9 @@ impl Node {
             return;
         }
 
+        let pending_ids: std::collections::HashSet<crate::id::NodeId> =
+            self.pending_pings.values().map(|(id, _)| *id).collect();
+
         let mut pinged = 0u32;
         for peer in &lru_peers {
             // Only ping peers not seen for >60s
@@ -593,6 +607,10 @@ impl Node {
             if self.ban_list.is_banned(&peer.addr) {
                 continue;
             }
+            // Skip if a ping is already in flight
+            if pending_ids.contains(&peer.id) {
+                continue;
+            }
 
             let nonce = self.alloc_nonce();
             let size = crate::msg::PING_MSG_SIZE;
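
Both ping sweeps now build the set of in-flight node IDs once up front
instead of scanning pending_pings per candidate. The same dedup pattern
in isolation (names hypothetical; u64 stands in for NodeId):

    use std::collections::{HashMap, HashSet};
    use std::time::Instant;

    /// Return only candidates with no ping already in flight:
    /// one O(n) collect, then an O(1) membership check per peer.
    fn peers_to_ping(
        pending_pings: &HashMap<u32, (u64, Instant)>, // nonce -> (id, sent_at)
        candidates: &[u64],
    ) -> Vec<u64> {
        let pending: HashSet<u64> =
            pending_pings.values().map(|(id, _)| *id).collect();
        candidates
            .iter()
            .copied()
            .filter(|id| !pending.contains(id))
            .collect()
    }
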
blob - fe347f36129ee1beceba8f6645e7160c0d07ccc5
blob + 4dca6a0ec1418f96fc37ed38b2464c8fd3c0bb88
--- src/node.rs
+++ src/node.rs
@@ -32,6 +32,7 @@ const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_
 type DgramCallback = Box<dyn Fn(&[u8], &NodeId) + Send>;
 type RdpCallback =
     Box<dyn Fn(i32, &crate::rdp::RdpAddr, crate::rdp::RdpEvent) + Send>;
+type DeleteCallback = Box<dyn Fn(&[u8]) + Send>;
 
 /// The tesseras-dht node.
 ///
@@ -58,6 +59,7 @@ pub struct Node {
     pub(crate) is_dtun: bool,
     pub(crate) dgram_callback: Option<DgramCallback>,
     pub(crate) rdp_callback: Option<RdpCallback>,
+    pub(crate) delete_callback: Option<DeleteCallback>,
 
     /// Active iterative queries keyed by nonce.
     pub(crate) queries: HashMap<u32, IterativeQuery>,
@@ -247,6 +249,7 @@ impl Node {
             is_dtun: true,
             dgram_callback: None,
             rdp_callback: None,
+            delete_callback: None,
             queries: HashMap::new(),
             last_refresh: Instant::now(),
             last_restore: Instant::now(),
@@ -357,8 +360,8 @@ impl Node {
     pub fn put(&mut self, key: &[u8], value: &[u8], ttl: u16, is_unique: bool) {
         let target_id = NodeId::from_key(key);
         log::debug!(
-            "put: key={} target={}",
-            String::from_utf8_lossy(key),
+            "put: key={:02x?} target={}",
+            &key[..key.len().min(8)],
             target_id
         );
 
@@ -671,6 +674,9 @@ impl Node {
         // Send initial batch
         self.send_query_batch(nonce)?;
 
+        self.metrics
+            .lookups_started
+            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
         Ok(nonce)
     }
 
@@ -805,6 +811,16 @@ impl Node {
         self.rdp_callback = None;
     }
 
+    /// Set a callback invoked when a remote delete
+    /// (store TTL=0) arrives. The callback receives the
+    /// raw key bytes.
+    pub fn set_delete_callback<F>(&mut self, f: F)
+    where
+        F: Fn(&[u8]) + Send + 'static,
+    {
+        self.delete_callback = Some(Box::new(f));
+    }
+
     // ── RDP (reliable transport) ────────────────────
 
     /// Listen for RDP connections on `port`.
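
For applications, the new hook is registered once before entering the
poll loop. A usage sketch (assumes Node is re-exported at the crate
root; issuing a delete as a TTL=0 store is an assumption based on the
handle_store() change above):

    /// Hypothetical helper: log authorized remote deletes.
    fn register_delete_hook(node: &mut tesseras_dht::Node) {
        node.set_delete_callback(|key| {
            log::info!("remote delete, key={:02x?}", &key[..key.len().min(8)]);
        });
        // Owner side (hypothetical key): republish with ttl = 0 to
        // request deletion, assuming TTL=0 stores signal deletes.
        // node.put(b"app/session/42", b"", 0, false);
    }
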
blob - 8bfe66d68d155a0f10e85574ede7e5e217c0b768
blob + 6efddafda2b8509bbfd0327a285ab2007af663c3
--- tests/integration.rs
+++ tests/integration.rs
@@ -512,6 +512,7 @@ fn ban_list_initially_empty() {
 fn ban_list_unit() {
     use tesseras_dht::banlist::BanList;
     let mut bl = BanList::new();
+    bl.set_threshold(3);
     let addr: std::net::SocketAddr = "127.0.0.1:9999".parse().unwrap();
 
     assert!(!bl.is_banned(&addr));