Skip to content

Commit

Permalink
Networking & Other Refactoring (#115)
Browse files Browse the repository at this point in the history
* Networking & Other Refactoring

Signed-off-by: Maksim Dimitrov <[email protected]>

* refactor: Replace other_certificates with certificates count

Signed-off-by: Maksim Dimitrov <[email protected]>

---------

Signed-off-by: Maksim Dimitrov <[email protected]>
Co-authored-by: Angel Petrov <[email protected]>
  • Loading branch information
dimitrovmaksim and Angel-Petrov authored Jan 8, 2024
1 parent d5d59e7 commit 7f1be4a
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 93 deletions.
23 changes: 9 additions & 14 deletions core/network/examples/peer_outbound_ping.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::{
array,
env::args,
io,
net::{IpAddr, Ipv6Addr},
net::{IpAddr, SocketAddr},
str::FromStr,
time::{Duration, SystemTime},
};
Expand All @@ -27,12 +28,14 @@ fn main() -> io::Result<()> {
let port = args().nth(2).expect("no port given");
let port: u16 = port.parse().unwrap();

let addr = SocketAddr::new(peer_ip, port);

let key_path = random_manager::tmp_path(10, Some(".key")).unwrap();
let cert_path = random_manager::tmp_path(10, Some(".cert")).unwrap();
cert_manager::x509::generate_and_write_pem(None, &key_path, &cert_path)?;

let connector = outbound::Connector::new_from_pem(&key_path, &cert_path)?;
let mut stream = connector.connect(peer_ip, port, Duration::from_secs(10))?;
let mut stream = connector.connect(addr, Duration::from_secs(10))?;
log::info!("peer certificate:\n\n{}", stream.peer_certificate_pem);

log::info!("sending version...");
Expand All @@ -41,21 +44,13 @@ fn main() -> io::Result<()> {
.duration_since(SystemTime::UNIX_EPOCH)
.expect("unexpected None duration_since")
.as_secs();
let tracked_subnets = vec![
Id::from_slice(&random_manager::secure_bytes(32).unwrap()),
Id::from_slice(&random_manager::secure_bytes(32).unwrap()),
Id::from_slice(&random_manager::secure_bytes(32).unwrap()),
Id::from_slice(&random_manager::secure_bytes(32).unwrap()),
Id::from_slice(&random_manager::secure_bytes(32).unwrap()),
];
let mut tracked_subnets_bytes: Vec<Vec<u8>> = Vec::new();
for id in tracked_subnets.iter() {
tracked_subnets_bytes.push(id.as_ref().to_vec());
}
let tracked_subnets: [Id; 5] =
array::from_fn(|_| Id::from_slice(&random_manager::secure_bytes(32).unwrap()));

let msg = message::version::Message::default()
.network_id(1000000)
.my_time(now_unix)
.ip_addr(IpAddr::V6(Ipv6Addr::LOCALHOST))
.ip_addr(addr.ip())
.ip_port(0)
.my_version("avalanche/1.2.3".to_string())
.sig(random_manager::secure_bytes(64).unwrap())
Expand Down
10 changes: 4 additions & 6 deletions core/network/src/peer/inbound.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
io::{self, Error, ErrorKind},
net::{IpAddr, SocketAddr},
net::SocketAddr,
sync::Arc,
};

Expand Down Expand Up @@ -35,7 +35,7 @@ impl Listener {
.with_single_cert(vec![certificate], private_key)
.map_err(|e| {
Error::new(
ErrorKind::InvalidInput,
ErrorKind::Other,
format!("failed to create TLS server config '{}'", e),
)
})?;
Expand All @@ -47,11 +47,9 @@ impl Listener {

/// Creates a listening stream for the specified IP and port.
/// ref. <https://github.com/rustls/hyper-rustls/blob/main/examples/server.rs>
pub fn listen(&self, ip: IpAddr, port: u16) -> io::Result<Stream> {
log::info!("[rustls] listening on {}:{}", ip, port);
pub fn listen(&self, addr: SocketAddr) -> io::Result<Stream> {
log::info!("[rustls] listening on {addr}");

// ref. https://doc.rust-lang.org/std/net/enum.SocketAddr.html
let addr = SocketAddr::new(ip, port);
let incoming = AddrIncoming::bind(&addr)
.map_err(|e| Error::new(ErrorKind::Other, format!("failed to bind '{}'", e)))?;
let local_addr = incoming.local_addr();
Expand Down
31 changes: 8 additions & 23 deletions core/network/src/peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ mod test {
use rcgen::CertificateParams;
use rustls::ServerConfig;
use std::{
io::{self, Error, ErrorKind},
net::{IpAddr, SocketAddr},
str::FromStr,
io,
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::Arc,
time::Duration,
};
Expand All @@ -48,9 +47,11 @@ mod test {
// .is_test(true)
.try_init();

let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9649);

let server_key_path = random_manager::tmp_path(10, None)?;
let server_cert_path = random_manager::tmp_path(10, None)?;
let server_cert_sna_params = CertificateParams::new(vec!["127.0.0.1".to_string()]);
let server_cert_sna_params = CertificateParams::new([addr.ip().to_string()]);
cert_manager::x509::generate_and_write_pem(
Some(server_cert_sna_params),
&server_key_path,
Expand All @@ -63,25 +64,13 @@ mod test {
server_cert_path.as_ref(),
)?;

let ip_addr = String::from("127.0.0.1");
let ip_port = 9649_u16;

let join_handle = tokio::task::spawn(async move {
let server_config = ServerConfig::builder()
.with_safe_defaults()
.with_no_client_auth()
.with_single_cert(vec![certificate], private_key)
.map_err(|e| {
Error::new(
ErrorKind::InvalidInput,
format!("failed to create TLS server config '{}'", e),
)
})
.unwrap();

let ip = ip_addr.clone().parse::<std::net::IpAddr>().unwrap();
let addr = SocketAddr::new(ip, ip_port);

let tls_acceptor = TlsAcceptor::from(Arc::new(server_config));
let tcp_listener = TcpListener::bind(addr).await.unwrap();

Expand All @@ -104,20 +93,16 @@ mod test {

let client_key_path = random_manager::tmp_path(10, None)?;
let client_cert_path = random_manager::tmp_path(10, None)?;
let client_cert_sna_params = CertificateParams::new(vec!["127.0.0.1".to_string()]);
let client_cert_sna_params = CertificateParams::new([addr.ip().to_string()]);
cert_manager::x509::generate_and_write_pem(
Some(client_cert_sna_params),
&client_key_path,
&client_cert_path,
)?;
log::info!("client cert path: {}", client_cert_path);
log::info!("client cert path: {client_cert_path}");

let connector = outbound::Connector::new_from_pem(&client_key_path, &client_cert_path)?;
let stream = connector.connect(
IpAddr::from_str("127.0.0.1").unwrap(),
ip_port,
Duration::from_secs(5),
)?;
let stream = connector.connect(addr, Duration::from_secs(5))?;

log::info!("peer certificate:\n\n{}", stream.peer_certificate_pem);

Expand Down
64 changes: 26 additions & 38 deletions core/network/src/peer/outbound.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::net;
use std::net::SocketAddr;
use std::{
io::{self, Error, ErrorKind, Read, Write},
net::TcpStream,
Expand Down Expand Up @@ -52,70 +52,58 @@ impl Connector {

/// Creates a connection to the specified peer's IP and port.
/// ref. <https://pkg.go.dev/github.com/ava-labs/avalanchego/network/peer#NewTLSClientUpgrader>
pub fn connect(
&self,
peer_ip: net::IpAddr,
port: u16,
_timeout: Duration,
) -> io::Result<Stream> {
info!("[rustls] connecting to {}:{}", peer_ip, port);

// ref. https://doc.rust-lang.org/std/net/enum.SocketAddr.html
let sock_addr = format!("{}:{}", peer_ip, port);
pub fn connect(&self, addr: SocketAddr, _timeout: Duration) -> io::Result<Stream> {
info!("[rustls] connecting to {addr}");

// This is now possible with rustls v0.21.0+
let server_name: ServerName = ServerName::try_from(peer_ip.to_string().as_ref()).unwrap();
let server_name = ServerName::IpAddress(addr.ip());
let mut conn =
rustls::ClientConnection::new(self.client_config.clone(), server_name).unwrap();
let mut sock = TcpStream::connect(sock_addr.clone()).unwrap();
let mut sock = TcpStream::connect(addr).unwrap();
let mut tls = rustls::Stream::new(&mut conn, &mut sock);

let binding = format!("GET / HTTP/1.1\r\nHost: {}:{}\r\nConnection: close\r\nAccept-Encoding: identity\r\n\r\n", peer_ip, port);
let header = binding.as_bytes();
let binding = format!("GET / HTTP/1.1\r\nHost: {addr}\r\nConnection: close\r\nAccept-Encoding: identity\r\n\r\n");

// This is a dummy write to ensure that the certificate data is transmitted.
// Without this GET we get an error: Error: Custom { kind: NotConnected, error: "no peer certificate found" }
match tls.write_all(header) {
Ok(_) => {
println!("\n\n WROTE REQUEST\n\n");
}
Err(e) => {
println!("failed to write request: {}", e);
}
};
tls.write_all(binding.as_bytes())?;
info!("\n\n WROTE REQUEST\n\n");

info!("retrieving peer certificates...");
let peer_certs = conn.peer_certificates();
if peer_certs.is_none() {
return Err(Error::new(
ErrorKind::NotConnected,
"no peer certificate found",
));
}

// The certificate details are used to establish node identity.
// See https://docs.avax.network/specs/cryptographic-primitives#tls-certificates.
// The avalanchego certs are intentionally NOT signed by a legitimate CA.
let peer_certs = peer_certs.unwrap();
let peer_certificate = peer_certs[0].clone();
let (peer_certificate, total_certificates) = conn
.peer_certificates()
.and_then(|certs| {
let total_certs = certs.len();
certs.split_first().map(|(first, _)| (first, total_certs))
})
.ok_or(Error::new(
ErrorKind::NotConnected,
"no peer certificate found",
))?;

let peer_node_id = node::Id::from_cert_der_bytes(&peer_certificate.0)?;
info!(
"successfully connected to {} (total {} certificates, first cert {}-byte)",
peer_node_id,
peer_certs.len(),
total_certificates,
peer_certificate.0.len(),
);

Ok(Stream {
addr: sock_addr,
conn,
addr,
peer_certificate: peer_certificate.clone(),
peer_node_id,

#[cfg(feature = "pem_encoding")]
peer_certificate_pem: pem::encode(&Pem::new(
"CERTIFICATE".to_string(),
peer_certificate.0,
peer_certificate.0.clone(),
)),

conn,
})
}
}
Expand Down Expand Up @@ -154,7 +142,7 @@ fn test_connector() {
/// Represents a connection to a peer.
/// ref. <https://github.com/rustls/rustls/commit/b8024301747fb0328c9493d7cf7268e0de17ffb3>
pub struct Stream {
pub addr: String,
pub addr: SocketAddr,

/// ref. <https://docs.rs/rustls/latest/rustls/enum.Connection.html>
/// ref. <https://docs.rs/rustls/latest/rustls/client/struct.ClientConnection.html>
Expand Down
16 changes: 8 additions & 8 deletions crates/avalanche-types/src/message/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,13 @@ impl Message {
}

#[must_use]
pub fn tracked_subnets(mut self, tracked_subnets: Vec<ids::Id>) -> Self {
let mut tracked_subnet_bytes: Vec<prost::bytes::Bytes> =
Vec::with_capacity(tracked_subnets.len());
for id in tracked_subnets.iter() {
tracked_subnet_bytes.push(prost::bytes::Bytes::from(id.to_vec()));
}
self.msg.tracked_subnets = tracked_subnet_bytes;
pub fn tracked_subnets(mut self, tracked_subnets: impl AsRef<[ids::Id]>) -> Self {
self.msg.tracked_subnets = tracked_subnets
.as_ref()
.iter()
.map(|id| id.to_vec())
.map(prost::bytes::Bytes::from)
.collect();
self
}

Expand Down Expand Up @@ -180,7 +180,7 @@ fn test_message() {
.my_version(String::from("v1.2.3"))
.my_version_time(1234567)
.sig(random_manager::secure_bytes(65).unwrap())
.tracked_subnets(vec![
.tracked_subnets([
ids::Id::empty(),
ids::Id::empty(),
ids::Id::empty(),
Expand Down
6 changes: 2 additions & 4 deletions tests/avalanchego-byzantine/src/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::{
net::{IpAddr, Ipv4Addr},
str::FromStr,
net::{IpAddr, Ipv4Addr, SocketAddr},
thread,
time::{Duration, Instant, SystemTime},
};
Expand Down Expand Up @@ -269,8 +268,7 @@ async fn byzantine() {
utils::urls::extract_scheme_host_port_path_chain_alias(&rpc_eps[0]).unwrap();
log::info!("connecting to the first peer {} with port {:?}", host, port);
let res = connector.connect(
IpAddr::from_str("127.0.0.1").unwrap(),
port.unwrap(),
SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port.unwrap()),
Duration::from_secs(10),
);
if res.is_err() {
Expand Down

0 comments on commit 7f1be4a

Please sign in to comment.