commit 08f93dead1349baf90feec44722032cee6e6fc42 from: murilo ijanc date: Fri Mar 27 14:56:51 2026 UTC Bump version to 0.1.4 - Fix query/ping flood from timeout retry loops - Owner-only delete and TTL reduction protection - NAT rebind handling (clear bans on port change) - Ban threshold 3 -> 10 for UDP/NAT tolerance - delete_callback for application-level notification - lookups_started metric commit - 15c4eb2a119a29e3d1fc610bbd76e0e1a3f3db46 commit + 08f93dead1349baf90feec44722032cee6e6fc42 blob - 37d5b386e23f12864eb3893d631348affae761dd blob + 2c4dc3cd750579f5a57d5353a107bab6ac3c6b4c --- CHANGELOG.md +++ CHANGELOG.md @@ -5,6 +5,31 @@ All notable changes to this project will be documented The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.1.4] - 2026-03-27 + +### Fixed + +- Fix iterative query flood: `timeout()` now marks timed-out peers as + queried, preventing `next_to_query()` from re-selecting them every + poll cycle. This caused find_node/find_value storms that led to + spurious bans. +- Fix ping flood: `probe_liveness()` and `check_node_activity()` now + skip peers that already have a pending ping in flight. +- Increase ban threshold from 3 to 10 failures to tolerate NAT rebinds + and UDP packet loss without banning legitimate peers. +- Handle NAT rebind gracefully: when a known peer sends from a new port, + clear failures and bans on the old address before updating. + +### Added + +- `set_delete_callback()`: application-level callback invoked when a + remote delete (store TTL=0) is received. Only the original source + (owner) can delete values via `remove_by_owner()`. +- `remove_by_owner()` on `DhtStorage`: only removes values where the + requester matches the stored source, preventing unauthorized deletes. +- TTL protection: non-owner stores can only extend TTL, never reduce it. +- `lookups_started` metric is now properly incremented. 
+ ## [0.1.3] - 2026-03-27 ### Fixed blob - 104f8eba06ec37ff6c7989231959d66af32a7f5c blob + ba17db8fdaf17961ebeebefb6a245ad81aff8d89 --- Cargo.lock +++ Cargo.lock @@ -474,7 +474,7 @@ dependencies = [ [[package]] name = "tesseras-dht" -version = "0.1.3" +version = "0.1.4" dependencies = [ "ed25519-dalek", "env_logger", blob - 441a3827bfd482aadc62ecbcfe981a05122eb92f blob + d438b981b1f3e99540ac4d3a493eb6e580ff187b --- Cargo.toml +++ Cargo.toml @@ -1,6 +1,6 @@ [package] name = "tesseras-dht" -version = "0.1.3" +version = "0.1.4" edition = "2024" authors = [ "murilo ijanc' " ] categories = ["network-programming"] blob - 482724296d5585e96ad451f1537d45e7c8b1baf9 blob + 1ee3a613bc03eaf2186e29eb7494f4fd2417eec5 --- src/banlist.rs +++ src/banlist.rs @@ -9,7 +9,8 @@ use std::net::SocketAddr; use std::time::{Duration, Instant}; /// Default number of failures before banning a peer. -const DEFAULT_BAN_THRESHOLD: u32 = 3; +/// High enough to tolerate NAT rebinds and UDP loss. +const DEFAULT_BAN_THRESHOLD: u32 = 10; /// Default ban duration (3 hours, matching gonode). 
const DEFAULT_BAN_DURATION: Duration = Duration::from_secs(3 * 3600); @@ -145,6 +146,7 @@ mod tests { #[test] fn ban_after_threshold() { let mut bl = BanList::new(); + bl.set_threshold(3); let a = addr(1000); assert!(!bl.record_failure(a)); assert!(!bl.record_failure(a)); @@ -167,6 +169,7 @@ mod tests { #[test] fn ban_expires() { let mut bl = BanList::new(); + bl.set_threshold(3); bl.set_ban_duration(Duration::from_millis(1)); let a = addr(1000); bl.record_failure(a); @@ -180,6 +183,7 @@ mod tests { #[test] fn cleanup_removes_expired() { let mut bl = BanList::new(); + bl.set_threshold(3); bl.set_ban_duration(Duration::from_millis(1)); let a = addr(1000); bl.record_failure(a); @@ -202,6 +206,7 @@ mod tests { #[test] fn independent_peers() { let mut bl = BanList::new(); + bl.set_threshold(3); let a = addr(1000); let b = addr(2000); bl.record_failure(a); blob - 3928856762d60d6ea12c03baa8c54838ed554481 blob + 793eb9c14ab0703818fdfa293e7d7f6e1e56a6cf --- src/dht.rs +++ src/dht.rs @@ -196,9 +196,13 @@ impl DhtStorage { ); return; } - entry[pos].ttl = val.ttl; - entry[pos].stored_at = val.stored_at; - entry[pos].version = val.version; + // Only the original source can reduce TTL; + // other nodes can only extend it. + if val.source == entry[pos].source || val.ttl >= entry[pos].ttl { + entry[pos].ttl = val.ttl; + entry[pos].stored_at = val.stored_at; + entry[pos].version = val.version; + } } else { // Don't add if existing data is unique if entry.len() == 1 && entry[0].is_unique { @@ -221,6 +225,34 @@ impl DhtStorage { } } + /// Remove values only if the requester is the + /// original source. Returns true if any were removed. 
+ pub fn remove_by_owner( + &mut self, + id: &NodeId, + raw_key: &[u8], + owner: &NodeId, + ) -> bool { + let key = StorageKey { + raw: raw_key.to_vec(), + }; + let inner = match self.data.get_mut(id) { + Some(i) => i, + None => return false, + }; + let had_values = inner.get(&key).is_some_and(|v| !v.is_empty()); + if let Some(vals) = inner.get_mut(&key) { + vals.retain(|v| v.source != *owner); + if vals.is_empty() { + inner.remove(&key); + } + } + if inner.is_empty() { + self.data.remove(id); + } + had_values + } + /// Get all values for a target ID and key. pub fn get(&self, id: &NodeId, raw_key: &[u8]) -> Vec { let key = StorageKey { @@ -519,6 +551,7 @@ impl IterativeQuery { /// Mark a peer as timed out. pub fn timeout(&mut self, id: &NodeId) { self.pending.remove(id); + self.queried.insert(*id); if self.pending.is_empty() && self.next_to_query().is_empty() { self.phase = QueryPhase::Done; } blob - e5714bcdc221d0c7313a4744109e285dd4623fe3 blob + 133a7a192b029ab729c22d2795822515267af1c5 --- src/handlers.rs +++ src/handlers.rs @@ -66,6 +66,15 @@ impl Node { hdr.src ); + // If we know this peer at a different address + // (NAT rebind), clear failures/ban on the old + // address before updating. 
+ if let Some(known) = self.peers.get(&hdr.src) { + if known.addr != from { + self.ban_list.unban(&known.addr); + } + } + // Register sender as peer and record successful // communication (clears ban list failures) self.peers.add(PeerInfo::new(hdr.src, from)); @@ -657,7 +666,23 @@ impl Node { self.dht_table.add(PeerInfo::new(hdr.src, from)); if store.ttl == 0 { - self.storage.remove(&store.id, &store.key); + if self + .storage + .remove_by_owner(&store.id, &store.key, &store.from) + { + log::debug!( + "Delete from {:?}: authorized (owner match)", + store.from + ); + if let Some(cb) = &self.delete_callback { + cb(&store.key); + } + } else { + log::debug!( + "Delete from {:?}: rejected (not owner)", + store.from + ); + } } else { let val = crate::dht::StoredValue { key: store.key, blob - 2f43775cdc914a772bfcefab4c32fa24b04f2ccf blob + f360ffdd416ceadadd8cfffd8dca7c0f7f640719 --- src/net.rs +++ src/net.rs @@ -141,6 +141,9 @@ impl Node { self.send_query_batch(nonce)?; self.queries.insert(nonce, query); + self.metrics + .lookups_started + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); Ok(nonce) } @@ -325,11 +328,19 @@ impl Node { return; } + // Collect IDs that already have a pending ping + let pending_ids: std::collections::HashSet = + self.pending_pings.values().map(|(id, _)| *id).collect(); + for peer in &lru_peers { // Skip if we've seen them recently if peer.last_seen.elapsed() < Duration::from_secs(30) { continue; } + // Skip if a ping is already in flight + if pending_ids.contains(&peer.id) { + continue; + } let nonce = self.alloc_nonce(); let size = msg::PING_MSG_SIZE; let mut buf = [0u8; msg::PING_MSG_SIZE]; @@ -583,6 +594,9 @@ impl Node { return; } + let pending_ids: std::collections::HashSet = + self.pending_pings.values().map(|(id, _)| *id).collect(); + let mut pinged = 0u32; for peer in &lru_peers { // Only ping peers not seen for >60s @@ -593,6 +607,10 @@ impl Node { if self.ban_list.is_banned(&peer.addr) { continue; } + // Skip if a ping is already in flight
+ if pending_ids.contains(&peer.id) { + continue; + } let nonce = self.alloc_nonce(); let size = crate::msg::PING_MSG_SIZE; blob - fe347f36129ee1beceba8f6645e7160c0d07ccc5 blob + 4dca6a0ec1418f96fc37ed38b2464c8fd3c0bb88 --- src/node.rs +++ src/node.rs @@ -32,6 +32,7 @@ const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_ type DgramCallback = Box; type RdpCallback = Box; +type DeleteCallback = Box; /// The tesseras-dht node. /// @@ -58,6 +59,7 @@ pub struct Node { pub(crate) is_dtun: bool, pub(crate) dgram_callback: Option, pub(crate) rdp_callback: Option, + pub(crate) delete_callback: Option, /// Active iterative queries keyed by nonce. pub(crate) queries: HashMap, @@ -247,6 +249,7 @@ impl Node { is_dtun: true, dgram_callback: None, rdp_callback: None, + delete_callback: None, queries: HashMap::new(), last_refresh: Instant::now(), last_restore: Instant::now(), @@ -357,8 +360,8 @@ impl Node { pub fn put(&mut self, key: &[u8], value: &[u8], ttl: u16, is_unique: bool) { let target_id = NodeId::from_key(key); log::debug!( - "put: key={} target={}", - String::from_utf8_lossy(key), + "put: key={:02x?} target={}", + &key[..key.len().min(8)], target_id ); @@ -671,6 +674,9 @@ impl Node { // Send initial batch self.send_query_batch(nonce)?; + self.metrics + .lookups_started + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); Ok(nonce) } @@ -805,6 +811,16 @@ impl Node { self.rdp_callback = None; } + /// Set a callback invoked when a remote delete + /// (store TTL=0) arrives. The callback receives the + /// raw key bytes. + pub fn set_delete_callback(&mut self, f: F) + where + F: Fn(&[u8]) + Send + 'static, + { + self.delete_callback = Some(Box::new(f)); + } + // ── RDP (reliable transport) ──────────────────── /// Listen for RDP connections on `port`.
blob - 8bfe66d68d155a0f10e85574ede7e5e217c0b768 blob + 6efddafda2b8509bbfd0327a285ab2007af663c3 --- tests/integration.rs +++ tests/integration.rs @@ -512,6 +512,7 @@ fn ban_list_initially_empty() { fn ban_list_unit() { use tesseras_dht::banlist::BanList; let mut bl = BanList::new(); + bl.set_threshold(3); let addr: std::net::SocketAddr = "127.0.0.1:9999".parse().unwrap(); assert!(!bl.is_banned(&addr));