feat(net): Phase 7 — dual-socket IPv4+IPv6 ICE

Adds a dedicated IPv6 QUIC endpoint (IPV6_V6ONLY=1 via socket2)
alongside the existing IPv4 signal endpoint for proper dual-stack
P2P connectivity. Previous [::]:0 dual-stack attempt broke IPv4
on Android; this uses separate sockets per address family like
WebRTC/libwebrtc.

- create_ipv6_endpoint(): socket2-based IPv6-only UDP socket,
  tries same port as IPv4 signal EP, falls back to ephemeral
- local_host_candidates(v4_port, v6_port): now gathers IPv6
  global-unicast (2000::/3) and unique-local (fc00::/7) addrs
- dual_path::race(): A-role accepts on both v4+v6 via select!,
  D-role routes each candidate to matching-AF endpoint
- Graceful fallback: if IPv6 unavailable, .ok() → None → pure
  IPv4 behavior identical to pre-Phase-7

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Siavash Sameni
2026-04-12 11:54:13 +04:00
parent aee41a638d
commit c2d298beb5
8 changed files with 224 additions and 65 deletions

19
Cargo.lock generated
View File

@@ -2566,7 +2566,7 @@ dependencies = [
"libc", "libc",
"percent-encoding", "percent-encoding",
"pin-project-lite", "pin-project-lite",
"socket2", "socket2 0.6.3",
"system-configuration", "system-configuration",
"tokio", "tokio",
"tower-service", "tower-service",
@@ -4240,7 +4240,7 @@ dependencies = [
"quinn-udp", "quinn-udp",
"rustc-hash", "rustc-hash",
"rustls", "rustls",
"socket2", "socket2 0.6.3",
"thiserror 2.0.18", "thiserror 2.0.18",
"tokio", "tokio",
"tracing", "tracing",
@@ -4279,7 +4279,7 @@ dependencies = [
"cfg_aliases", "cfg_aliases",
"libc", "libc",
"once_cell", "once_cell",
"socket2", "socket2 0.6.3",
"tracing", "tracing",
"windows-sys 0.60.2", "windows-sys 0.60.2",
] ]
@@ -5241,6 +5241,16 @@ version = "1.15.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03"
[[package]]
name = "socket2"
version = "0.5.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678"
dependencies = [
"libc",
"windows-sys 0.52.0",
]
[[package]] [[package]]
name = "socket2" name = "socket2"
version = "0.6.3" version = "0.6.3"
@@ -6000,7 +6010,7 @@ dependencies = [
"parking_lot", "parking_lot",
"pin-project-lite", "pin-project-lite",
"signal-hook-registry", "signal-hook-registry",
"socket2", "socket2 0.6.3",
"tokio-macros", "tokio-macros",
"windows-sys 0.61.2", "windows-sys 0.61.2",
] ]
@@ -7810,6 +7820,7 @@ dependencies = [
"rustls", "rustls",
"serde_json", "serde_json",
"sha2", "sha2",
"socket2 0.5.10",
"tokio", "tokio",
"tracing", "tracing",
"wzp-proto", "wzp-proto",

View File

@@ -32,6 +32,7 @@ serde = { version = "1", features = ["derive"] }
# Transport # Transport
quinn = "0.11" quinn = "0.11"
socket2 = "0.5"
# FEC # FEC
raptorq = "2" raptorq = "2"

View File

@@ -131,6 +131,11 @@ pub async fn race(
// When `None`, falls back to fresh endpoints per role. // When `None`, falls back to fresh endpoints per role.
// Used by tests. // Used by tests.
shared_endpoint: Option<wzp_transport::Endpoint>, shared_endpoint: Option<wzp_transport::Endpoint>,
// Phase 7: dedicated IPv6 endpoint with IPV6_V6ONLY=1.
// When `Some`, A-role accepts on both v4+v6, D-role routes
// each candidate to its matching-AF endpoint. When `None`,
// IPv6 candidates are skipped (IPv4-only, pre-Phase-7).
ipv6_endpoint: Option<wzp_transport::Endpoint>,
) -> anyhow::Result<RaceResult> { ) -> anyhow::Result<RaceResult> {
// Rustls provider must be installed before any quinn endpoint // Rustls provider must be installed before any quinn endpoint
// is created. Install attempt is idempotent. // is created. Install attempt is idempotent.
@@ -187,18 +192,33 @@ pub async fn race(
} }
}; };
let ep_for_fut = ep.clone(); let ep_for_fut = ep.clone();
let v6_ep_for_accept = ipv6_endpoint.clone();
direct_fut = Box::pin(async move { direct_fut = Box::pin(async move {
// `wzp_transport::accept` wraps the same // Phase 7: accept on both IPv4 and IPv6 endpoints.
// `endpoint.accept().await?.await?` dance we want. // First incoming connection on either wins.
// If `ep_for_fut` is the shared signal endpoint, match v6_ep_for_accept {
// this pulls the NEXT incoming connection — Some(v6_ep) => {
// normally that's the peer's direct-P2P dial. tracing::debug!("dual_path: A-role accepting on both v4 + v6 endpoints");
// Signal recv is done via the signal CONNECTION tokio::select! {
// (accept_bi), not the endpoint, so no conflict. v4 = wzp_transport::accept(&ep_for_fut) => {
let conn = v4.map_err(|e| anyhow::anyhow!("v4 accept: {e}"))?;
tracing::info!("dual_path: A-role accepted on IPv4 endpoint");
Ok(QuinnTransport::new(conn))
}
v6 = wzp_transport::accept(&v6_ep) => {
let conn = v6.map_err(|e| anyhow::anyhow!("v6 accept: {e}"))?;
tracing::info!("dual_path: A-role accepted on IPv6 endpoint");
Ok(QuinnTransport::new(conn))
}
}
}
None => {
let conn = wzp_transport::accept(&ep_for_fut) let conn = wzp_transport::accept(&ep_for_fut)
.await .await
.map_err(|e| anyhow::anyhow!("direct accept: {e}"))?; .map_err(|e| anyhow::anyhow!("direct accept: {e}"))?;
Ok(QuinnTransport::new(conn)) Ok(QuinnTransport::new(conn))
}
}
}); });
direct_ep = ep; direct_ep = ep;
} }
@@ -231,6 +251,7 @@ pub async fn race(
} }
}; };
let ep_for_fut = ep.clone(); let ep_for_fut = ep.clone();
let v6_ep_for_dial = ipv6_endpoint.clone();
let dial_order = peer_candidates.dial_order(); let dial_order = peer_candidates.dial_order();
let sni = call_sni.clone(); let sni = call_sni.clone();
direct_fut = Box::pin(async move { direct_fut = Box::pin(async move {
@@ -250,10 +271,26 @@ pub async fn race(
// when ALL have failed do we return Err. // when ALL have failed do we return Err.
let mut set = tokio::task::JoinSet::new(); let mut set = tokio::task::JoinSet::new();
for (idx, candidate) in dial_order.iter().enumerate() { for (idx, candidate) in dial_order.iter().enumerate() {
let ep = ep_for_fut.clone(); // Phase 7: route each candidate to the
// endpoint matching its address family.
let candidate = *candidate;
let ep = if candidate.is_ipv6() {
match &v6_ep_for_dial {
Some(v6) => v6.clone(),
None => {
tracing::debug!(
%candidate,
candidate_idx = idx,
"dual_path: skipping IPv6 candidate, no v6 endpoint"
);
continue;
}
}
} else {
ep_for_fut.clone()
};
let client_cfg = wzp_transport::client_config(); let client_cfg = wzp_transport::client_config();
let sni = sni.clone(); let sni = sni.clone();
let candidate = *candidate;
set.spawn(async move { set.spawn(async move {
let result = wzp_transport::connect( let result = wzp_transport::connect(
&ep, &ep,
@@ -474,7 +511,7 @@ pub async fn race(
return Err(anyhow::anyhow!("both paths failed: no media transport available")); return Err(anyhow::anyhow!("both paths failed: no media transport available"));
} }
let _ = (direct_ep, relay_ep); let _ = (direct_ep, relay_ep, ipv6_endpoint);
Ok(RaceResult { Ok(RaceResult {
direct_transport: direct_result direct_transport: direct_result

View File

@@ -292,7 +292,7 @@ pub async fn detect_nat_type(
/// Safe to call from any thread; no I/O, no async. The `if-addrs` /// Safe to call from any thread; no I/O, no async. The `if-addrs`
/// crate reads the kernel's interface table via a single /// crate reads the kernel's interface table via a single
/// getifaddrs(3) syscall. /// getifaddrs(3) syscall.
pub fn local_host_candidates(port: u16) -> Vec<SocketAddr> { pub fn local_host_candidates(v4_port: u16, v6_port: Option<u16>) -> Vec<SocketAddr> {
let Ok(ifaces) = if_addrs::get_if_addrs() else { let Ok(ifaces) = if_addrs::get_if_addrs() else {
return Vec::new(); return Vec::new();
}; };
@@ -311,28 +311,35 @@ pub fn local_host_candidates(port: u16) -> Vec<SocketAddr> {
// Skip public v4 because the reflex addr already // Skip public v4 because the reflex addr already
// covers that path. // covers that path.
if v4.is_private() { if v4.is_private() {
out.push(SocketAddr::new(std::net::IpAddr::V4(v4), port)); out.push(SocketAddr::new(std::net::IpAddr::V4(v4), v4_port));
} else if v4.octets()[0] == 100 && (v4.octets()[1] & 0xc0) == 0x40 { } else if v4.octets()[0] == 100 && (v4.octets()[1] & 0xc0) == 0x40 {
// 100.64/10 CGNAT — rare but valid if two // 100.64/10 CGNAT — rare but valid if two
// phones are on the same CGNAT-hairpinned // phones are on the same CGNAT-hairpinned
// carrier LAN (some hotspot setups). // carrier LAN (some hotspot setups).
out.push(SocketAddr::new(std::net::IpAddr::V4(v4), port)); out.push(SocketAddr::new(std::net::IpAddr::V4(v4), v4_port));
} }
} }
std::net::IpAddr::V6(_v6) => { std::net::IpAddr::V6(v6) => {
// IPv6 host candidates are disabled until we add // Phase 7: IPv6 host candidates via dedicated
// a dedicated IPv6 socket alongside the IPv4 one. // IPv6 socket. When v6_port is None, no IPv6
// Android's IPV6_V6ONLY=1 default on some kernels // endpoint exists — skip silently.
// makes [::]:0 dual-stack unreliable — IPv4 dials let Some(port) = v6_port else { continue };
// silently fail. Advertising IPv6 addrs from an if v6.is_loopback() || v6.is_unspecified() {
// IPv4-only socket wastes JoinSet slots and adds continue;
// timeout delays before the working IPv4 candidate }
// gets picked. // fe80::/10 link-local — needs scope ID, not
// // routable across interfaces.
// TODO: Phase 7 — create a second quinn::Endpoint if (v6.segments()[0] & 0xffc0) == 0xfe80 {
// on [::]:0 for IPv6-only dials, run them alongside continue;
// the IPv4 JoinSet. This gives true dual-stack ICE }
// without the v4-mapped-address fragility. // Accept global unicast (2000::/3) and
// unique-local (fc00::/7).
let first_seg = v6.segments()[0];
let is_global = (first_seg & 0xe000) == 0x2000;
let is_ula = (first_seg & 0xfe00) == 0xfc00;
if is_global || is_ula {
out.push(SocketAddr::new(std::net::IpAddr::V6(v6), port));
}
} }
} }
} }

View File

@@ -15,6 +15,7 @@ tracing = { workspace = true }
async-trait = { workspace = true } async-trait = { workspace = true }
serde_json = "1" serde_json = "1"
rustls = { version = "0.23", default-features = false, features = ["ring", "std"] } rustls = { version = "0.23", default-features = false, features = ["ring", "std"] }
socket2 = { workspace = true }
rcgen = "0.13" rcgen = "0.13"
ed25519-dalek = { workspace = true } ed25519-dalek = { workspace = true }
hkdf = { workspace = true } hkdf = { workspace = true }

View File

@@ -39,6 +39,71 @@ pub async fn connect(
Ok(connection) Ok(connection)
} }
/// Create an IPv6-only QUIC endpoint with `IPV6_V6ONLY=1`.
///
/// Tries `[::]:preferred_port` first (same port as the IPv4 signal
/// endpoint — allowed on Linux/Android when the AFs differ and
/// V6ONLY is set). Falls back to `[::]:0` (OS-assigned) if the
/// preferred port is already taken.
///
/// Must be called from within a tokio runtime (quinn needs the
/// async runtime handle for its I/O driver).
pub fn create_ipv6_endpoint(
preferred_port: u16,
server_config: Option<quinn::ServerConfig>,
) -> Result<quinn::Endpoint, TransportError> {
use socket2::{Domain, Protocol, Socket, Type};
use std::net::{Ipv6Addr, SocketAddrV6};
let sock = Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))
.map_err(|e| TransportError::Internal(format!("ipv6 socket: {e}")))?;
// Critical: IPv6-only so this socket never intercepts IPv4.
// On Android some kernels default to V6ONLY=1 anyway, but we
// set it explicitly for cross-platform consistency.
sock.set_only_v6(true)
.map_err(|e| TransportError::Internal(format!("set_only_v6: {e}")))?;
sock.set_reuse_address(true)
.map_err(|e| TransportError::Internal(format!("set_reuse_address: {e}")))?;
// Try the preferred port (same as IPv4 signal endpoint), fall
// back to ephemeral if the OS rejects it.
let bind_addr = SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, preferred_port, 0, 0);
if let Err(e) = sock.bind(&bind_addr.into()) {
if preferred_port != 0 {
tracing::debug!(
preferred_port,
error = %e,
"ipv6 bind to preferred port failed, falling back to ephemeral"
);
let fallback = SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 0, 0, 0);
sock.bind(&fallback.into())
.map_err(|e| TransportError::Internal(format!("ipv6 bind fallback: {e}")))?;
} else {
return Err(TransportError::Internal(format!("ipv6 bind: {e}")));
}
}
sock.set_nonblocking(true)
.map_err(|e| TransportError::Internal(format!("set_nonblocking: {e}")))?;
let udp_socket: std::net::UdpSocket = sock.into();
let runtime = quinn::default_runtime()
.ok_or_else(|| TransportError::Internal("no async runtime for ipv6 endpoint".into()))?;
let endpoint = quinn::Endpoint::new(
quinn::EndpointConfig::default(),
server_config,
udp_socket,
runtime,
)
.map_err(|e| TransportError::Internal(format!("ipv6 endpoint: {e}")))?;
Ok(endpoint)
}
/// Accept the next incoming connection on an endpoint. /// Accept the next incoming connection on an endpoint.
pub async fn accept(endpoint: &quinn::Endpoint) -> Result<quinn::Connection, TransportError> { pub async fn accept(endpoint: &quinn::Endpoint) -> Result<quinn::Connection, TransportError> {
let incoming = endpoint let incoming = endpoint

View File

@@ -23,7 +23,7 @@ pub mod quic;
pub mod reliable; pub mod reliable;
pub use config::{client_config, server_config, server_config_from_seed, tls_fingerprint}; pub use config::{client_config, server_config, server_config_from_seed, tls_fingerprint};
pub use connection::{accept, connect, create_endpoint}; pub use connection::{accept, connect, create_endpoint, create_ipv6_endpoint};
pub use path_monitor::PathMonitor; pub use path_monitor::PathMonitor;
pub use quic::QuinnTransport; pub use quic::QuinnTransport;
pub use wzp_proto::{MediaTransport, PathQuality, TransportError}; pub use wzp_proto::{MediaTransport, PathQuality, TransportError};

View File

@@ -358,9 +358,9 @@ async fn connect(
// own_reflex_addr, unparseable addrs, equal addrs), we skip // own_reflex_addr, unparseable addrs, equal addrs), we skip
// the race entirely and fall back to the pure-relay path — // the race entirely and fall back to the pure-relay path —
// identical to Phase 0 behavior. // identical to Phase 0 behavior.
let (own_reflex_addr, signal_endpoint_for_race) = { let (own_reflex_addr, signal_endpoint_for_race, ipv6_endpoint_for_race) = {
let sig = state.signal.lock().await; let mut sig = state.signal.lock().await;
(sig.own_reflex_addr.clone(), sig.endpoint.clone()) (sig.own_reflex_addr.clone(), sig.endpoint.clone(), sig.ipv6_endpoint.take())
}; };
let peer_addr_parsed: Option<std::net::SocketAddr> = peer_direct_addr let peer_addr_parsed: Option<std::net::SocketAddr> = peer_direct_addr
.as_deref() .as_deref()
@@ -424,6 +424,7 @@ async fn connect(
room_sni, room_sni,
call_sni, call_sni,
signal_endpoint_for_race.clone(), signal_endpoint_for_race.clone(),
ipv6_endpoint_for_race.clone(),
) )
.await .await
{ {
@@ -765,6 +766,10 @@ struct SignalState {
/// silently drop packets from a second quinn::Endpoint to the same /// silently drop packets from a second quinn::Endpoint to the same
/// relay, so every call after register_signal MUST share this socket. /// relay, so every call after register_signal MUST share this socket.
endpoint: Option<wzp_transport::Endpoint>, endpoint: Option<wzp_transport::Endpoint>,
/// Phase 7: per-call IPv6 endpoint with IPV6_V6ONLY=1 for
/// dual-stack P2P. Created at place_call/answer_call time,
/// consumed by the connect command's dual_path::race.
ipv6_endpoint: Option<wzp_transport::Endpoint>,
fingerprint: String, fingerprint: String,
signal_status: String, signal_status: String,
incoming_call_id: Option<String>, incoming_call_id: Option<String>,
@@ -859,6 +864,7 @@ async fn internal_deregister(
let _ = t.close().await; let _ = t.close().await;
} }
sig.endpoint = None; sig.endpoint = None;
sig.ipv6_endpoint = None;
sig.signal_status = "idle".into(); sig.signal_status = "idle".into();
sig.incoming_call_id = None; sig.incoming_call_id = None;
sig.incoming_caller_fp = None; sig.incoming_caller_fp = None;
@@ -1043,7 +1049,7 @@ fn do_register_signal(
Ok(Some(SignalMessage::Hangup { reason })) => { Ok(Some(SignalMessage::Hangup { reason })) => {
tracing::info!(?reason, "signal: Hangup"); tracing::info!(?reason, "signal: Hangup");
emit_call_debug(&app_clone, "recv:Hangup", serde_json::json!({ "reason": format!("{:?}", reason) })); emit_call_debug(&app_clone, "recv:Hangup", serde_json::json!({ "reason": format!("{:?}", reason) }));
let mut sig = signal_state.lock().await; sig.signal_status = "registered".into(); sig.incoming_call_id = None; let mut sig = signal_state.lock().await; sig.signal_status = "registered".into(); sig.incoming_call_id = None; sig.ipv6_endpoint = None;
let _ = app_clone.emit("signal-event", serde_json::json!({"type":"hangup"})); let _ = app_clone.emit("signal-event", serde_json::json!({"type":"hangup"}));
} }
Ok(Some(SignalMessage::MediaPathReport { call_id, direct_ok, race_winner })) => { Ok(Some(SignalMessage::MediaPathReport { call_id, direct_ok, race_winner })) => {
@@ -1314,21 +1320,37 @@ async fn place_call(
emit_call_debug(&app, "place_call:reflect_query_none", serde_json::json!({})); emit_call_debug(&app, "place_call:reflect_query_none", serde_json::json!({}));
} }
// Phase 5.5: gather LAN host candidates using the signal // Phase 5.5 + 7: gather LAN host candidates. Create a
// endpoint's bound port so incoming dials land on the same // per-call IPv6 endpoint so we can advertise v6 candidates
// socket that's already listening. // with the correct port.
let caller_local_addrs: Vec<String> = { let caller_local_addrs: Vec<String> = {
let sig = state.signal.lock().await; let mut sig = state.signal.lock().await;
sig.endpoint let v4_port = sig.endpoint
.as_ref() .as_ref()
.and_then(|ep| ep.local_addr().ok()) .and_then(|ep| ep.local_addr().ok())
.map(|la| { .map(|la| la.port())
wzp_client::reflect::local_host_candidates(la.port()) .unwrap_or(0);
// Phase 7: create IPv6 endpoint, trying same port as v4
let (sc, _) = wzp_transport::server_config();
let v6_ep = wzp_transport::create_ipv6_endpoint(v4_port, Some(sc)).ok();
let v6_port = v6_ep.as_ref()
.and_then(|ep| ep.local_addr().ok())
.map(|a| a.port());
if let Some(ref ep) = v6_ep {
tracing::info!(
v4_port,
v6_port,
v6_local = ?ep.local_addr().ok(),
"place_call: IPv6 endpoint created for dual-stack P2P"
);
}
sig.ipv6_endpoint = v6_ep;
wzp_client::reflect::local_host_candidates(v4_port, v6_port)
.into_iter() .into_iter()
.map(|a| a.to_string()) .map(|a| a.to_string())
.collect() .collect()
})
.unwrap_or_default()
}; };
emit_call_debug(&app, "place_call:host_candidates", serde_json::json!({ emit_call_debug(&app, "place_call:host_candidates", serde_json::json!({
"local_addrs": caller_local_addrs, "local_addrs": caller_local_addrs,
@@ -1416,22 +1438,37 @@ async fn answer_call(
None None
}; };
// Phase 5.5: gather LAN host candidates (AcceptTrusted only // Phase 5.5 + 7: gather LAN host candidates (AcceptTrusted
// for symmetry with the reflex addr — privacy mode keeps // only — privacy mode keeps LAN addrs hidden).
// LAN addrs hidden too).
let callee_local_addrs: Vec<String> = let callee_local_addrs: Vec<String> =
if accept_mode == wzp_proto::CallAcceptMode::AcceptTrusted { if accept_mode == wzp_proto::CallAcceptMode::AcceptTrusted {
let sig = state.signal.lock().await; let mut sig = state.signal.lock().await;
sig.endpoint let v4_port = sig.endpoint
.as_ref() .as_ref()
.and_then(|ep| ep.local_addr().ok()) .and_then(|ep| ep.local_addr().ok())
.map(|la| { .map(|la| la.port())
wzp_client::reflect::local_host_candidates(la.port()) .unwrap_or(0);
// Phase 7: create IPv6 endpoint
let (sc, _) = wzp_transport::server_config();
let v6_ep = wzp_transport::create_ipv6_endpoint(v4_port, Some(sc)).ok();
let v6_port = v6_ep.as_ref()
.and_then(|ep| ep.local_addr().ok())
.map(|a| a.port());
if let Some(ref ep) = v6_ep {
tracing::info!(
v4_port,
v6_port,
v6_local = ?ep.local_addr().ok(),
"answer_call: IPv6 endpoint created for dual-stack P2P"
);
}
sig.ipv6_endpoint = v6_ep;
wzp_client::reflect::local_host_candidates(v4_port, v6_port)
.into_iter() .into_iter()
.map(|a| a.to_string()) .map(|a| a.to_string())
.collect() .collect()
})
.unwrap_or_default()
} else { } else {
Vec::new() Vec::new()
}; };
@@ -1745,7 +1782,7 @@ pub fn run() {
let state = Arc::new(AppState { let state = Arc::new(AppState {
engine: Mutex::new(None), engine: Mutex::new(None),
signal: Arc::new(Mutex::new(SignalState { signal: Arc::new(Mutex::new(SignalState {
transport: None, endpoint: None, fingerprint: String::new(), signal_status: "idle".into(), transport: None, endpoint: None, ipv6_endpoint: None, fingerprint: String::new(), signal_status: "idle".into(),
incoming_call_id: None, incoming_caller_fp: None, incoming_caller_alias: None, incoming_call_id: None, incoming_caller_fp: None, incoming_caller_alias: None,
pending_reflect: None, pending_reflect: None,
own_reflex_addr: None, own_reflex_addr: None,