commit - 15c4eb2a119a29e3d1fc610bbd76e0e1a3f3db46
commit + 08f93dead1349baf90feec44722032cee6e6fc42
blob - 37d5b386e23f12864eb3893d631348affae761dd
blob + 2c4dc3cd750579f5a57d5353a107bab6ac3c6b4c
--- CHANGELOG.md
+++ CHANGELOG.md
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+## [0.1.4] - 2026-03-27
+
+### Fixed
+
+- Fix iterative query flood: `timeout()` now marks timed-out peers as
+ queried, preventing `next_to_query()` from re-selecting them every
+ poll cycle. This caused find_node/find_value storms that led to
+ spurious bans.
+- Fix ping flood: `probe_liveness()` and `check_node_activity()` now
+ skip peers that already have a pending ping in flight.
+- Increase ban threshold from 3 to 10 failures to tolerate NAT rebinds
+ and UDP packet loss without banning legitimate peers.
+- Handle NAT rebind gracefully: when a known peer sends from a new port,
+ clear failures and bans on the old address before updating.
+
+### Added
+
+- `set_delete_callback()`: application-level callback invoked with the
+ raw key when a remote delete (store TTL=0) is accepted. Deletes go
+ through `remove_by_owner()`, so only the original source (owner) can
+ delete values.
+- `remove_by_owner()` on `DhtStorage`: only removes values where the
+ requester matches the stored source, preventing unauthorized deletes.
+- TTL protection: non-owner stores can only extend TTL, never reduce it.
+- `lookups_started` metric is now properly incremented.
+
## [0.1.3] - 2026-03-27
### Fixed
blob - 104f8eba06ec37ff6c7989231959d66af32a7f5c
blob + ba17db8fdaf17961ebeebefb6a245ad81aff8d89
--- Cargo.lock
+++ Cargo.lock
[[package]]
name = "tesseras-dht"
-version = "0.1.3"
+version = "0.1.4"
dependencies = [
"ed25519-dalek",
"env_logger",
blob - 441a3827bfd482aadc62ecbcfe981a05122eb92f
blob + d438b981b1f3e99540ac4d3a493eb6e580ff187b
--- Cargo.toml
+++ Cargo.toml
[package]
name = "tesseras-dht"
-version = "0.1.3"
+version = "0.1.4"
edition = "2024"
authors = [ "murilo ijanc' <murilo@ijanc.org>" ]
categories = ["network-programming"]
blob - 482724296d5585e96ad451f1537d45e7c8b1baf9
blob + 1ee3a613bc03eaf2186e29eb7494f4fd2417eec5
--- src/banlist.rs
+++ src/banlist.rs
use std::time::{Duration, Instant};
/// Default number of failures before banning a peer.
-const DEFAULT_BAN_THRESHOLD: u32 = 3;
+/// High enough to tolerate NAT rebinds and UDP loss.
+const DEFAULT_BAN_THRESHOLD: u32 = 10;
/// Default ban duration (3 hours, matching gonode).
const DEFAULT_BAN_DURATION: Duration = Duration::from_secs(3 * 3600);
#[test]
fn ban_after_threshold() {
let mut bl = BanList::new();
+ bl.set_threshold(3);
let a = addr(1000);
assert!(!bl.record_failure(a));
assert!(!bl.record_failure(a));
#[test]
fn ban_expires() {
let mut bl = BanList::new();
+ bl.set_threshold(3);
bl.set_ban_duration(Duration::from_millis(1));
let a = addr(1000);
bl.record_failure(a);
#[test]
fn cleanup_removes_expired() {
let mut bl = BanList::new();
+ bl.set_threshold(3);
bl.set_ban_duration(Duration::from_millis(1));
let a = addr(1000);
bl.record_failure(a);
#[test]
fn independent_peers() {
let mut bl = BanList::new();
+ bl.set_threshold(3);
let a = addr(1000);
let b = addr(2000);
bl.record_failure(a);
blob - 3928856762d60d6ea12c03baa8c54838ed554481
blob + 793eb9c14ab0703818fdfa293e7d7f6e1e56a6cf
--- src/dht.rs
+++ src/dht.rs
);
return;
}
- entry[pos].ttl = val.ttl;
- entry[pos].stored_at = val.stored_at;
- entry[pos].version = val.version;
+ // Only the original source can reduce TTL;
+ // other nodes can only extend it.
+ if val.source == entry[pos].source || val.ttl >= entry[pos].ttl {
+ entry[pos].ttl = val.ttl;
+ entry[pos].stored_at = val.stored_at;
+ entry[pos].version = val.version;
+ }
} else {
// Don't add if existing data is unique
if entry.len() == 1 && entry[0].is_unique {
}
}
+ /// Remove the values stored under `id`/`raw_key` whose source is
+ /// `owner`, leaving values from other sources untouched.
+ ///
+ /// Returns `true` only if at least one value was actually removed.
+ /// A request from a node that owns nothing under this key returns
+ /// `false`, so callers can tell an authorized delete from a
+ /// rejected one.
+ pub fn remove_by_owner(
+ &mut self,
+ id: &NodeId,
+ raw_key: &[u8],
+ owner: &NodeId,
+ ) -> bool {
+ let key = StorageKey {
+ raw: raw_key.to_vec(),
+ };
+ let Some(inner) = self.data.get_mut(id) else {
+ return false;
+ };
+ let mut removed = false;
+ if let Some(vals) = inner.get_mut(&key) {
+ // Compare lengths around `retain` so the result reflects
+ // what was deleted, not merely that the key had values.
+ let before = vals.len();
+ vals.retain(|v| v.source != *owner);
+ removed = vals.len() != before;
+ if vals.is_empty() {
+ inner.remove(&key);
+ }
+ }
+ // Drop the per-id map once its last key is gone.
+ if inner.is_empty() {
+ self.data.remove(id);
+ }
+ removed
+ }
+
/// Get all values for a target ID and key.
pub fn get(&self, id: &NodeId, raw_key: &[u8]) -> Vec<StoredValue> {
let key = StorageKey {
/// Mark a peer as timed out.
pub fn timeout(&mut self, id: &NodeId) {
self.pending.remove(id);
+ self.queried.insert(*id);
if self.pending.is_empty() && self.next_to_query().is_empty() {
self.phase = QueryPhase::Done;
}
blob - e5714bcdc221d0c7313a4744109e285dd4623fe3
blob + 133a7a192b029ab729c22d2795822515267af1c5
--- src/handlers.rs
+++ src/handlers.rs
hdr.src
);
+ // If we know this peer at a different address
+ // (NAT rebind), clear failures/ban on the old
+ // address before updating.
+ if let Some(known) = self.peers.get(&hdr.src) {
+ if known.addr != from {
+ self.ban_list.unban(&known.addr);
+ }
+ }
+
// Register sender as peer and record successful
// communication (clears ban list failures)
self.peers.add(PeerInfo::new(hdr.src, from));
self.dht_table.add(PeerInfo::new(hdr.src, from));
if store.ttl == 0 {
- self.storage.remove(&store.id, &store.key);
+ if self
+ .storage
+ .remove_by_owner(&store.id, &store.key, &store.from)
+ {
+ log::debug!(
+ "Delete from {:?}: authorized (owner match)",
+ store.from
+ );
+ if let Some(cb) = &self.delete_callback {
+ cb(&store.key);
+ }
+ } else {
+ log::debug!(
+ "Delete from {:?}: rejected (not owner)",
+ store.from
+ );
+ }
} else {
let val = crate::dht::StoredValue {
key: store.key,
blob - 2f43775cdc914a772bfcefab4c32fa24b04f2ccf
blob + f360ffdd416ceadadd8cfffd8dca7c0f7f640719
--- src/net.rs
+++ src/net.rs
self.send_query_batch(nonce)?;
self.queries.insert(nonce, query);
+ self.metrics
+ .lookups_started
+ .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
Ok(nonce)
}
return;
}
+ // Collect IDs that already have a pending ping
+ let pending_ids: std::collections::HashSet<crate::id::NodeId> =
+ self.pending_pings.values().map(|(id, _)| *id).collect();
+
for peer in &lru_peers {
// Skip if we've seen them recently
if peer.last_seen.elapsed() < Duration::from_secs(30) {
continue;
}
+ // Skip if a ping is already in flight
+ if pending_ids.contains(&peer.id) {
+ continue;
+ }
let nonce = self.alloc_nonce();
let size = msg::PING_MSG_SIZE;
let mut buf = [0u8; msg::PING_MSG_SIZE];
return;
}
+ let pending_ids: std::collections::HashSet<crate::id::NodeId> =
+ self.pending_pings.values().map(|(id, _)| *id).collect();
+
let mut pinged = 0u32;
for peer in &lru_peers {
// Only ping peers not seen for >60s
if self.ban_list.is_banned(&peer.addr) {
continue;
}
+ // Skip if a ping is already in flight
+ if pending_ids.contains(&peer.id) {
+ continue;
+ }
let nonce = self.alloc_nonce();
let size = crate::msg::PING_MSG_SIZE;
blob - fe347f36129ee1beceba8f6645e7160c0d07ccc5
blob + 4dca6a0ec1418f96fc37ed38b2464c8fd3c0bb88
--- src/node.rs
+++ src/node.rs
type DgramCallback = Box<dyn Fn(&[u8], &NodeId) + Send>;
type RdpCallback =
Box<dyn Fn(i32, &crate::rdp::RdpAddr, crate::rdp::RdpEvent) + Send>;
+type DeleteCallback = Box<dyn Fn(&[u8]) + Send>;
/// The tesseras-dht node.
///
pub(crate) is_dtun: bool,
pub(crate) dgram_callback: Option<DgramCallback>,
pub(crate) rdp_callback: Option<RdpCallback>,
+ pub(crate) delete_callback: Option<DeleteCallback>,
/// Active iterative queries keyed by nonce.
pub(crate) queries: HashMap<u32, IterativeQuery>,
is_dtun: true,
dgram_callback: None,
rdp_callback: None,
+ delete_callback: None,
queries: HashMap::new(),
last_refresh: Instant::now(),
last_restore: Instant::now(),
pub fn put(&mut self, key: &[u8], value: &[u8], ttl: u16, is_unique: bool) {
let target_id = NodeId::from_key(key);
log::debug!(
- "put: key={} target={}",
- String::from_utf8_lossy(key),
+ "put: key={:02x?} target={}",
+ &key[..key.len().min(8)],
target_id
);
// Send initial batch
self.send_query_batch(nonce)?;
+ self.metrics
+ .lookups_started
+ .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
Ok(nonce)
}
self.rdp_callback = None;
}
+ /// Register the application handler for remote deletes.
+ ///
+ /// The store handler calls `f` with the raw key bytes when an
+ /// incoming store with TTL=0 is processed and
+ /// `remove_by_owner()` returns true. Any callback registered
+ /// earlier is replaced.
+ pub fn set_delete_callback<F>(&mut self, f: F)
+ where
+ F: Fn(&[u8]) + Send + 'static,
+ {
+ let boxed: DeleteCallback = Box::new(f);
+ self.delete_callback = Some(boxed);
+ }
+
// ── RDP (reliable transport) ────────────────────
/// Listen for RDP connections on `port`.
blob - 8bfe66d68d155a0f10e85574ede7e5e217c0b768
blob + 6efddafda2b8509bbfd0327a285ab2007af663c3
--- tests/integration.rs
+++ tests/integration.rs
fn ban_list_unit() {
use tesseras_dht::banlist::BanList;
let mut bl = BanList::new();
+ bl.set_threshold(3);
let addr: std::net::SocketAddr = "127.0.0.1:9999".parse().unwrap();
assert!(!bl.is_banned(&addr));