Skip to content

Commit a90b796

Browse files
committed
Spawn blocking thread for DNS resolution
1 parent 9bc6ec2 commit a90b796

File tree

9 files changed

+97
-55
lines changed

9 files changed

+97
-55
lines changed

src/compute.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ impl ComputeNode {
179179
.compute_nodes
180180
.get(config.compute_node_idx)
181181
.ok_or(ComputeError::ConfigError("Invalid compute index"))?;
182-
let addr = create_socket_addr(&raw_addr.address).or_else(|_| {
182+
let addr = create_socket_addr(&raw_addr.address).await.or_else(|_| {
183183
Err(ComputeError::ConfigError(
184184
"Invalid compute node address in config file",
185185
))
@@ -189,7 +189,7 @@ impl ComputeNode {
189189
.storage_nodes
190190
.get(config.compute_node_idx)
191191
.ok_or(ComputeError::ConfigError("Invalid storage index"))?;
192-
let storage_addr = create_socket_addr(&raw_storage_addr.address).or_else(|_| {
192+
let storage_addr = create_socket_addr(&raw_storage_addr.address).await.or_else(|_| {
193193
Err(ComputeError::ConfigError(
194194
"Invalid storage node address in config file",
195195
))

src/compute_raft.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ impl ComputeRaft {
247247
.collect::<Vec<String>>();
248248
let raft_active = ActiveRaft::new(
249249
config.compute_node_idx,
250-
&create_socket_addr_for_list(&raw_node_ips).unwrap_or_default(),
250+
&create_socket_addr_for_list(&raw_node_ips).await.unwrap_or_default(),
251251
use_raft,
252252
Duration::from_millis(config.compute_raft_tick_timeout as u64),
253253
db_utils::new_db(config.compute_db_mode, &DB_SPEC, raft_db, None),
@@ -1887,7 +1887,7 @@ mod test {
18871887
}
18881888

18891889
async fn new_test_node(seed_utxo: &[&str]) -> ComputeRaft {
1890-
let compute_node = create_socket_addr("0.0.0.0").unwrap();
1890+
let compute_node = create_socket_addr("0.0.0.0").await.unwrap();
18911891
let tx_out = TxOutSpec {
18921892
public_key: "5371832122a8e804fa3520ec6861c3fa554a7f6fb617e6f0768452090207e07c"
18931893
.to_owned(),

src/miner.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ impl MinerNode {
180180
.compute_nodes
181181
.get(config.miner_compute_node_idx)
182182
.ok_or(MinerError::ConfigError("Invalid compute index"))?;
183-
let compute_addr = create_socket_addr(&raw_compute_addr.address).or_else(|_| {
183+
let compute_addr = create_socket_addr(&raw_compute_addr.address).await.or_else(|_| {
184184
Err(MinerError::ConfigError(
185185
"Invalid compute node address in config file",
186186
))
@@ -198,7 +198,7 @@ impl MinerNode {
198198
extra.custom_wallet_spec,
199199
)?;
200200
let disable_tcp_listener = extra.disable_tcp_listener;
201-
let tls_addr = create_socket_addr(&addr).unwrap();
201+
let tls_addr = create_socket_addr(&addr).await.unwrap();
202202
let tcp_tls_config = TcpTlsConfig::from_tls_spec(tls_addr, &config.tls_config)?;
203203
let api_addr = SocketAddr::new(tls_addr.ip(), config.miner_api_port);
204204
let api_tls_info = config

src/pre_launch.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,14 +86,14 @@ struct PreLaunchNodeConfigSelected {
8686
}
8787

8888
impl PreLaunchNodeConfigSelected {
89-
fn new(config: PreLaunchNodeConfig) -> Self {
89+
async fn new(config: PreLaunchNodeConfig) -> Self {
9090
match config.node_type {
9191
PreLaunchNodeType::Compute => Self {
9292
pre_launch_node_idx: config.compute_node_idx,
9393
pre_launch_db_mode: config.compute_db_mode,
9494
tls_config: config.tls_config,
9595
pre_launch_nodes: create_socket_addr_for_list(&config.compute_nodes)
96-
.unwrap_or_default(),
96+
.await.unwrap_or_default(),
9797
db_spec: crate::compute::DB_SPEC,
9898
raft_db_spec: crate::compute_raft::DB_SPEC,
9999
peer_limit: config.peer_limit,
@@ -103,7 +103,7 @@ impl PreLaunchNodeConfigSelected {
103103
pre_launch_db_mode: config.storage_db_mode,
104104
tls_config: config.tls_config,
105105
pre_launch_nodes: create_socket_addr_for_list(&config.storage_nodes)
106-
.unwrap_or_default(),
106+
.await.unwrap_or_default(),
107107
db_spec: crate::storage::DB_SPEC,
108108
raft_db_spec: crate::storage_raft::DB_SPEC,
109109
peer_limit: config.peer_limit,
@@ -135,7 +135,7 @@ impl PreLaunchNode {
135135
config: PreLaunchNodeConfig,
136136
mut extra: ExtraNodeParams,
137137
) -> Result<PreLaunchNode> {
138-
let config = PreLaunchNodeConfigSelected::new(config);
138+
let config = PreLaunchNodeConfigSelected::new(config).await;
139139
let addr = config
140140
.pre_launch_nodes
141141
.get(config.pre_launch_node_idx)

src/storage.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ impl StorageNode {
156156
.storage_nodes
157157
.get(config.storage_node_idx)
158158
.ok_or(StorageError::ConfigError("Invalid storage index"))?;
159-
let addr = create_socket_addr(&raw_addr.address).or_else(|_| {
159+
let addr = create_socket_addr(&raw_addr.address).await.or_else(|_| {
160160
Err(StorageError::ConfigError(
161161
"Invalid storage address supplied",
162162
))
@@ -166,7 +166,7 @@ impl StorageNode {
166166
.compute_nodes
167167
.get(config.storage_node_idx)
168168
.ok_or(StorageError::ConfigError("Invalid compute index"))?;
169-
let compute_addr = create_socket_addr(&raw_compute_addr.address).or_else(|_| {
169+
let compute_addr = create_socket_addr(&raw_compute_addr.address).await.or_else(|_| {
170170
Err(StorageError::ConfigError(
171171
"Invalid compute address supplied",
172172
))
@@ -180,8 +180,8 @@ impl StorageNode {
180180
let api_keys = to_api_keys(config.api_keys.clone());
181181

182182
let node = Node::new(&tcp_tls_config, config.peer_limit, NodeType::Storage, false).await?;
183-
let node_raft = StorageRaft::new(&config, extra.raft_db.take());
184-
let catchup_fetch = StorageFetch::new(&config, addr);
183+
let node_raft = StorageRaft::new(&config, extra.raft_db.take()).await;
184+
let catchup_fetch = StorageFetch::new(&config, addr).await;
185185
let api_pow_info = to_route_pow_infos(config.routes_pow.clone());
186186

187187
if config.backup_restore.unwrap_or(false) {

src/storage_fetch.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,15 +139,26 @@ pub struct StorageFetch {
139139
to_receive: Option<FetchReceive>,
140140
}
141141

142+
143+
142144
impl StorageFetch {
143145
/// Initialize with database info
144-
pub fn new(config: &StorageNodeConfig, addr: SocketAddr) -> Self {
146+
pub async fn new(config: &StorageNodeConfig, addr: SocketAddr) -> Self {
145147
let timeout_duration = Duration::from_millis(config.storage_catchup_duration as u64);
146-
let storage_nodes = config
148+
let storage_nodes_filtered = config
147149
.storage_nodes
148150
.iter()
149-
.map(|s| create_socket_addr(&s.address).unwrap());
150-
let storage_nodes = storage_nodes.filter(|a| a != &addr).collect();
151+
.filter(|v| v.address.clone() != addr.to_string());
152+
153+
let mut storage_nodes = Vec::new();
154+
155+
for node in storage_nodes_filtered {
156+
let socket_addr = create_socket_addr(&node.address);
157+
if let Ok(socket) = socket_addr.await {
158+
storage_nodes.push(socket);
159+
}
160+
}
161+
151162
Self {
152163
timeout_duration,
153164
storage_nodes,

src/storage_raft.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ impl StorageRaft {
113113
///
114114
/// * `config` - Configuration option for a storage node.
115115
/// * `raft_db` - Override raft db to use.
116-
pub fn new(config: &StorageNodeConfig, raft_db: Option<SimpleDb>) -> Self {
116+
pub async fn new(config: &StorageNodeConfig, raft_db: Option<SimpleDb>) -> Self {
117117
let use_raft = config.storage_raft != 0;
118118

119119
if config.backup_restore.unwrap_or(false) {
@@ -126,7 +126,7 @@ impl StorageRaft {
126126
.collect::<Vec<String>>();
127127
let raft_active = ActiveRaft::new(
128128
config.storage_node_idx,
129-
&create_socket_addr_for_list(&storage_node_urls).unwrap_or_default(),
129+
&create_socket_addr_for_list(&storage_node_urls).await.unwrap_or_default(),
130130
use_raft,
131131
Duration::from_millis(config.storage_raft_tick_timeout as u64),
132132
db_utils::new_db(config.storage_db_mode, &DB_SPEC, raft_db, None),

src/user.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,9 +153,9 @@ impl UserNode {
153153
.get(config.user_compute_node_idx)
154154
.ok_or(UserError::ConfigError("Invalid compute index"))?;
155155
let compute_addr = create_socket_addr(&raw_compute_addr.address)
156-
.or_else(|_| Err(UserError::ConfigError("Invalid compute address")))?;
156+
.await.or_else(|_| Err(UserError::ConfigError("Invalid compute address")))?;
157157

158-
let tls_addr = create_socket_addr(&addr).unwrap();
158+
let tls_addr = create_socket_addr(&addr).await.unwrap();
159159
let tcp_tls_config = TcpTlsConfig::from_tls_spec(tls_addr, &config.tls_config)?;
160160
let api_addr = SocketAddr::new(tls_addr.ip(), config.user_api_port);
161161
let api_tls_info = config

src/utils.rs

Lines changed: 64 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,13 @@ use std::io::Read;
3535
use std::net::{IpAddr, SocketAddr};
3636
use std::sync::{Arc, Mutex};
3737
use std::time::Duration;
38+
use tokio::runtime::Runtime;
3839
use tokio::sync::{mpsc, oneshot};
3940
use tokio::task;
4041
use tokio::time::Instant;
4142
use tracing::{trace, warn};
4243
use trust_dns_resolver::config::*;
43-
use trust_dns_resolver::Resolver;
44+
use trust_dns_resolver::TokioAsyncResolver;
4445
use url::Url;
4546

4647
pub type RoutesPoWInfo = Arc<Mutex<BTreeMap<String, usize>>>;
@@ -470,43 +471,73 @@ pub fn generate_random_num(len: usize) -> Vec<u8> {
470471
/// ### Arguments
471472
///
472473
/// * `url_str` - URL string to parse
473-
pub fn create_socket_addr(url_str: &str) -> Result<SocketAddr, Box<dyn std::error::Error>> {
474-
if let Ok(url) = Url::parse(url_str) {
475-
println!("url: {:?}", url);
476-
let host_str = url.host_str().ok_or("Invalid host")?;
477-
println!("host_str: {:?}", host_str);
478-
let port = url.port().unwrap_or(80);
479-
480-
// Check if the host is an IP address
481-
if let Ok(ip) = host_str.parse::<IpAddr>() {
482-
// Handle as direct IP address
483-
Ok(SocketAddr::new(ip, port))
474+
pub async fn create_socket_addr(url_str: &str) -> Result<SocketAddr, Box<dyn std::error::Error>> {
475+
let thread_url = url_str.to_owned();
476+
let handle = tokio::task::spawn_blocking(move || {
477+
if let Ok(url) = Url::parse(&thread_url.clone()) {
478+
// println!("url: {:?}", url);
479+
let host_str = match url.host_str() {
480+
Some(v) => v,
481+
None => return None,
482+
};
483+
// println!("host_str: {:?}", host_str);
484+
let port = url.port().unwrap_or(80);
485+
486+
// Check if the host is an IP address
487+
if let Ok(ip) = host_str.parse::<IpAddr>() {
488+
// Handle as direct IP address
489+
Some(SocketAddr::new(ip, port))
490+
} else {
491+
let io_loop = Runtime::new().unwrap();
492+
493+
// Handle as domain name
494+
let resolver = io_loop.block_on(async {
495+
TokioAsyncResolver::tokio(ResolverConfig::default(), ResolverOpts::default())
496+
});
497+
498+
let lookup_future = resolver.lookup_ip(host_str);
499+
let response = io_loop.block_on(lookup_future).unwrap();
500+
let ip = match response.iter().next() {
501+
Some(ip) => ip,
502+
None => return None,
503+
};
504+
Some(SocketAddr::new(ip, port))
505+
}
484506
} else {
485-
// Handle as domain name
486-
let resolver = Resolver::new(ResolverConfig::default(), ResolverOpts::default())?;
487-
let response = resolver.lookup_ip(host_str)?;
488-
let ip = response.iter().next().ok_or("No IP addresses found")?;
489-
Ok(SocketAddr::new(ip, port))
507+
// Handle as direct IP address with optional port
508+
let parts: Vec<&str> = thread_url.split(':').collect();
509+
let ip = match parts[0].parse::<IpAddr>() {
510+
Ok(ip) => ip,
511+
Err(_e) => return None,
512+
};
513+
let port = if parts.len() > 1 {
514+
match parts[1].parse::<u16>() {
515+
Ok(port) => port,
516+
Err(_e) => return None,
517+
}
518+
} else {
519+
80
520+
};
521+
Some(SocketAddr::new(ip, port))
490522
}
491-
} else {
492-
// Handle as direct IP address with optional port
493-
let parts: Vec<&str> = url_str.split(':').collect();
494-
let ip = parts[0].parse::<IpAddr>()?;
495-
let port = if parts.len() > 1 {
496-
parts[1].parse::<u16>()?
497-
} else {
498-
80
499-
};
500-
Ok(SocketAddr::new(ip, port))
523+
});
524+
525+
match handle.await {
526+
Ok(v) => match v {
527+
Some(v) => Ok(v),
528+
None => Err("Failed to parse URL".into()),
529+
},
530+
Err(_e) => Err("Failed to parse URL".into()),
501531
}
502532
}
503533

504-
pub fn create_socket_addr_for_list(
534+
pub async fn create_socket_addr_for_list(
505535
urls: &[String],
506536
) -> Result<Vec<SocketAddr>, Box<dyn std::error::Error>> {
507537
let mut result = Vec::new();
508538
for url in urls {
509-
result.push(create_socket_addr(url)?);
539+
let socket_addr = create_socket_addr(url).await?;
540+
result.push(socket_addr);
510541
}
511542
Ok(result)
512543
}
@@ -1235,16 +1266,16 @@ mod util_tests {
12351266
use super::*;
12361267
use std::net::Ipv4Addr;
12371268

1238-
#[test]
1269+
#[tokio::test]
12391270
/// Tests whether URL strings can be parsed successfully.
12401271
/// Testing DNS resolution is not possible in unit tests due to the lack of static IPs to test against,
12411272
/// so if you have any, please add them here
1242-
fn test_create_socket_addr() {
1273+
async fn test_create_socket_addr() {
12431274
let ip_raw = "0.0.0.0".to_string();
12441275
let ip_with_port = "0.0.0.0:12300".to_string();
12451276

1246-
let ip_addr = create_socket_addr(&ip_raw).unwrap();
1247-
let ip_with_port_addr = create_socket_addr(&ip_with_port).unwrap();
1277+
let ip_addr = create_socket_addr(&ip_raw).await.unwrap();
1278+
let ip_with_port_addr = create_socket_addr(&ip_with_port).await.unwrap();
12481279

12491280
assert_eq!(
12501281
ip_addr,

0 commit comments

Comments
 (0)