From b02dc40425e21d373e70c568e23106d4c24f561f Mon Sep 17 00:00:00 2001 From: Marc Nijdam Date: Sat, 7 Oct 2023 19:23:33 -0700 Subject: [PATCH 1/8] bump deps --- Cargo.lock | 214 ++++++++++++++++++++++++++--------------------------- 1 file changed, 107 insertions(+), 107 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e12551cd..e34fc68f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -62,9 +62,9 @@ dependencies = [ [[package]] name = "aho-corasick" -version = "1.0.5" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c378d78423fdad8089616f827526ee33c19f2fddbd5de1629152c9593ba4783" +checksum = "ea5d730647d4fadd988536d06fecce94b7b4f2a7efdae548f1cf4b63205518ab" dependencies = [ "memchr", ] @@ -80,9 +80,9 @@ dependencies = [ [[package]] name = "anstyle" -version = "1.0.3" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b84bf0a05bbb2a83e5eb6fa36bb6e87baa08193c35ff52bbf6b38d8af2890e46" +checksum = "7079075b41f533b8c61d2a4d073c4676e1f8b249ff94a393b0595db304e0dd87" [[package]] name = "anyhow" @@ -121,7 +121,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.38", ] [[package]] @@ -132,7 +132,7 @@ checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.38", ] [[package]] @@ -216,7 +216,7 @@ checksum = "9ba43ea6f343b788c8764558649e08df62f86c6ef251fdaeb1ffd010a9ae50a2" [[package]] name = "beacon" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#983596a457e7b7d8a1850c5bf70312b2537c57c8" +source = "git+https://github.com/helium/proto?branch=master#d94ed4b4046263eb78003d484d94ad3cbff7a55f" dependencies = [ "base64", "byteorder", @@ -226,7 +226,7 @@ dependencies = [ "rand_chacha", "rust_decimal", "serde", - "sha2 0.10.7", + "sha2 0.10.8", "thiserror", ] @@ -249,7 +249,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn 2.0.33", + "syn 2.0.38", "which", ] @@ -352,7 +352,7 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f5353f36341f7451062466f0b755b96ac3a9547e4d7f6b70d603fc721a7d7896" dependencies = [ - "sha2 0.10.7", + "sha2 0.10.8", "tinyvec", ] @@ -380,9 +380,9 @@ dependencies = [ [[package]] name = "byteorder" -version = "1.4.3" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" @@ -416,9 +416,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.30" +version = "0.4.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "defd4e7873dbddba6c7c91e199c7fcb946abc4a6a4ac3195400bcfb01b5de877" +checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" dependencies = [ "num-traits", ] @@ -436,9 +436,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.4.3" +version = "4.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84ed82781cea27b43c9b106a979fe450a13a31aab0500595fb3fc06616de08e6" +checksum = "d04704f56c2cde07f43e8e2c154b43f216dc5c92fc98ada720177362f953b956" dependencies = [ "clap_builder", "clap_derive", @@ -446,9 +446,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.4.2" +version = "4.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bb9faaa7c2ef94b2743a21f5a29e6f0010dff4caa69ac8e9d6cf8b6fa74da08" +checksum = "0e231faeaca65ebd1ea3c737966bf858971cd38c3849107aa3ea7de90a804e45" dependencies = [ "anstyle", "clap_lex", @@ -463,7 +463,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.38", ] [[package]] @@ -617,7 +617,7 @@ dependencies = [ "serde", "serde_derive", "serialport", - "sha2 0.10.7", + "sha2 0.10.8", "thiserror", ] @@ -685,9 +685,9 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] name = "errno" -version = "0.3.3" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "136526188508e25c6fef639d7927dfb3e0e3084488bf202267829cf7fc23dbdd" +checksum = "add4f07d43996f76ef320709726a556a9d4f965d9410d8d0271132d2f8293480" dependencies = [ "errno-dragonfly", "libc", @@ -814,7 +814,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.38", ] [[package]] @@ -872,7 +872,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", - "sha2 0.9.9", + "sha2 0.10.8", "signature", "thiserror", "time", @@ -968,9 +968,9 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.14.0" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a" +checksum = "7dfda62a12f55daeae5015f81b0baea145391cb4520f86c248fc615d72640d12" [[package]] name = "heck" @@ -998,7 +998,7 @@ dependencies = [ "rand_core", "rsa", "serde", - "sha2 0.10.7", + "sha2 0.10.8", "signature", "thiserror", "tss2", @@ -1007,7 +1007,7 @@ dependencies = [ [[package]] name = "helium-proto" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#983596a457e7b7d8a1850c5bf70312b2537c57c8" +source = "git+https://github.com/helium/proto?branch=master#d94ed4b4046263eb78003d484d94ad3cbff7a55f" dependencies = [ "bytes", "prost", @@ -1020,9 +1020,9 @@ dependencies = [ [[package]] name = "hermit-abi" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "443144c8cdadd93ebf52ddb4056d257f5b52c04d3c804e657d19eb73fc33668b" +checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7" [[package]] name = "hmac" @@ -1148,12 +1148,12 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.0.0" +version = "2.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d5477fe2230a79769d8dc68e0eabf5437907c0457a5614a9e8dddb67f65eb65d" +checksum = "8adf3ddd720272c6ea8bf59463c04e0f93d0bbf7c5439b691bca2987e0270897" dependencies = [ "equivalent", - "hashbrown 0.14.0", + "hashbrown 0.14.1", ] [[package]] @@ -1221,9 +1221,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "libc" -version = "0.2.148" +version = "0.2.149" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cdc71e17332e86d2e1d38c1f99edcb6288ee11b815fb1a4b049eaa2114d369b" +checksum = "a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b" [[package]] name = "libloading" @@ -1237,9 +1237,9 @@ dependencies = [ [[package]] name = "libm" -version = "0.2.7" +version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7012b1bbb0719e1097c47611d3898568c546d597c2e74d66f6087edd5233ff4" +checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" [[package]] name = "linux-raw-sys" @@ -1288,9 +1288,9 @@ dependencies = [ [[package]] name = "matchit" -version = "0.7.2" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed1202b2a6f884ae56f04cff409ab315c5ce26b5e58d7412e484f01fd52f52ef" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" [[package]] name = "md5" @@ -1300,9 +1300,9 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771" [[package]] name = "memchr" -version = "2.6.3" +version = "2.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f232d6ef707e1956a43342693d2a31e72989554d58299d7a88738cc95b0d35c" +checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167" [[package]] name = "mime" @@ -1414,9 +1414,9 @@ dependencies = [ [[package]] name = "num-traits" -version = "0.2.16" +version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f30b0abd723be7e2ffca1272140fac1a2f084c77ec3e123c192b66af1ee9e6c2" +checksum = "39e3200413f237f41ab11ad6d161bc7239c84dcb631773ccd7de3dfe4b5c267c" dependencies = [ "autocfg", "libm", @@ -1440,7 +1440,7 @@ dependencies = [ "proc-macro-crate 1.3.1", "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.38", ] [[package]] @@ -1496,9 +1496,9 @@ checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" [[package]] name = "pest" -version = "2.7.3" +version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7a4d085fd991ac8d5b05a147b437791b4260b76326baf0fc60cf7c9c27ecd33" +checksum = "c022f1e7b65d6a24c0dbbd5fb344c66881bc01f3e5ae74a1c8100f2f985d98a4" dependencies = [ "memchr", "thiserror", @@ -1512,7 +1512,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1d3afd2628e69da2be385eb6f2fd57c8ac7977ceeff6dc166ff1657b0e386a9" dependencies = [ "fixedbitset", - "indexmap 2.0.0", + "indexmap 2.0.2", ] [[package]] @@ -1532,7 +1532,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.38", ] [[package]] @@ -1560,7 +1560,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae005bd773ab59b4725093fd7df83fd7892f7d8eafb48dbd7de6e024e4215f9d" dependencies = [ "proc-macro2", - "syn 2.0.33", + "syn 2.0.38", ] [[package]] @@ -1584,18 +1584,18 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.67" +version = "1.0.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d433d9f1a3e8c1263d9456598b16fec66f4acc9a74dacffd35c7bb09b3a1328" +checksum = "5b1106fec09662ec6dd98ccac0f81cef56984d0b49f75c92d8cbad76e20c005c" dependencies = [ "unicode-ident", ] [[package]] name = "prost" -version = "0.12.0" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa8473a65b88506c106c28ae905ca4a2b83a2993640467a41bb3080627ddfd2c" +checksum = "f4fdd22f3b9c31b53c060df4a0613a1c7f062d4115a2b984dd15b1858f7e340d" dependencies = [ "bytes", "prost-derive", @@ -1603,9 +1603,9 @@ dependencies = [ [[package]] name = "prost-build" -version = "0.12.0" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30d3e647e9eb04ddfef78dfee2d5b3fefdf94821c84b710a3d8ebc89ede8b164" +checksum = "8bdf592881d821b83d471f8af290226c8d51402259e9bb5be7f9f8bdebbb11ac" dependencies = [ "bytes", "heck", @@ -1618,29 +1618,29 @@ dependencies = [ "prost", "prost-types", "regex", - "syn 2.0.33", + "syn 2.0.38", "tempfile", "which", ] [[package]] name = "prost-derive" -version = "0.12.0" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56075c27b20ae524d00f247b8a4dc333e5784f889fe63099f8e626bc8d73486c" +checksum = "265baba7fabd416cf5078179f7d2cbeca4ce7a9041111900675ea7c4cb8a4c32" dependencies = [ "anyhow", "itertools", "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.38", ] [[package]] name = "prost-types" -version = "0.12.0" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cebe0a918c97f86c217b0f76fd754e966f8b9f41595095cf7d74cb4e59d730f6" +checksum = "e081b29f63d83a4bc75cfc9f3fe424f9156cf92d8a4f0c9407cce9a1b67327cf" dependencies = [ "prost", ] @@ -1721,9 +1721,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.9.5" +version = "1.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "697061221ea1b4a94a624f67d0ae2bfe4e22b8a17b6a192afb11046542cc8c47" +checksum = "ebee201405406dbf528b8b672104ae6d6d63e6d118cb10e4d51abbc7b58044ff" dependencies = [ "aho-corasick", "memchr", @@ -1733,9 +1733,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.3.8" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2f401f4955220693b56f8ec66ee9c78abffd8d1c4f23dc41a23839eb88f0795" +checksum = "59b23e92ee4318893fa3fe3e6fb365258efbfe6ac6ab30f090cdcbb7aa37efa9" dependencies = [ "aho-corasick", "memchr", @@ -1750,9 +1750,9 @@ checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" [[package]] name = "rend" -version = "0.4.0" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "581008d2099240d37fb08d77ad713bcaec2c4d89d50b5b21a8bb1996bbab68ab" +checksum = "a2571463863a6bd50c32f94402933f03457a3fbaf697a707c5be741e459f08fd" dependencies = [ "bytecheck", ] @@ -1854,9 +1854,9 @@ checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" [[package]] name = "rustix" -version = "0.37.23" +version = "0.37.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d69718bf81c6127a49dc64e44a742e8bb9213c0ff8869a22c308f84c1d4ab06" +checksum = "4279d76516df406a8bd37e7dff53fd37d1a093f997a3c34a5c21658c126db06d" dependencies = [ "bitflags 1.3.2", "errno", @@ -1956,14 +1956,14 @@ checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.38", ] [[package]] name = "serde_json" -version = "1.0.106" +version = "1.0.107" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2cc66a619ed80bf7a0f6b17dd063a84b88f6dea1813737cf469aef1d081142c2" +checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65" dependencies = [ "itoa", "ryu", @@ -1978,7 +1978,7 @@ checksum = "8725e1dfadb3a50f7e5ce0b1a540466f6ed3fe7a0fca2ac2b8b831d31316bd00" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.38", ] [[package]] @@ -2025,9 +2025,9 @@ dependencies = [ [[package]] name = "sha2" -version = "0.10.7" +version = "0.10.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "479fb9d862239e610720565ca91403019f2f00410f1864c5aa7479b950a76ed8" +checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" dependencies = [ "cfg-if", "cpufeatures", @@ -2036,9 +2036,9 @@ dependencies = [ [[package]] name = "sharded-slab" -version = "0.1.4" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" dependencies = [ "lazy_static", ] @@ -2097,9 +2097,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.11.0" +version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9" +checksum = "942b4a808e05215192e39f4ab80813e599068285906cc91aa64f923db842bd5a" [[package]] name = "socket2" @@ -2146,9 +2146,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.33" +version = "2.0.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9caece70c63bfba29ec2fed841a09851b14a235c60010fa4de58089b6c025668" +checksum = "e96b79aaa137db8f61e26363a0c9b47d8b4ec75da28b7d1d614c2303e232408b" dependencies = [ "proc-macro2", "quote", @@ -2183,22 +2183,22 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.48" +version = "1.0.49" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d6d7a740b8a666a7e828dd00da9c0dc290dff53154ea77ac109281de90589b7" +checksum = "1177e8c6d7ede7afde3585fd2513e611227efd6481bd78d2e82ba1ce16557ed4" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.48" +version = "1.0.49" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49922ecae66cc8a249b77e68d1d0623c1b2c514f0060c27cdc68bd62a1219d35" +checksum = "10712f02019e9288794769fba95cd6847df9874d49d871d062172f9dd41bc4cc" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.38", ] [[package]] @@ -2213,9 +2213,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17f6bb557fd245c28e6411aa56b6403c689ad95061f50e4be16c274e70a17e48" +checksum = "426f806f4089c493dcac0d24c29c01e2c38baf8e30f1b716ee37e83d200b18fe" dependencies = [ "deranged", "itoa", @@ -2226,15 +2226,15 @@ dependencies = [ [[package]] name = "time-core" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7300fbefb4dadc1af235a9cef3737cea692a9d97e1b9cbcd4ebdae6f8868e6fb" +checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" [[package]] name = "time-macros" -version = "0.2.14" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a942f44339478ef67935ab2bbaec2fb0322496cf3cbe84b261e06ac3814c572" +checksum = "4ad70d68dba9e1f8aceda7aa6711965dfec1cac869f311a51bd08b3a2ccbce20" dependencies = [ "time-core", ] @@ -2289,7 +2289,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.38", ] [[package]] @@ -2305,9 +2305,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.8" +version = "0.7.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d" +checksum = "1d68074620f57a0b21594d9735eb2e98ab38b17f80d3fcb189fca266771ca60d" dependencies = [ "bytes", "futures-core", @@ -2338,16 +2338,16 @@ version = "0.19.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" dependencies = [ - "indexmap 2.0.0", + "indexmap 2.0.2", "toml_datetime", "winnow", ] [[package]] name = "tonic" -version = "0.10.0" +version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5469afaf78a11265c343a88969045c1568aa8ecc6c787dbf756e92e70f199861" +checksum = "d560933a0de61cf715926b9cac824d4c883c2c43142f787595e48280c40a1d0e" dependencies = [ "async-stream", "async-trait", @@ -2372,15 +2372,15 @@ dependencies = [ [[package]] name = "tonic-build" -version = "0.10.0" +version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b477abbe1d18c0b08f56cd01d1bc288668c5b5cfd19b2ae1886bbf599c546f1" +checksum = "9d021fc044c18582b9a2408cd0dd05b1596e3ecdb5c4df822bb0183545683889" dependencies = [ "prettyplease", "proc-macro2", "prost-build", "quote", - "syn 2.0.33", + "syn 2.0.38", ] [[package]] @@ -2446,7 +2446,7 @@ checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.38", ] [[package]] @@ -2493,9 +2493,9 @@ dependencies = [ [[package]] name = "typenum" -version = "1.16.0" +version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" +checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" [[package]] name = "ucd-trie" @@ -2643,9 +2643,9 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" [[package]] name = "winnow" -version = "0.5.15" +version = "0.5.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c2e3184b9c4e92ad5167ca73039d0c42476302ab603e2fec4487511f38ccefc" +checksum = "037711d82167854aff2018dfd193aa0fef5370f456732f0d5a0c59b0f1b4b907" dependencies = [ "memchr", ] @@ -2676,5 +2676,5 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.33", + "syn 2.0.38", ] From 55e7a1500cf80e8c4c14d9db2234f5cb6af64c67 Mon Sep 17 00:00:00 2001 From: Marc Nijdam Date: Sat, 7 Oct 2023 08:37:26 -0700 Subject: [PATCH 2/8] Make packet stream conduit generic --- src/service/conduit.rs | 146 +++++++++++++++++++++++++++++++++++++++++ src/service/mod.rs | 1 + 2 files changed, 147 insertions(+) create mode 100644 src/service/conduit.rs diff --git a/src/service/conduit.rs b/src/service/conduit.rs new file mode 100644 index 00000000..e6173fc5 --- /dev/null +++ b/src/service/conduit.rs @@ -0,0 +1,146 @@ +use std::sync::Arc; + +use crate::{ + service::{CONNECT_TIMEOUT, RPC_TIMEOUT}, + Keypair, Result, +}; + +use helium_proto::services::{Channel, Endpoint}; + +use http::Uri; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; + +/// A conduit is the tx/rx stream pair for the a streaming rpc on the +/// `packet_router` service. It does not connect on construction but on the +/// first messsage sent. +#[derive(Debug)] +struct Conduit { + tx: mpsc::Sender, + rx: tonic::Streaming, +} + +#[derive(Debug)] +pub struct ConduitService { + pub uri: Uri, + conduit: Option>, + keypair: Arc, + client: C, +} + +pub const CONDUIT_CAPACITY: usize = 50; + +/// The time between TCP keepalive messages to keep the connection to the packet +/// router open. Some load balancer disconnect after a number of seconds. AWS +/// NLBs are hardcoded to 350s so we pick a slightly shorter timeframe to send +/// keepalives +pub const TCP_KEEP_ALIVE_DURATION: std::time::Duration = std::time::Duration::from_secs(300); + +#[tonic::async_trait] +pub trait ConduitClient { + async fn init( + &mut self, + endpoint: Channel, + client_rx: ReceiverStream, + ) -> Result>; + + async fn register(&mut self, keypair: Arc) -> Result; +} + +impl Conduit { + async fn new(uri: Uri, client: &mut C) -> Result { + let endpoint = Endpoint::from(uri) + .timeout(RPC_TIMEOUT) + .connect_timeout(CONNECT_TIMEOUT) + .tcp_keepalive(Some(TCP_KEEP_ALIVE_DURATION)) + .connect_lazy(); + let (tx, client_rx) = mpsc::channel(CONDUIT_CAPACITY); + let rx = client + .init(endpoint, ReceiverStream::new(client_rx)) + .await?; + Ok(Self { tx, rx }) + } + + async fn recv(&mut self) -> Result> { + Ok(self.rx.message().await?) + } + + async fn send(&mut self, msg: U) -> Result { + Ok(self.tx.send(msg).await?) + } + + async fn register( + &mut self, + client: &mut C, + keypair: Arc, + ) -> Result { + let msg = client.register(keypair).await?; + Ok(self.tx.send(msg).await?) + } +} + +impl ConduitService { + pub fn new(uri: Uri, client: C, keypair: Arc) -> Self { + Self { + uri, + conduit: None, + keypair, + client, + } + } + + pub async fn send(&mut self, msg: U) -> Result { + if self.conduit.is_none() { + self.connect().await?; + } + // Unwrap since the above connect early exits if no conduit is created + match self.conduit.as_mut().unwrap().send(msg).await { + Ok(()) => Ok(()), + other => { + self.disconnect(); + other + } + } + } + + pub async fn recv(&mut self) -> Result> { + // Since recv is usually called from a select loop we don't try a + // connect every time it is called since the rate for attempted + // connections in failure setups would be as high as the loop rate of + // the caller. This relies on either a reconnect attempt or a packet + // send at a later time to reconnect the conduit. + if self.conduit.is_none() { + futures::future::pending::<()>().await; + return Ok(None); + } + match self.conduit.as_mut().unwrap().recv().await { + Ok(msg) if msg.is_some() => Ok(msg), + other => { + self.disconnect(); + other + } + } + } + + pub fn disconnect(&mut self) { + self.conduit = None; + } + + pub async fn connect(&mut self) -> Result { + let mut conduit = Conduit::new(self.uri.clone(), &mut self.client).await?; + conduit + .register(&mut self.client, self.keypair.clone()) + .await?; + self.conduit = Some(conduit); + Ok(()) + } + + pub async fn reconnect(&mut self) -> Result { + self.disconnect(); + self.connect().await + } + + pub fn is_connected(&self) -> bool { + self.conduit.is_some() + } +} diff --git a/src/service/mod.rs b/src/service/mod.rs index b3c061cd..f24eee2e 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -3,6 +3,7 @@ use std::time::Duration; pub const CONNECT_TIMEOUT: Duration = Duration::from_secs(10); pub const RPC_TIMEOUT: Duration = Duration::from_secs(5); +mod conduit; pub mod config; pub mod entropy; pub mod packet_router; From 04c9084f86dc58a83d679b77b0f15b2bfa529bac Mon Sep 17 00:00:00 2001 From: Marc Nijdam Date: Sun, 8 Oct 2023 08:05:05 -0700 Subject: [PATCH 3/8] Use generic conduit for packet_router --- src/beaconer.rs | 2 +- src/service/conduit.rs | 14 +-- src/service/mod.rs | 2 +- src/service/packet_router.rs | 160 ++++++++++------------------------- 4 files changed, 54 insertions(+), 124 deletions(-) diff --git a/src/beaconer.rs b/src/beaconer.rs index 50d30242..2ee5981a 100644 --- a/src/beaconer.rs +++ b/src/beaconer.rs @@ -287,7 +287,7 @@ fn mk_beacon_offset(key: &PublicKey, interval: Duration) -> Duration { use rand::{Rng, SeedableRng}; use sha2::Digest; - let hash = sha2::Sha256::digest(&key.to_vec()); + let hash = sha2::Sha256::digest(key.to_vec()); let mut rng = rand::rngs::StdRng::from_seed(*hash.as_ref()); Duration::seconds(rng.gen_range(0..interval.whole_seconds())) } diff --git a/src/service/conduit.rs b/src/service/conduit.rs index e6173fc5..c7b1dbdf 100644 --- a/src/service/conduit.rs +++ b/src/service/conduit.rs @@ -21,7 +21,7 @@ struct Conduit { } #[derive(Debug)] -pub struct ConduitService { +pub struct ConduitService> { pub uri: Uri, conduit: Option>, keypair: Arc, @@ -37,18 +37,18 @@ pub const CONDUIT_CAPACITY: usize = 50; pub const TCP_KEEP_ALIVE_DURATION: std::time::Duration = std::time::Duration::from_secs(300); #[tonic::async_trait] -pub trait ConduitClient { - async fn init( +pub trait ConduitClient { + async fn init( &mut self, endpoint: Channel, client_rx: ReceiverStream, ) -> Result>; - async fn register(&mut self, keypair: Arc) -> Result; + async fn register(&mut self, keypair: Arc) -> Result; } impl Conduit { - async fn new(uri: Uri, client: &mut C) -> Result { + async fn new>(uri: Uri, client: &mut C) -> Result { let endpoint = Endpoint::from(uri) .timeout(RPC_TIMEOUT) .connect_timeout(CONNECT_TIMEOUT) @@ -69,7 +69,7 @@ impl Conduit { Ok(self.tx.send(msg).await?) } - async fn register( + async fn register>( &mut self, client: &mut C, keypair: Arc, @@ -79,7 +79,7 @@ impl Conduit { } } -impl ConduitService { +impl> ConduitService { pub fn new(uri: Uri, client: C, keypair: Arc) -> Self { Self { uri, diff --git a/src/service/mod.rs b/src/service/mod.rs index f24eee2e..5f785a11 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -3,7 +3,7 @@ use std::time::Duration; pub const CONNECT_TIMEOUT: Duration = Duration::from_secs(10); pub const RPC_TIMEOUT: Duration = Duration::from_secs(5); -mod conduit; +pub mod conduit; pub mod config; pub mod entropy; pub mod packet_router; diff --git a/src/service/packet_router.rs b/src/service/packet_router.rs index a9327f92..4e1a0666 100644 --- a/src/service/packet_router.rs +++ b/src/service/packet_router.rs @@ -2,10 +2,11 @@ use std::{ sync::Arc, time::{SystemTime, UNIX_EPOCH}, }; +use tonic::async_trait; use crate::{ error::DecodeError, - service::{CONNECT_TIMEOUT, RPC_TIMEOUT}, + service::conduit::{ConduitClient, ConduitService}, sign, Error, Keypair, Result, }; @@ -15,81 +16,37 @@ use helium_proto::{ envelope_down_v1, envelope_up_v1, EnvelopeDownV1, EnvelopeUpV1, PacketRouterClient, PacketRouterRegisterV1, }, - Channel, Endpoint, + Channel, }, Message, }; use http::Uri; -use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; -type PacketClient = PacketRouterClient; - -type PacketSender = mpsc::Sender; -type PacketReceiver = tonic::Streaming; - // The router service maintains a re-connectable connection to a remote packet // router. The service will connect when (re)connect or a packet send is // attempted. It will ensure that the register rpc is called on the constructed // connection before a packet is sent. -#[derive(Debug)] -pub struct PacketRouterService { - pub uri: Uri, - conduit: Option, - keypair: Arc, -} - -/// A router conduit is the tx/rx stream pair for the `route` rpc on the -/// `packet_router` service. It does not connect on construction but on the -/// first messsage sent. -#[derive(Debug)] -struct PacketRouterConduit { - tx: PacketSender, - rx: PacketReceiver, -} - -pub const CONDUIT_CAPACITY: usize = 50; - -/// The time between TCP keepalive messages to keep the connection to the packet -/// router open. Some load balancer disconnect after a number of seconds. AWS -/// NLBs are hardcoded to 350s so we pick a slightly shorter timeframe to send -/// keepalives -pub const TCP_KEEP_ALIVE_DURATION: std::time::Duration = std::time::Duration::from_secs(300); - -impl PacketRouterConduit { - async fn new(uri: Uri) -> Result { - let endpoint = Endpoint::from(uri) - .timeout(RPC_TIMEOUT) - .connect_timeout(CONNECT_TIMEOUT) - .tcp_keepalive(Some(TCP_KEEP_ALIVE_DURATION)) - .connect_lazy(); - let mut client = PacketClient::new(endpoint); - let (tx, client_rx) = mpsc::channel(CONDUIT_CAPACITY); - let rx = client - .route(ReceiverStream::new(client_rx)) - .await? - .into_inner(); - Ok(Self { tx, rx }) - } - - async fn recv(&mut self) -> Result> { - match self.rx.message().await { - Ok(Some(msg)) => match msg.data { - Some(data) => Ok(Some(data)), - None => Err(DecodeError::invalid_envelope()), - }, - Ok(None) => Ok(None), - Err(err) => Err(err.into()), - } +pub struct PacketRouterService( + ConduitService, +); + +pub struct PacketRouterConduitClient {} + +#[async_trait] +impl ConduitClient for PacketRouterConduitClient { + async fn init( + &mut self, + endpoint: Channel, + client_rx: ReceiverStream, + ) -> Result> { + let mut client = PacketRouterClient::::new(endpoint); + let rx = client.route(client_rx).await?.into_inner(); + Ok(rx) } - async fn send(&mut self, msg: envelope_up_v1::Data) -> Result { - let msg = EnvelopeUpV1 { data: Some(msg) }; - Ok(self.tx.send(msg).await?) - } - - async fn register(&mut self, keypair: Arc) -> Result { + async fn register(&mut self, keypair: Arc) -> Result { let mut msg = PacketRouterRegisterV1 { timestamp: SystemTime::now() .duration_since(UNIX_EPOCH) @@ -103,69 +60,42 @@ impl PacketRouterConduit { let msg = EnvelopeUpV1 { data: Some(envelope_up_v1::Data::Register(msg)), }; - Ok(self.tx.send(msg).await?) + Ok(msg) + } +} + +impl std::ops::Deref for PacketRouterService { + type Target = ConduitService; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl std::ops::DerefMut for PacketRouterService { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 } } impl PacketRouterService { pub fn new(uri: Uri, keypair: Arc) -> Self { - Self { - uri, - conduit: None, - keypair, - } + let client = PacketRouterConduitClient {}; + Self(ConduitService::new(uri, client, keypair)) } pub async fn send(&mut self, msg: envelope_up_v1::Data) -> Result { - if self.conduit.is_none() { - self.connect().await?; - } - // Unwrap since the above connect early exits if no conduit is created - match self.conduit.as_mut().unwrap().send(msg).await { - Ok(()) => Ok(()), - other => { - self.disconnect(); - other - } - } + let msg = EnvelopeUpV1 { data: Some(msg) }; + self.0.send(msg).await } pub async fn recv(&mut self) -> Result> { - // Since recv is usually called from a select loop we don't try a - // connect every time it is called since the rate for attempted - // connections in failure setups would be as high as the loop rate of - // the caller. This relies on either a reconnect attempt or a packet - // send at a later time to reconnect the conduit. - if self.conduit.is_none() { - futures::future::pending::<()>().await; - return Ok(None); - } - match self.conduit.as_mut().unwrap().recv().await { - Ok(msg) if msg.is_some() => Ok(msg), - other => { - self.disconnect(); - other - } + match self.0.recv().await { + Ok(Some(msg)) => match msg.data { + Some(data) => Ok(Some(data)), + None => Err(DecodeError::invalid_envelope()), + }, + Ok(None) => Ok(None), + Err(err) => Err(err), } } - - pub fn disconnect(&mut self) { - self.conduit = None; - } - - pub async fn connect(&mut self) -> Result { - let mut conduit = PacketRouterConduit::new(self.uri.clone()).await?; - conduit.register(self.keypair.clone()).await?; - self.conduit = Some(conduit); - Ok(()) - } - - pub async fn reconnect(&mut self) -> Result { - self.disconnect(); - self.connect().await - } - - pub fn is_connected(&self) -> bool { - self.conduit.is_some() - } } From 612e56acce7b79a76b895917c654699b2862f4cc Mon Sep 17 00:00:00 2001 From: Marc Nijdam Date: Mon, 9 Oct 2023 12:33:08 -0700 Subject: [PATCH 4/8] session keys for beaconer uses reconnect and poc iot conduit service --- src/beaconer.rs | 242 +++++++++++++++++++++++------------ src/packet_router/mod.rs | 70 ++++------ src/service/conduit.rs | 66 +++++----- src/service/mod.rs | 53 +++++++- src/service/packet_router.rs | 23 ++-- src/service/poc.rs | 91 ++++++++++--- 6 files changed, 344 insertions(+), 201 deletions(-) diff --git a/src/beaconer.rs b/src/beaconer.rs index 2ee5981a..1ef7d33a 100644 --- a/src/beaconer.rs +++ b/src/beaconer.rs @@ -3,14 +3,18 @@ use crate::{ error::DecodeError, gateway::{self, BeaconResp}, + keypair::mk_session_keypair, message_cache::MessageCache, region_watcher, - service::{entropy::EntropyService, poc::PocIotService}, + service::{entropy::EntropyService, poc::PocIotService, Reconnect}, settings::Settings, - sign, sync, Base64, Keypair, PacketUp, PublicKey, RegionParams, Result, + sign, sync, Base64, Error, Keypair, PacketUp, PublicKey, RegionParams, Result, }; use futures::TryFutureExt; -use helium_proto::{services::poc_lora, Message as ProtoMessage}; +use helium_proto::{ + services::poc_lora::{self, lora_stream_request_v1, lora_stream_response_v1}, + Message as ProtoMessage, +}; use http::Uri; use std::sync::Arc; use time::{Duration, Instant}; @@ -38,23 +42,28 @@ impl MessageSender { pub struct Beaconer { /// Beacon/Witness handling disabled disabled: bool, - /// keypair to sign reports with + /// keypair to sign session init with keypair: Arc, + /// session keypair to use for reports + session_key: Option>, /// gateway packet transmit message queue transmit: gateway::MessageSender, /// Our receive queue. messages: MessageReceiver, + /// Service to deliver PoC reports to + service: PocIotService, + /// Service reconnect trigger + reconnect: Reconnect, /// Region change queue region_watch: region_watcher::MessageReceiver, /// Beacon interval interval: Duration, - // Time next beacon attempt is o be made + // Time next beacon attempt is to be made next_beacon_time: Instant, /// Last seen beacons last_seen: MessageCache>, /// Use for channel plan and FR parameters - region_params: RegionParams, - poc_ingest_uri: Uri, + region_params: Arc, entropy_uri: Uri, } @@ -66,14 +75,16 @@ impl Beaconer { transmit: gateway::MessageSender, ) -> Self { let interval = Duration::seconds(settings.poc.interval as i64); - let poc_ingest_uri = settings.poc.ingest_uri.clone(); let entropy_uri = settings.poc.entropy_uri.clone(); + let service = PocIotService::new(settings.poc.ingest_uri.clone(), settings.keypair.clone()); let keypair = settings.keypair.clone(); - let region_params = region_watcher::current_value(®ion_watch); + let reconnect = Reconnect::default(); + let region_params = Arc::new(region_watcher::current_value(®ion_watch)); let disabled = settings.poc.disable; Self { keypair, + session_key: None, transmit, messages, region_watch, @@ -84,9 +95,10 @@ impl Beaconer { // cause the beacon to not occur next_beacon_time: Instant::now() + interval, region_params, - poc_ingest_uri, + service, entropy_uri, disabled, + reconnect, } } @@ -138,32 +150,46 @@ impl Beaconer { info!(delay = delay.whole_seconds(), "first beacon"); self.next_beacon_time = Instant::now() + delay; } - self.region_params = region_watcher::current_value(&self.region_watch); + self.region_params = Arc::new(region_watcher::current_value(&self.region_watch)); info!(region = RegionParams::to_string(&self.region_params), "region updated"); }, Err(_) => warn!("region watch disconnected"), - } - + }, + service_message = self.service.recv() => match service_message { + Ok(Some(lora_stream_response_v1::Response::Offer(message))) => { + let session_result = self.handle_session_offer(message).await; + if session_result.is_ok() { + // (Re)set retry count to max to maximize time to + // next disconnect from service + self.reconnect.retry_count = self.reconnect.max_retries; + } else { + // Failed fto handle session offer, disconnect + self.disconnect(); + } + self.reconnect.update_next_time(session_result.is_err()); + }, + Ok(None) => { + warn!("ingest disconnected"); + self.reconnect.update_next_time(true); + }, + Err(err) => { + warn!(?err, "ingest error"); + self.reconnect.update_next_time(true); + }, + }, + _ = self.reconnect.wait() => { + let reconnect_result = self.handle_reconnect().await; + self.reconnect.update_next_time(reconnect_result.is_err()); + }, } } } - pub async fn mk_beacon(&self) -> Result { - self.region_params.check_valid()?; - - let mut entropy_service = EntropyService::new(self.entropy_uri.clone()); - let remote_entropy = entropy_service.get_entropy().await?; - let local_entropy = beacon::Entropy::local()?; - - let beacon = beacon::Beacon::new(remote_entropy, local_entropy, &self.region_params)?; - Ok(beacon) - } - /// Sends a gateway-to-gateway packet. /// /// See [`gateway::MessageSender::transmit_beacon`] - pub async fn send_beacon(&self, beacon: beacon::Beacon) -> Result { + pub async fn send_beacon(&mut self, beacon: beacon::Beacon) -> Result { let beacon_id = beacon .beacon_data() .map(|data| data.to_b64()) @@ -178,59 +204,52 @@ impl Beaconer { .map_ok(|BeaconResp { powe, tmst }| (powe, tmst)) .await?; - // Construct concurrent futures for connecting to the poc ingester and - // signing the report - let report_fut = self.mk_beacon_report(beacon.clone(), powe, tmst); - let service_fut = PocIotService::connect(self.poc_ingest_uri.clone()); - - match tokio::try_join!(report_fut, service_fut) { - Ok((report, mut poc_service)) => { - poc_service - .submit_beacon(report) - .inspect_err(|err| warn!(beacon_id, %err, "submit poc beacon report")) - .inspect_ok(|_| info!(beacon_id, "poc beacon report submitted")) - .await? - } - Err(err) => { - warn!(beacon_id, %err, "poc beacon report"); - } - } + // Check if a session key is available to sign the report + let Some(session_key) = self.session_key.clone() else { + warn!(%beacon_id, "no session key for beacon report"); + return Err(Error::no_service()); + }; + + Self::mk_beacon_report(beacon.clone(), powe, tmst, session_key) + .and_then(|report| self.service.submit_beacon(report)) + .inspect_err(|err| warn!(beacon_id, %err, "submit poc beacon report")) + .inspect_ok(|_| info!(beacon_id, "poc beacon report submitted")) + .await?; Ok(beacon) } - async fn mk_beacon_report( - &self, - beacon: beacon::Beacon, - conducted_power: i32, - tmst: u32, - ) -> Result { - let mut report = poc_lora::LoraBeaconReportReqV1::try_from(beacon)?; - report.tx_power = conducted_power; - report.tmst = tmst; - report.pub_key = self.keypair.public_key().to_vec(); - report.signature = sign(self.keypair.clone(), report.encode_to_vec()).await?; - Ok(report) + async fn handle_session_offer( + &mut self, + message: poc_lora::LoraStreamSessionOfferV1, + ) -> Result { + let session_key = mk_session_key_init(self.keypair.clone(), &message) + .and_then(|(session_key, session_init)| { + self.service.send(session_init).map_ok(|_| session_key) + }) + .inspect_err(|err| warn!(%err, "failed to initialize session")) + .await?; + self.session_key = Some(session_key.clone()); + info!(session_key = %session_key.public_key(),"initialized session"); + Ok(()) } - async fn mk_witness_report( - &self, - packet: PacketUp, - payload: Vec, - ) -> Result { - let mut report = poc_lora::LoraWitnessReportReqV1::try_from(packet)?; - report.data = payload; - report.pub_key = self.keypair.public_key().to_vec(); - report.signature = sign(self.keypair.clone(), report.encode_to_vec()).await?; - Ok(report) + async fn handle_reconnect(&mut self) -> Result { + // Do not send waiting reports on ok here since we wait for a sesson + // offer. Also do not reset the reconnect retry counter since only a + // session key indicates a good connection + self.service + .reconnect() + .inspect_err(|err| warn!(%err, "failed to reconnect")) + .await } async fn handle_beacon_tick(&mut self) { if self.disabled { return; } - let last_beacon = self - .mk_beacon() + + let last_beacon = Self::mk_beacon(self.region_params.clone(), self.entropy_uri.clone()) .inspect_err(|err| warn!(%err, "construct beacon")) .and_then(|beacon| self.send_beacon(beacon)) .map_ok_or_else(|_| None, Some) @@ -261,24 +280,81 @@ impl Beaconer { return; } - // Construct concurrent futures for connecting to the poc ingester and - // signing the report - let report_fut = self.mk_witness_report(packet, beacon_data); - let service_fut = PocIotService::connect(self.poc_ingest_uri.clone()); - - match tokio::try_join!(report_fut, service_fut) { - Ok((report, mut poc_service)) => { - let _ = poc_service - .submit_witness(report) - .inspect_err(|err| warn!(beacon_id, %err, "submit poc witness report")) - .inspect_ok(|_| info!(beacon_id, "poc witness report submitted")) - .await; - } - Err(err) => { - warn!(%err, "poc witness report"); - } - } + // Check if a session key is available to sign the report + let Some(session_key) = self.session_key.clone() else { + warn!(%beacon_id, "no session key for witness report"); + return; + }; + + let _ = Self::mk_witness_report(packet, beacon_data, session_key) + .and_then(|report| self.service.submit_witness(report)) + .inspect_err(|err| warn!(beacon_id, %err, "submit poc witness report")) + .inspect_ok(|_| info!(beacon_id, "poc witness report submitted")) + .await; + } + + fn disconnect(&mut self) { + self.service.disconnect(); + self.session_key = None; + } + + pub async fn mk_beacon( + region_params: Arc, + entropy_uri: Uri, + ) -> Result { + region_params.check_valid()?; + + let mut entropy_service = EntropyService::new(entropy_uri); + let remote_entropy = entropy_service.get_entropy().await?; + let local_entropy = beacon::Entropy::local()?; + + let beacon = beacon::Beacon::new(remote_entropy, local_entropy, ®ion_params)?; + Ok(beacon) } + + async fn mk_beacon_report( + beacon: beacon::Beacon, + conducted_power: i32, + tmst: u32, + keypair: Arc, + ) -> Result { + let mut report = poc_lora::LoraBeaconReportReqV1::try_from(beacon)?; + report.tx_power = conducted_power; + report.tmst = tmst; + report.pub_key = keypair.public_key().to_vec(); + report.signature = sign(keypair.clone(), report.encode_to_vec()).await?; + Ok(report) + } + + async fn mk_witness_report( + packet: PacketUp, + payload: Vec, + keypair: Arc, + ) -> Result { + let mut report = poc_lora::LoraWitnessReportReqV1::try_from(packet)?; + report.data = payload; + report.pub_key = keypair.public_key().to_vec(); + report.signature = sign(keypair.clone(), report.encode_to_vec()).await?; + Ok(report) + } +} + +pub async fn mk_session_key_init( + keypair: Arc, + offer: &poc_lora::LoraStreamSessionOfferV1, +) -> Result<(Arc, lora_stream_request_v1::Request)> { + let session_keypair = Arc::new(mk_session_keypair()); + let session_key = session_keypair.public_key(); + + let mut session_init = poc_lora::LoraStreamSessionInitV1 { + pub_key: keypair.public_key().into(), + session_key: session_key.into(), + nonce: offer.nonce.clone(), + signature: vec![], + }; + session_init.signature = sign(keypair, session_init.encode_to_vec()).await?; + let envelope = lora_stream_request_v1::Request::SessionInit(session_init); + Ok((session_keypair, envelope)) } /// Construct a random but deterministic offset for beaconing. This is based on diff --git a/src/packet_router/mod.rs b/src/packet_router/mod.rs index 63f8b2ba..1afeeafe 100644 --- a/src/packet_router/mod.rs +++ b/src/packet_router/mod.rs @@ -2,10 +2,9 @@ use crate::{ gateway, keypair::mk_session_keypair, message_cache::{CacheMessage, MessageCache}, - service::packet_router::PacketRouterService, + service::{packet_router::PacketRouterService, Reconnect}, sign, sync, Base64, Keypair, PacketUp, Result, Settings, }; -use exponential_backoff::Backoff; use futures::TryFutureExt; use helium_proto::{ services::router::{ @@ -16,16 +15,12 @@ use helium_proto::{ }; use serde::Serialize; use std::{sync::Arc, time::Instant as StdInstant}; -use tokio::time::{self, Duration, Instant}; +use tokio::time::Duration; use tracing::{debug, info, warn}; const STORE_GC_INTERVAL: Duration = Duration::from_secs(60); -const RECONNECT_BACKOFF_RETRIES: u32 = 40; -const RECONNECT_BACKOFF_MIN_WAIT: Duration = Duration::from_secs(5); -const RECONNECT_BACKOFF_MAX_WAIT: Duration = Duration::from_secs(1800); // 30 minutes - #[derive(Debug)] pub enum Message { Uplink { @@ -64,7 +59,7 @@ pub struct PacketRouter { messages: MessageReceiver, transmit: gateway::MessageSender, service: PacketRouterService, - reconnect_retry: u32, + reconnect: Reconnect, session_key: Option>, keypair: Arc, store: MessageCache, @@ -80,6 +75,7 @@ impl PacketRouter { let service = PacketRouterService::new(router_settings.uri.clone(), settings.keypair.clone()); let store = MessageCache::new(router_settings.queue); + let reconnect = Reconnect::default(); Self { service, keypair: settings.keypair.clone(), @@ -87,7 +83,7 @@ impl PacketRouter { transmit, messages, store, - reconnect_retry: 0, + reconnect, } } @@ -95,16 +91,6 @@ impl PacketRouter { pub async fn run(&mut self, shutdown: &triggered::Listener) -> Result { info!(uri = %self.service.uri, "starting"); - let reconnect_backoff = Backoff::new( - RECONNECT_BACKOFF_RETRIES, - RECONNECT_BACKOFF_MIN_WAIT, - RECONNECT_BACKOFF_MAX_WAIT, - ); - - // Use a deadline based sleep for reconnect to allow the store gc timer - // to fire without resetting the reconnect timer - let mut reconnect_sleep = Instant::now() + RECONNECT_BACKOFF_MIN_WAIT; - loop { tokio::select! { _ = shutdown.clone() => { @@ -116,7 +102,7 @@ impl PacketRouter { if self.handle_uplink(packet, received).await.is_err() { self.disconnect(); warn!("router disconnected"); - reconnect_sleep = self.next_connect(&reconnect_backoff, true); + self.reconnect.update_next_time(true); }, Some(Message::Status(tx_resp)) => { let status = RouterStatus { @@ -128,54 +114,45 @@ impl PacketRouter { } None => warn!("ignoring closed message channel"), }, - _ = time::sleep_until(reconnect_sleep) => { - reconnect_sleep = self.handle_reconnect(&reconnect_backoff).await; + _ = self.reconnect.wait() => { + let reconnect_result = self.handle_reconnect().await; + self.reconnect.update_next_time(reconnect_result.is_err()); }, router_message = self.service.recv() => match router_message { Ok(Some(envelope_down_v1::Data::Packet(message))) => self.handle_downlink(message).await, Ok(Some(envelope_down_v1::Data::SessionOffer(message))) => { let session_result = self.handle_session_offer(message).await; - if session_result.is_err() { + if session_result.is_ok() { + // (Re)set retry count to max to maximize time to + // next disconnect from service + self.reconnect.retry_count = self.reconnect.max_retries; + } else { + // Failed fto handle session offer, disconnect self.disconnect(); } - reconnect_sleep = self.next_connect(&reconnect_backoff, session_result.is_err()); + self.reconnect.update_next_time(session_result.is_err()); }, Ok(None) => { warn!("router disconnected"); - reconnect_sleep = self.next_connect(&reconnect_backoff, true) + self.reconnect.update_next_time(true); }, Err(err) => { warn!(?err, "router error"); - reconnect_sleep = self.next_connect(&reconnect_backoff, true) + self.reconnect.update_next_time(true); }, } } } } - fn next_connect(&mut self, reconnect_backoff: &Backoff, inc_retry: bool) -> Instant { - if inc_retry { - if self.reconnect_retry == RECONNECT_BACKOFF_RETRIES { - self.reconnect_retry = 0; - } else { - self.reconnect_retry += 1; - } - } - let backoff = reconnect_backoff - .next(self.reconnect_retry) - .unwrap_or(RECONNECT_BACKOFF_MAX_WAIT); - Instant::now() + backoff - } - - async fn handle_reconnect(&mut self, reconnect_backoff: &Backoff) -> Instant { - let reconnect_result = self.service.reconnect().await; + async fn handle_reconnect(&mut self) -> Result { // Do not send waiting packets on ok here since we wait for a sesson // offer. Also do not reset the reconnect retry counter since only a // session key indicates a good connection - if let Err(err) = &reconnect_result { - warn!(%err, "failed to connect"); - } - self.next_connect(reconnect_backoff, reconnect_result.is_err()) + self.service + .reconnect() + .inspect_err(|err| warn!(%err, "failed to reconnect")) + .await } async fn handle_uplink(&mut self, uplink: PacketUp, received: StdInstant) -> Result { @@ -204,7 +181,6 @@ impl PacketRouter { self.send_waiting_packets(session_key.clone()) .inspect_err(|err| warn!(%err, "failed to send queued packets")) .await?; - self.reconnect_retry = RECONNECT_BACKOFF_RETRIES; Ok(()) } diff --git a/src/service/conduit.rs b/src/service/conduit.rs index c7b1dbdf..46c77229 100644 --- a/src/service/conduit.rs +++ b/src/service/conduit.rs @@ -1,25 +1,21 @@ -use std::sync::Arc; - use crate::{ service::{CONNECT_TIMEOUT, RPC_TIMEOUT}, Keypair, Result, }; - use helium_proto::services::{Channel, Endpoint}; - use http::Uri; +use std::sync::Arc; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; -/// A conduit is the tx/rx stream pair for the a streaming rpc on the -/// `packet_router` service. It does not connect on construction but on the -/// first messsage sent. -#[derive(Debug)] -struct Conduit { - tx: mpsc::Sender, - rx: tonic::Streaming, -} +/// The time between TCP keepalive messages to keep the connection to the packet +/// router open. Some load balancer disconnect after a number of seconds. AWS +/// NLBs are hardcoded to 350s so we pick a slightly shorter timeframe to send +/// keepalives +pub const TCP_KEEP_ALIVE_DURATION: std::time::Duration = std::time::Duration::from_secs(300); +pub const CONDUIT_CAPACITY: usize = 50; +/// A conduit service maintains a re-connectable connection to a remote service. #[derive(Debug)] pub struct ConduitService> { pub uri: Uri, @@ -28,27 +24,29 @@ pub struct ConduitService> { client: C, } -pub const CONDUIT_CAPACITY: usize = 50; - -/// The time between TCP keepalive messages to keep the connection to the packet -/// router open. Some load balancer disconnect after a number of seconds. AWS -/// NLBs are hardcoded to 350s so we pick a slightly shorter timeframe to send -/// keepalives -pub const TCP_KEEP_ALIVE_DURATION: std::time::Duration = std::time::Duration::from_secs(300); +#[derive(Debug)] +struct Conduit { + tx: mpsc::Sender, + rx: tonic::Streaming, +} #[tonic::async_trait] pub trait ConduitClient { async fn init( &mut self, endpoint: Channel, + tx: mpsc::Sender, client_rx: ReceiverStream, + keypair: Arc, ) -> Result>; - - async fn register(&mut self, keypair: Arc) -> Result; } impl Conduit { - async fn new>(uri: Uri, client: &mut C) -> Result { + async fn new>( + uri: Uri, + client: &mut C, + keypair: Arc, + ) -> Result { let endpoint = Endpoint::from(uri) .timeout(RPC_TIMEOUT) .connect_timeout(CONNECT_TIMEOUT) @@ -56,7 +54,12 @@ impl Conduit { .connect_lazy(); let (tx, client_rx) = mpsc::channel(CONDUIT_CAPACITY); let rx = client - .init(endpoint, ReceiverStream::new(client_rx)) + .init( + endpoint, + tx.clone(), + ReceiverStream::new(client_rx), + keypair, + ) .await?; Ok(Self { tx, rx }) } @@ -68,15 +71,6 @@ impl Conduit { async fn send(&mut self, msg: U) -> Result { Ok(self.tx.send(msg).await?) } - - async fn register>( - &mut self, - client: &mut C, - keypair: Arc, - ) -> Result { - let msg = client.register(keypair).await?; - Ok(self.tx.send(msg).await?) - } } impl> ConduitService { @@ -107,7 +101,7 @@ impl> ConduitService { // Since recv is usually called from a select loop we don't try a // connect every time it is called since the rate for attempted // connections in failure setups would be as high as the loop rate of - // the caller. This relies on either a reconnect attempt or a packet + // the caller. This relies on either a reconnect attempt or a message // send at a later time to reconnect the conduit. if self.conduit.is_none() { futures::future::pending::<()>().await; @@ -127,10 +121,8 @@ impl> ConduitService { } pub async fn connect(&mut self) -> Result { - let mut conduit = Conduit::new(self.uri.clone(), &mut self.client).await?; - conduit - .register(&mut self.client, self.keypair.clone()) - .await?; + let conduit = + Conduit::new(self.uri.clone(), &mut self.client, self.keypair.clone()).await?; self.conduit = Some(conduit); Ok(()) } diff --git a/src/service/mod.rs b/src/service/mod.rs index 5f785a11..87920d67 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -1,10 +1,61 @@ -use std::time::Duration; +use tokio::time::{self, Duration, Instant}; pub const CONNECT_TIMEOUT: Duration = Duration::from_secs(10); pub const RPC_TIMEOUT: Duration = Duration::from_secs(5); +pub const RECONNECT_BACKOFF_RETRIES: u32 = 40; +pub const RECONNECT_BACKOFF_MIN_WAIT: Duration = Duration::from_secs(5); +pub const RECONNECT_BACKOFF_MAX_WAIT: Duration = Duration::from_secs(1800); // 30 minutes + pub mod conduit; pub mod config; pub mod entropy; pub mod packet_router; pub mod poc; + +#[derive(Debug)] +pub struct Reconnect { + backoff: exponential_backoff::Backoff, + next_time: Instant, + pub max_wait: Duration, + pub max_retries: u32, + pub retry_count: u32, +} + +impl Default for Reconnect { + fn default() -> Self { + Self::new( + RECONNECT_BACKOFF_RETRIES, + RECONNECT_BACKOFF_MIN_WAIT, + RECONNECT_BACKOFF_MAX_WAIT, + ) + } +} + +impl Reconnect { + pub fn new(retries: u32, min: Duration, max: Duration) -> Self { + Self { + backoff: exponential_backoff::Backoff::new(retries, min, max), + next_time: Instant::now() + min, + max_retries: retries, + max_wait: max, + retry_count: 0, + } + } + + pub fn wait(&self) -> time::Sleep { + time::sleep_until(self.next_time) + } + + pub fn update_next_time(&mut self, inc_retry: bool) { + if inc_retry { + if self.retry_count == self.max_retries { + self.retry_count = 0; + } else { + self.retry_count += 1; + } + } + let backoff = self.backoff.next(self.retry_count).unwrap_or(self.max_wait); + self.next_time = Instant::now() + backoff; + } +} diff --git a/src/service/packet_router.rs b/src/service/packet_router.rs index 4e1a0666..571e3dee 100644 --- a/src/service/packet_router.rs +++ b/src/service/packet_router.rs @@ -1,15 +1,8 @@ -use std::{ - sync::Arc, - time::{SystemTime, UNIX_EPOCH}, -}; -use tonic::async_trait; - use crate::{ error::DecodeError, service::conduit::{ConduitClient, ConduitService}, sign, Error, Keypair, Result, }; - use helium_proto::{ services::{ router::{ @@ -20,9 +13,14 @@ use helium_proto::{ }, Message, }; - use http::Uri; +use std::{ + sync::Arc, + time::{SystemTime, UNIX_EPOCH}, +}; +use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; +use tonic::async_trait; // The router service maintains a re-connectable connection to a remote packet // router. The service will connect when (re)connect or a packet send is @@ -39,14 +37,12 @@ impl ConduitClient for PacketRouterConduitClient { async fn init( &mut self, endpoint: Channel, + tx: mpsc::Sender, client_rx: ReceiverStream, + keypair: Arc, ) -> Result> { let mut client = PacketRouterClient::::new(endpoint); let rx = client.route(client_rx).await?.into_inner(); - Ok(rx) - } - - async fn register(&mut self, keypair: Arc) -> Result { let mut msg = PacketRouterRegisterV1 { timestamp: SystemTime::now() .duration_since(UNIX_EPOCH) @@ -60,7 +56,8 @@ impl ConduitClient for PacketRouterConduitClient { let msg = EnvelopeUpV1 { data: Some(envelope_up_v1::Data::Register(msg)), }; - Ok(msg) + tx.send(msg).await.map_err(|_| Error::channel())?; + Ok(rx) } } diff --git a/src/service/poc.rs b/src/service/poc.rs index 89ad11bc..1804763d 100644 --- a/src/service/poc.rs +++ b/src/service/poc.rs @@ -1,37 +1,88 @@ use crate::{ - service::{CONNECT_TIMEOUT, RPC_TIMEOUT}, - Result, + error::DecodeError, + service::conduit::{ConduitClient, ConduitService}, + Keypair, Result, }; use helium_proto::services::{ - self, - poc_lora::{LoraBeaconReportReqV1, LoraWitnessReportReqV1}, - Channel, Endpoint, + poc_lora::{ + self, lora_stream_request_v1, lora_stream_response_v1, LoraBeaconReportReqV1, + LoraStreamRequestV1, LoraStreamResponseV1, LoraWitnessReportReqV1, + }, + Channel, }; use http::Uri; +use std::sync::Arc; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tonic::async_trait; -type PocIotClient = helium_proto::services::poc_lora::Client; +// The poc service maintains a re-connectable connection to a remote poc +// ingester. The service will (re)connect when a poc report send is attempted. +// It will ensure that the stream_requests rpc is called on the constructed +// connection before a report is sent. +pub struct PocIotService( + ConduitService, +); -#[derive(Debug)] -pub struct PocIotService(PocIotClient); +pub struct PocIotConduitClient {} + +#[async_trait] +impl ConduitClient for PocIotConduitClient { + async fn init( + &mut self, + endpoint: Channel, + _tx: mpsc::Sender, + client_rx: ReceiverStream, + _keypair: Arc, + ) -> Result> { + let mut client = poc_lora::Client::::new(endpoint); + let rx = client.stream_requests(client_rx).await?.into_inner(); + Ok(rx) + } +} + +impl std::ops::Deref for PocIotService { + type Target = ConduitService; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl std::ops::DerefMut for PocIotService { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} impl PocIotService { - pub async fn connect(uri: Uri) -> Result { - let channel = Endpoint::from(uri) - .connect_timeout(CONNECT_TIMEOUT) - .timeout(RPC_TIMEOUT) - .connect() - .await?; - let client = services::poc_lora::Client::new(channel); - Ok(Self(client)) + pub fn new(uri: Uri, keypair: Arc) -> Self { + let client = PocIotConduitClient {}; + Self(ConduitService::new(uri, client, keypair)) + } + + pub async fn send(&mut self, msg: lora_stream_request_v1::Request) -> Result { + let msg = LoraStreamRequestV1 { request: Some(msg) }; + self.0.send(msg).await + } + + pub async fn recv(&mut self) -> Result> { + match self.0.recv().await { + Ok(Some(msg)) => match msg.response { + Some(data) => Ok(Some(data)), + None => Err(DecodeError::invalid_envelope()), + }, + Ok(None) => Ok(None), + Err(err) => Err(err), + } } pub async fn submit_beacon(&mut self, req: LoraBeaconReportReqV1) -> Result { - _ = self.0.submit_lora_beacon(req).await?; - Ok(()) + let msg = lora_stream_request_v1::Request::BeaconReport(req); + self.send(msg).await } pub async fn submit_witness(&mut self, req: LoraWitnessReportReqV1) -> Result { - _ = self.0.submit_lora_witness(req).await?; - Ok(()) + let msg = lora_stream_request_v1::Request::WitnessReport(req); + self.send(msg).await } } From 4394b21979bb794d03c4d9cdb95a8f52a442ee3e Mon Sep 17 00:00:00 2001 From: Marc Nijdam Date: Mon, 9 Oct 2023 13:02:44 -0700 Subject: [PATCH 5/8] Update src/beaconer.rs Co-authored-by: mawdegroot <73519916+mawdegroot@users.noreply.github.com> --- src/beaconer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/beaconer.rs b/src/beaconer.rs index 1ef7d33a..267bad9e 100644 --- a/src/beaconer.rs +++ b/src/beaconer.rs @@ -163,7 +163,7 @@ impl Beaconer { // next disconnect from service self.reconnect.retry_count = self.reconnect.max_retries; } else { - // Failed fto handle session offer, disconnect + // Failed to handle session offer, disconnect self.disconnect(); } self.reconnect.update_next_time(session_result.is_err()); From 4b59c14fd4825ed572cee61765d695fb59f87301 Mon Sep 17 00:00:00 2001 From: Marc Nijdam Date: Mon, 9 Oct 2023 13:02:51 -0700 Subject: [PATCH 6/8] Update src/beaconer.rs Co-authored-by: mawdegroot <73519916+mawdegroot@users.noreply.github.com> --- src/beaconer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/beaconer.rs b/src/beaconer.rs index 267bad9e..6a8872fb 100644 --- a/src/beaconer.rs +++ b/src/beaconer.rs @@ -235,7 +235,7 @@ impl Beaconer { } async fn handle_reconnect(&mut self) -> Result { - // Do not send waiting reports on ok here since we wait for a sesson + // Do not send waiting reports on ok here since we wait for a session // offer. Also do not reset the reconnect retry counter since only a // session key indicates a good connection self.service From 33f61ba00ca81bab8a1b7a06765ba6698231c14a Mon Sep 17 00:00:00 2001 From: Marc Nijdam Date: Tue, 10 Oct 2023 13:54:02 -0700 Subject: [PATCH 7/8] Remove most sharing of keypairs And move keypair and session keypair management to the conduit itself. --- src/api/mod.rs | 4 +- src/beaconer.rs | 91 +++++++++--------------------------- src/error.rs | 8 ++-- src/keyed_uri.rs | 4 +- src/keypair.rs | 66 +++++++++++++++++--------- src/lib.rs | 53 +++++++++++++++++---- src/packet_router/mod.rs | 85 +++++++-------------------------- src/service/conduit.rs | 52 ++++++++++++++++++++- src/service/config.rs | 17 +++++-- src/service/packet_router.rs | 37 +++++++++++++-- src/service/poc.rs | 45 ++++++++++++++---- 11 files changed, 263 insertions(+), 199 deletions(-) diff --git a/src/api/mod.rs b/src/api/mod.rs index 60f5e7fe..4509fb13 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -11,7 +11,7 @@ pub use helium_proto::{ }; pub use server::LocalServer; -use crate::{Error, Result}; +use crate::{Error, PublicKey, Result}; impl TryFrom for crate::packet_router::RouterStatus { type Error = Error; @@ -20,7 +20,7 @@ impl TryFrom for crate::packet_router::RouterStatus { Ok(Self { uri: http::Uri::from_str(&value.uri)?, connected: value.connected, - session_key: helium_crypto::PublicKey::try_from(value.session_key).ok(), + session_key: PublicKey::try_from(value.session_key).ok(), }) } } diff --git a/src/beaconer.rs b/src/beaconer.rs index 6a8872fb..2006f2d1 100644 --- a/src/beaconer.rs +++ b/src/beaconer.rs @@ -1,20 +1,15 @@ //! This module provides proof-of-coverage (PoC) beaconing support. - use crate::{ error::DecodeError, gateway::{self, BeaconResp}, - keypair::mk_session_keypair, message_cache::MessageCache, region_watcher, service::{entropy::EntropyService, poc::PocIotService, Reconnect}, settings::Settings, - sign, sync, Base64, Error, Keypair, PacketUp, PublicKey, RegionParams, Result, + sync, Base64, PacketUp, PublicKey, RegionParams, Result, }; use futures::TryFutureExt; -use helium_proto::{ - services::poc_lora::{self, lora_stream_request_v1, lora_stream_response_v1}, - Message as ProtoMessage, -}; +use helium_proto::services::poc_lora::{self, lora_stream_response_v1}; use http::Uri; use std::sync::Arc; use time::{Duration, Instant}; @@ -42,10 +37,6 @@ impl MessageSender { pub struct Beaconer { /// Beacon/Witness handling disabled disabled: bool, - /// keypair to sign session init with - keypair: Arc, - /// session keypair to use for reports - session_key: Option>, /// gateway packet transmit message queue transmit: gateway::MessageSender, /// Our receive queue. @@ -77,14 +68,11 @@ impl Beaconer { let interval = Duration::seconds(settings.poc.interval as i64); let entropy_uri = settings.poc.entropy_uri.clone(); let service = PocIotService::new(settings.poc.ingest_uri.clone(), settings.keypair.clone()); - let keypair = settings.keypair.clone(); let reconnect = Reconnect::default(); let region_params = Arc::new(region_watcher::current_value(®ion_watch)); let disabled = settings.poc.disable; Self { - keypair, - session_key: None, transmit, messages, region_watch, @@ -106,6 +94,7 @@ impl Beaconer { info!( beacon_interval = self.interval.whole_seconds(), disabled = self.disabled, + uri = %self.service.uri, "starting" ); @@ -140,7 +129,7 @@ impl Beaconer { if self.region_params.params.is_empty() { // Calculate a random but deterministic time offset // for this hotspot's beacons - let offset = mk_beacon_offset(self.keypair.public_key(), self.interval); + let offset = mk_beacon_offset(self.service.gateway_key(), self.interval); // Get a delay for the first beacon based on the // deterministic offset and the timestamp in the // first region params. If there's an error @@ -204,17 +193,16 @@ impl Beaconer { .map_ok(|BeaconResp { powe, tmst }| (powe, tmst)) .await?; - // Check if a session key is available to sign the report - let Some(session_key) = self.session_key.clone() else { - warn!(%beacon_id, "no session key for beacon report"); - return Err(Error::no_service()); - }; - - Self::mk_beacon_report(beacon.clone(), powe, tmst, session_key) - .and_then(|report| self.service.submit_beacon(report)) - .inspect_err(|err| warn!(beacon_id, %err, "submit poc beacon report")) - .inspect_ok(|_| info!(beacon_id, "poc beacon report submitted")) - .await?; + Self::mk_beacon_report( + beacon.clone(), + powe, + tmst, + self.service.gateway_key().clone(), + ) + .and_then(|report| self.service.submit_beacon(report)) + .inspect_err(|err| warn!(beacon_id, %err, "submit poc beacon report")) + .inspect_ok(|_| info!(beacon_id, "poc beacon report submitted")) + .await?; Ok(beacon) } @@ -223,15 +211,7 @@ impl Beaconer { &mut self, message: poc_lora::LoraStreamSessionOfferV1, ) -> Result { - let session_key = mk_session_key_init(self.keypair.clone(), &message) - .and_then(|(session_key, session_init)| { - self.service.send(session_init).map_ok(|_| session_key) - }) - .inspect_err(|err| warn!(%err, "failed to initialize session")) - .await?; - self.session_key = Some(session_key.clone()); - info!(session_key = %session_key.public_key(),"initialized session"); - Ok(()) + self.service.session_init(&message.nonce).await } async fn handle_reconnect(&mut self) -> Result { @@ -280,13 +260,7 @@ impl Beaconer { return; } - // Check if a session key is available to sign the report - let Some(session_key) = self.session_key.clone() else { - warn!(%beacon_id, "no session key for witness report"); - return; - }; - - let _ = Self::mk_witness_report(packet, beacon_data, session_key) + let _ = Self::mk_witness_report(packet, beacon_data, self.service.gateway_key().clone()) .and_then(|report| self.service.submit_witness(report)) .inspect_err(|err| warn!(beacon_id, %err, "submit poc witness report")) .inspect_ok(|_| info!(beacon_id, "poc witness report submitted")) @@ -295,7 +269,6 @@ impl Beaconer { fn disconnect(&mut self) { self.service.disconnect(); - self.session_key = None; } pub async fn mk_beacon( @@ -316,47 +289,27 @@ impl Beaconer { beacon: beacon::Beacon, conducted_power: i32, tmst: u32, - keypair: Arc, + gateway: PublicKey, ) -> Result { let mut report = poc_lora::LoraBeaconReportReqV1::try_from(beacon)?; + report.pub_key = gateway.to_vec(); report.tx_power = conducted_power; report.tmst = tmst; - report.pub_key = keypair.public_key().to_vec(); - report.signature = sign(keypair.clone(), report.encode_to_vec()).await?; Ok(report) } async fn mk_witness_report( packet: PacketUp, payload: Vec, - keypair: Arc, + gateway: PublicKey, ) -> Result { let mut report = poc_lora::LoraWitnessReportReqV1::try_from(packet)?; + report.pub_key = gateway.to_vec(); report.data = payload; - report.pub_key = keypair.public_key().to_vec(); - report.signature = sign(keypair.clone(), report.encode_to_vec()).await?; Ok(report) } } -pub async fn mk_session_key_init( - keypair: Arc, - offer: &poc_lora::LoraStreamSessionOfferV1, -) -> Result<(Arc, lora_stream_request_v1::Request)> { - let session_keypair = Arc::new(mk_session_keypair()); - let session_key = session_keypair.public_key(); - - let mut session_init = poc_lora::LoraStreamSessionInitV1 { - pub_key: keypair.public_key().into(), - session_key: session_key.into(), - nonce: offer.nonce.clone(), - signature: vec![], - }; - session_init.signature = sign(keypair, session_init.encode_to_vec()).await?; - let envelope = lora_stream_request_v1::Request::SessionInit(session_init); - Ok((session_keypair, envelope)) -} - /// Construct a random but deterministic offset for beaconing. This is based on /// the public key as of this hotspot as the seed to a random number generator. fn mk_beacon_offset(key: &PublicKey, interval: Duration) -> Duration { @@ -447,14 +400,14 @@ mod test { const PUBKEY_1: &str = "13WvV82S7QN3VMzMSieiGxvuaPKknMtf213E5JwPnboDkUfesKw"; const PUBKEY_2: &str = "14HZVR4bdF9QMowYxWrumcFBNfWnhDdD5XXA5za1fWwUhHxxFS1"; - let pubkey_1 = helium_crypto::PublicKey::from_str(PUBKEY_1).expect("public key"); + let pubkey_1 = crate::PublicKey::from_str(PUBKEY_1).expect("public key"); let offset_1 = mk_beacon_offset(&pubkey_1, time::Duration::hours(6)); // Same key and interval should always end up at the same offset assert_eq!( offset_1, mk_beacon_offset(&pubkey_1, time::Duration::hours(6)) ); - let pubkey_2 = helium_crypto::PublicKey::from_str(PUBKEY_2).expect("public key 2"); + let pubkey_2 = crate::PublicKey::from_str(PUBKEY_2).expect("public key 2"); let offset_2 = mk_beacon_offset(&pubkey_2, time::Duration::hours(6)); assert_eq!( offset_2, diff --git a/src/error.rs b/src/error.rs index 55671b08..e389bf06 100644 --- a/src/error.rs +++ b/src/error.rs @@ -77,8 +77,8 @@ pub enum ServiceError { Stream, #[error("channel closed")] Channel, - #[error("no service")] - NoService, + #[error("no active session")] + NoSession, #[error("age {age}s > {max_age}s")] Check { age: u64, max_age: u64 }, #[error("Unable to connect to local server. Check that `helium_gateway` is running.")] @@ -170,8 +170,8 @@ impl Error { Error::Service(ServiceError::Channel) } - pub fn no_service() -> Error { - Error::Service(ServiceError::NoService) + pub fn no_session() -> Error { + Error::Service(ServiceError::NoSession) } pub fn gateway_service_check(age: u64, max_age: u64) -> Error { diff --git a/src/keyed_uri.rs b/src/keyed_uri.rs index c929f5d8..50b5746a 100644 --- a/src/keyed_uri.rs +++ b/src/keyed_uri.rs @@ -43,7 +43,7 @@ impl TryFrom for KeyedUri { fn try_from(v: helium_proto::services::local::KeyedUri) -> Result { let result = Self { uri: http::Uri::from_str(&v.uri)?, - pubkey: Arc::new(helium_crypto::PublicKey::from_bytes(v.address)?), + pubkey: Arc::new(PublicKey::from_bytes(v.address)?), }; Ok(result) } @@ -63,7 +63,7 @@ impl TryFrom for KeyedUri { fn try_from(v: helium_proto::RoutingAddress) -> Result { let result = Self { uri: http::Uri::from_str(&String::from_utf8_lossy(&v.uri))?, - pubkey: Arc::new(helium_crypto::PublicKey::from_bytes(v.pub_key)?), + pubkey: Arc::new(PublicKey::from_bytes(v.pub_key)?), }; Ok(result) } diff --git a/src/keypair.rs b/src/keypair.rs index 97bae71e..39e75125 100644 --- a/src/keypair.rs +++ b/src/keypair.rs @@ -1,4 +1,4 @@ -use crate::*; +use crate::{error, Error, Result}; #[cfg(feature = "ecc608")] use helium_crypto::ecc608; #[cfg(feature = "tpm")] @@ -10,33 +10,21 @@ use serde::{de, Deserializer}; #[cfg(feature = "ecc608")] use std::path::Path; use std::{collections::HashMap, convert::TryFrom, fmt, fs, io, path, str::FromStr}; +use tonic::async_trait; #[derive(Debug)] pub struct Keypair(helium_crypto::Keypair); pub type PublicKey = helium_crypto::PublicKey; -pub fn load_from_file(path: &str) -> error::Result { - let data = fs::read(path)?; - Ok(helium_crypto::Keypair::try_from(&data[..])?.into()) -} - -pub fn save_to_file(keypair: &Keypair, path: &str) -> io::Result<()> { - if let Some(parent) = path::PathBuf::from(path).parent() { - fs::create_dir_all(parent)?; - }; - fs::write(path, keypair.0.to_vec())?; - Ok(()) +#[async_trait] +pub trait Sign { + async fn sign(&mut self, keypair: K) -> Result + where + K: AsRef + std::marker::Send + 'static; } -pub fn mk_session_keypair() -> Keypair { - let keypair = helium_crypto::Keypair::generate( - KeyTag { - network: Network::MainNet, - key_type: KeyType::Ed25519, - }, - &mut OsRng, - ); - keypair.into() +pub trait Verify { + fn verify(&self, pub_key: &crate::PublicKey) -> Result; } macro_rules! uri_error { @@ -61,7 +49,7 @@ impl FromStr for Keypair { .parse() .map_err(|err| uri_error!("invalid keypair url \"{str}\": {err:?}"))?; match url.scheme_str() { - Some("file") | None => match load_from_file(url.path()) { + Some("file") | None => match Self::load_from_file(url.path()) { Ok(k) => Ok(k), Err(Error::IO(io_error)) if io_error.kind() == std::io::ErrorKind::NotFound => { let args = KeypairArgs::from_uri(&url)?; @@ -74,7 +62,7 @@ impl FromStr for Keypair { &mut OsRng, ) .into(); - save_to_file(&new_key, url.path()).map_err(|err| { + new_key.save_to_file(url.path()).map_err(|err| { uri_error!("unable to save key file \"{}\": {err:?}", url.path()) })?; Ok(new_key) @@ -137,6 +125,38 @@ impl std::ops::Deref for Keypair { } } +impl Keypair { + pub fn new() -> Self { + let keypair = helium_crypto::Keypair::generate( + KeyTag { + network: Network::MainNet, + key_type: KeyType::Ed25519, + }, + &mut OsRng, + ); + keypair.into() + } + + pub fn load_from_file(path: &str) -> Result { + let data = fs::read(path)?; + Ok(helium_crypto::Keypair::try_from(&data[..])?.into()) + } + + pub fn save_to_file(&self, path: &str) -> io::Result<()> { + if let Some(parent) = path::PathBuf::from(path).parent() { + fs::create_dir_all(parent)?; + }; + fs::write(path, self.0.to_vec())?; + Ok(()) + } +} + +impl Default for Keypair { + fn default() -> Self { + Self::new() + } +} + #[derive(Debug)] struct KeypairArgs(HashMap); diff --git a/src/lib.rs b/src/lib.rs index ac59cf19..db491cd2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,7 +21,7 @@ pub(crate) use crate::base64::Base64; pub use beacon::{Region, RegionParams}; pub use error::{Error, Result}; pub use keyed_uri::KeyedUri; -pub use keypair::{Keypair, PublicKey}; +pub use keypair::{Keypair, PublicKey, Sign, Verify}; pub use packet::{PacketDown, PacketUp}; pub use settings::Settings; @@ -34,7 +34,7 @@ pub type Future = Pin> + Send>>; /// A type alias for `Stream` that may result in `crate::error::Error` pub type Stream = Pin> + Send>>; -pub async fn sign(keypair: K, data: Vec) -> Result> +async fn sign(keypair: K, data: Vec) -> Result> where K: AsRef + std::marker::Send + 'static, { @@ -49,13 +49,46 @@ where .await? } -macro_rules! verify { - ($key: expr, $msg: expr, $sig: ident) => {{ - let mut _msg = $msg.clone(); - _msg.$sig = vec![]; - let buf = _msg.encode_to_vec(); - $key.verify(&buf, &$msg.$sig).map_err(Error::from) - }}; +macro_rules! impl_sign { + ($type: ty) => { + #[tonic::async_trait] + impl Sign for $type { + async fn sign(&mut self, keypair: K) -> Result + where + K: AsRef + std::marker::Send + 'static, + { + self.signature = crate::sign(keypair, self.encode_to_vec()).await?; + Ok(()) + } + } + }; } +pub(crate) use impl_sign; -pub(crate) use verify; +macro_rules! impl_verify { + ($type: ty) => { + impl crate::Verify for $type { + fn verify(&self, pub_key: &crate::PublicKey) -> Result { + use helium_crypto::Verify as _; + let mut _msg = self.clone(); + _msg.signature = vec![]; + let buf = _msg.encode_to_vec(); + pub_key + .verify(&buf, &self.signature) + .map_err(crate::Error::from) + } + } + }; +} +pub(crate) use impl_verify; + +// macro_rules! verify { +// ($key: expr, $msg: expr, $sig: ident) => {{ +// let mut _msg = $msg.clone(); +// _msg.$sig = vec![]; +// let buf = _msg.encode_to_vec(); +// $key.verify(&buf, &$msg.$sig).map_err(Error::from) +// }}; +// } + +// pub(crate) use verify; diff --git a/src/packet_router/mod.rs b/src/packet_router/mod.rs index 1afeeafe..1e156861 100644 --- a/src/packet_router/mod.rs +++ b/src/packet_router/mod.rs @@ -1,20 +1,15 @@ use crate::{ gateway, - keypair::mk_session_keypair, message_cache::{CacheMessage, MessageCache}, service::{packet_router::PacketRouterService, Reconnect}, - sign, sync, Base64, Keypair, PacketUp, Result, Settings, + sync, Base64, PacketUp, PublicKey, Result, Settings, }; use futures::TryFutureExt; -use helium_proto::{ - services::router::{ - envelope_down_v1, envelope_up_v1, PacketRouterPacketDownV1, PacketRouterPacketUpV1, - PacketRouterSessionInitV1, PacketRouterSessionOfferV1, - }, - Message as ProtoMessage, +use helium_proto::services::router::{ + envelope_down_v1, PacketRouterPacketDownV1, PacketRouterPacketUpV1, PacketRouterSessionOfferV1, }; use serde::Serialize; -use std::{sync::Arc, time::Instant as StdInstant}; +use std::{ops::Deref, time::Instant as StdInstant}; use tokio::time::Duration; use tracing::{debug, info, warn}; @@ -35,7 +30,7 @@ pub struct RouterStatus { #[serde(with = "http_serde::uri")] pub uri: http::Uri, pub connected: bool, - pub session_key: Option, + pub session_key: Option, } pub type MessageSender = sync::MessageSender; @@ -60,8 +55,6 @@ pub struct PacketRouter { transmit: gateway::MessageSender, service: PacketRouterService, reconnect: Reconnect, - session_key: Option>, - keypair: Arc, store: MessageCache, } @@ -78,8 +71,6 @@ impl PacketRouter { let reconnect = Reconnect::default(); Self { service, - keypair: settings.keypair.clone(), - session_key: None, transmit, messages, store, @@ -108,7 +99,7 @@ impl PacketRouter { let status = RouterStatus { uri: self.service.uri.clone(), connected: self.service.is_connected(), - session_key: self.session_key.as_ref().map(|keypair| keypair.public_key().to_owned()), + session_key: self.service.session_key().cloned(), }; tx_resp.send(status) } @@ -158,9 +149,7 @@ impl PacketRouter { async fn handle_uplink(&mut self, uplink: PacketUp, received: StdInstant) -> Result { self.store.push_back(uplink, received); if self.service.is_connected() { - if let Some(session_key) = &self.session_key { - self.send_waiting_packets(session_key.clone()).await?; - } + self.send_waiting_packets().await?; } Ok(()) } @@ -170,31 +159,22 @@ impl PacketRouter { } async fn handle_session_offer(&mut self, message: PacketRouterSessionOfferV1) -> Result { - let session_key = mk_session_key_init(self.keypair.clone(), &message) - .and_then(|(session_key, session_init)| { - self.service.send(session_init).map_ok(|_| session_key) - }) - .inspect_err(|err| warn!(%err, "failed to initialize session")) - .await?; - self.session_key = Some(session_key.clone()); - info!(session_key = %session_key.public_key(),"initialized session"); - self.send_waiting_packets(session_key.clone()) + self.service.session_init(&message.nonce).await?; + self.send_waiting_packets() .inspect_err(|err| warn!(%err, "failed to send queued packets")) - .await?; - Ok(()) + .await } fn disconnect(&mut self) { self.service.disconnect(); - self.session_key = None; } - async fn send_waiting_packets(&mut self, keypair: Arc) -> Result { + async fn send_waiting_packets(&mut self) -> Result { while let (removed, Some(packet)) = self.store.pop_front(STORE_GC_INTERVAL) { if removed > 0 { info!(removed, "discarded queued packets"); } - if let Err(err) = self.send_packet(&packet, keypair.clone()).await { + if let Err(err) = self.send_packet(&packet).await { warn!(%err, "failed to send uplink"); self.store.push_front(packet); return Err(err); @@ -203,44 +183,11 @@ impl PacketRouter { Ok(()) } - async fn send_packet( - &mut self, - packet: &CacheMessage, - keypair: Arc, - ) -> Result { + async fn send_packet(&mut self, packet: &CacheMessage) -> Result { debug!(packet_hash = packet.hash().to_b64(), "sending packet"); - let uplink = mk_uplink(packet, keypair).await?; - self.service.send(uplink).await + let mut uplink: PacketRouterPacketUpV1 = packet.deref().into(); + uplink.hold_time = packet.hold_time().as_millis() as u64; + self.service.send_uplink(uplink).await } } - -pub async fn mk_uplink( - packet: &CacheMessage, - keypair: Arc, -) -> Result { - use std::ops::Deref; - let mut uplink: PacketRouterPacketUpV1 = packet.deref().into(); - uplink.hold_time = packet.hold_time().as_millis() as u64; - uplink.signature = sign(keypair, uplink.encode_to_vec()).await?; - let envelope = envelope_up_v1::Data::Packet(uplink); - Ok(envelope) -} - -pub async fn mk_session_key_init( - keypair: Arc, - offer: &PacketRouterSessionOfferV1, -) -> Result<(Arc, envelope_up_v1::Data)> { - let session_keypair = Arc::new(mk_session_keypair()); - let session_key = session_keypair.public_key(); - - let mut session_init = PacketRouterSessionInitV1 { - gateway: keypair.public_key().into(), - session_key: session_key.into(), - nonce: offer.nonce.clone(), - signature: vec![], - }; - session_init.signature = sign(keypair, session_init.encode_to_vec()).await?; - let envelope = envelope_up_v1::Data::SessionInit(session_init); - Ok((session_keypair, envelope)) -} diff --git a/src/service/conduit.rs b/src/service/conduit.rs index 46c77229..4d660286 100644 --- a/src/service/conduit.rs +++ b/src/service/conduit.rs @@ -1,12 +1,14 @@ use crate::{ service::{CONNECT_TIMEOUT, RPC_TIMEOUT}, - Keypair, Result, + Error, Keypair, PublicKey, Result, Sign, }; +use futures::TryFutureExt; use helium_proto::services::{Channel, Endpoint}; use http::Uri; use std::sync::Arc; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; +use tracing::{info, warn}; /// The time between TCP keepalive messages to keep the connection to the packet /// router open. Some load balancer disconnect after a number of seconds. AWS @@ -19,6 +21,7 @@ pub const CONDUIT_CAPACITY: usize = 50; #[derive(Debug)] pub struct ConduitService> { pub uri: Uri, + session_keypair: Option>, conduit: Option>, keypair: Arc, client: C, @@ -39,6 +42,13 @@ pub trait ConduitClient { client_rx: ReceiverStream, keypair: Arc, ) -> Result>; + + async fn mk_session_init( + &self, + nonce: &[u8], + session_key: &PublicKey, + keypair: Arc, + ) -> Result; } impl Conduit { @@ -77,9 +87,10 @@ impl> ConduitService { pub fn new(uri: Uri, client: C, keypair: Arc) -> Self { Self { uri, - conduit: None, keypair, client, + conduit: None, + session_keypair: None, } } @@ -118,6 +129,7 @@ impl> ConduitService { pub fn disconnect(&mut self) { self.conduit = None; + self.session_keypair = None; } pub async fn connect(&mut self) -> Result { @@ -135,4 +147,40 @@ impl> ConduitService { pub fn is_connected(&self) -> bool { self.conduit.is_some() } + + pub fn gateway_key(&self) -> &PublicKey { + self.keypair.public_key() + } + + pub fn session_key(&self) -> Option<&PublicKey> { + self.session_keypair.as_ref().map(|k| k.public_key()) + } + + pub fn session_keypair(&self) -> Option> { + self.session_keypair.clone() + } + + pub async fn session_sign(&self, msg: &mut M) -> Result { + if let Some(keypair) = self.session_keypair.as_ref() { + msg.sign(keypair.clone()).await?; + Ok(()) + } else { + Err(Error::no_session()) + } + } + + pub async fn session_init(&mut self, nonce: &[u8]) -> Result { + let session_keypair = Arc::new(Keypair::new()); + let session_key = session_keypair.public_key(); + let msg = self + .client + .mk_session_init(nonce, session_key, self.keypair.clone()) + .await?; + self.send(msg) + .inspect_err(|err| warn!(%err, "failed to initialize session")) + .await?; + self.session_keypair = Some(session_keypair.clone()); + info!(%session_key, "initialized session"); + Ok(()) + } } diff --git a/src/service/config.rs b/src/service/config.rs index eaeaaae8..d9fcb32e 100644 --- a/src/service/config.rs +++ b/src/service/config.rs @@ -1,10 +1,14 @@ use crate::{ + impl_sign, impl_verify, service::{CONNECT_TIMEOUT, RPC_TIMEOUT}, - sign, verify, Error, KeyedUri, Keypair, Region, RegionParams, Result, + KeyedUri, Keypair, Region, RegionParams, Result, Sign, Verify, }; -use helium_crypto::Verify; use helium_proto::{ - services::{self, iot_config::GatewayRegionParamsReqV1, Channel, Endpoint}, + services::{ + self, + iot_config::{GatewayRegionParamsReqV1, GatewayRegionParamsResV1}, + Channel, Endpoint, + }, Message, }; use std::sync::Arc; @@ -39,10 +43,13 @@ impl ConfigService { address: keypair.public_key().to_vec(), signature: vec![], }; - req.signature = sign(keypair, req.encode_to_vec()).await?; + req.sign(keypair).await?; let resp = self.client.region_params(req).await?.into_inner(); - verify!(&self.uri.pubkey, resp, signature)?; + resp.verify(&self.uri.pubkey)?; Ok(RegionParams::try_from(resp)?) } } + +impl_sign!(GatewayRegionParamsReqV1); +impl_verify!(GatewayRegionParamsResV1); diff --git a/src/service/packet_router.rs b/src/service/packet_router.rs index 571e3dee..05e9c3d7 100644 --- a/src/service/packet_router.rs +++ b/src/service/packet_router.rs @@ -1,13 +1,14 @@ use crate::{ error::DecodeError, + impl_sign, service::conduit::{ConduitClient, ConduitService}, - sign, Error, Keypair, Result, + Error, Keypair, PublicKey, Result, Sign, }; use helium_proto::{ services::{ router::{ envelope_down_v1, envelope_up_v1, EnvelopeDownV1, EnvelopeUpV1, PacketRouterClient, - PacketRouterRegisterV1, + PacketRouterPacketUpV1, PacketRouterRegisterV1, PacketRouterSessionInitV1, }, Channel, }, @@ -52,15 +53,38 @@ impl ConduitClient for PacketRouterConduitClient { signature: vec![], session_capable: true, }; - msg.signature = sign(keypair.clone(), msg.encode_to_vec()).await?; + msg.sign(keypair.clone()).await?; let msg = EnvelopeUpV1 { data: Some(envelope_up_v1::Data::Register(msg)), }; tx.send(msg).await.map_err(|_| Error::channel())?; Ok(rx) } + + async fn mk_session_init( + &self, + nonce: &[u8], + session_key: &PublicKey, + keypair: Arc, + ) -> Result { + let mut session_init = PacketRouterSessionInitV1 { + gateway: keypair.public_key().into(), + session_key: session_key.into(), + nonce: nonce.to_vec(), + signature: vec![], + }; + session_init.sign(keypair).await?; + let envelope = EnvelopeUpV1 { + data: Some(envelope_up_v1::Data::SessionInit(session_init)), + }; + Ok(envelope) + } } +impl_sign!(PacketRouterRegisterV1); +impl_sign!(PacketRouterPacketUpV1); +impl_sign!(PacketRouterSessionInitV1); + impl std::ops::Deref for PacketRouterService { type Target = ConduitService; fn deref(&self) -> &Self::Target { @@ -80,8 +104,11 @@ impl PacketRouterService { Self(ConduitService::new(uri, client, keypair)) } - pub async fn send(&mut self, msg: envelope_up_v1::Data) -> Result { - let msg = EnvelopeUpV1 { data: Some(msg) }; + pub async fn send_uplink(&mut self, mut msg: PacketRouterPacketUpV1) -> Result { + self.session_sign(&mut msg).await?; + let msg = EnvelopeUpV1 { + data: Some(envelope_up_v1::Data::Packet(msg)), + }; self.0.send(msg).await } diff --git a/src/service/poc.rs b/src/service/poc.rs index 1804763d..5a56478a 100644 --- a/src/service/poc.rs +++ b/src/service/poc.rs @@ -1,14 +1,18 @@ use crate::{ error::DecodeError, + impl_sign, service::conduit::{ConduitClient, ConduitService}, - Keypair, Result, + Keypair, PublicKey, Result, Sign, }; -use helium_proto::services::{ - poc_lora::{ - self, lora_stream_request_v1, lora_stream_response_v1, LoraBeaconReportReqV1, - LoraStreamRequestV1, LoraStreamResponseV1, LoraWitnessReportReqV1, +use helium_proto::{ + services::{ + poc_lora::{ + self, lora_stream_request_v1, lora_stream_response_v1, LoraBeaconReportReqV1, + LoraStreamRequestV1, LoraStreamResponseV1, LoraWitnessReportReqV1, + }, + Channel, }, - Channel, + Message as ProtoMessage, }; use http::Uri; use std::sync::Arc; @@ -39,8 +43,31 @@ impl ConduitClient for PocIotConduitC let rx = client.stream_requests(client_rx).await?.into_inner(); Ok(rx) } + + async fn mk_session_init( + &self, + nonce: &[u8], + session_key: &PublicKey, + keypair: Arc, + ) -> Result { + let mut session_init = poc_lora::LoraStreamSessionInitV1 { + pub_key: keypair.public_key().into(), + session_key: session_key.into(), + nonce: nonce.to_vec(), + signature: vec![], + }; + session_init.sign(keypair).await?; + let envelope = LoraStreamRequestV1 { + request: Some(lora_stream_request_v1::Request::SessionInit(session_init)), + }; + Ok(envelope) + } } +impl_sign!(poc_lora::LoraStreamSessionInitV1); +impl_sign!(poc_lora::LoraBeaconReportReqV1); +impl_sign!(poc_lora::LoraWitnessReportReqV1); + impl std::ops::Deref for PocIotService { type Target = ConduitService; fn deref(&self) -> &Self::Target { @@ -76,12 +103,14 @@ impl PocIotService { } } - pub async fn submit_beacon(&mut self, req: LoraBeaconReportReqV1) -> Result { + pub async fn submit_beacon(&mut self, mut req: LoraBeaconReportReqV1) -> Result { + self.0.session_sign(&mut req).await?; let msg = lora_stream_request_v1::Request::BeaconReport(req); self.send(msg).await } - pub async fn submit_witness(&mut self, req: LoraWitnessReportReqV1) -> Result { + pub async fn submit_witness(&mut self, mut req: LoraWitnessReportReqV1) -> Result { + self.0.session_sign(&mut req).await?; let msg = lora_stream_request_v1::Request::WitnessReport(req); self.send(msg).await } From 9fb5ecdeb16dd7f9481bace871830f71b6370d67 Mon Sep 17 00:00:00 2001 From: Marc Nijdam Date: Tue, 10 Oct 2023 15:09:25 -0700 Subject: [PATCH 8/8] Conduit code reuse * Make ConduitService::recv handle the cosed connection * Expose DecodeError to reduce error module references --- src/beaconer.rs | 15 +++------------ src/error.rs | 4 ++++ src/keypair.rs | 10 +++++----- src/lib.rs | 2 +- src/packet.rs | 2 +- src/packet_router/mod.rs | 16 ++++------------ src/service/conduit.rs | 14 +++++++++----- src/service/packet_router.rs | 17 ++++++----------- src/service/poc.rs | 17 ++++++----------- 9 files changed, 39 insertions(+), 58 deletions(-) diff --git a/src/beaconer.rs b/src/beaconer.rs index 2006f2d1..3e3a77c1 100644 --- a/src/beaconer.rs +++ b/src/beaconer.rs @@ -1,12 +1,11 @@ //! This module provides proof-of-coverage (PoC) beaconing support. use crate::{ - error::DecodeError, gateway::{self, BeaconResp}, message_cache::MessageCache, region_watcher, service::{entropy::EntropyService, poc::PocIotService, Reconnect}, settings::Settings, - sync, Base64, PacketUp, PublicKey, RegionParams, Result, + sync, Base64, DecodeError, PacketUp, PublicKey, RegionParams, Result, }; use futures::TryFutureExt; use helium_proto::services::poc_lora::{self, lora_stream_response_v1}; @@ -145,7 +144,7 @@ impl Beaconer { Err(_) => warn!("region watch disconnected"), }, service_message = self.service.recv() => match service_message { - Ok(Some(lora_stream_response_v1::Response::Offer(message))) => { + Ok(lora_stream_response_v1::Response::Offer(message)) => { let session_result = self.handle_session_offer(message).await; if session_result.is_ok() { // (Re)set retry count to max to maximize time to @@ -153,14 +152,10 @@ impl Beaconer { self.reconnect.retry_count = self.reconnect.max_retries; } else { // Failed to handle session offer, disconnect - self.disconnect(); + self.service.disconnect(); } self.reconnect.update_next_time(session_result.is_err()); }, - Ok(None) => { - warn!("ingest disconnected"); - self.reconnect.update_next_time(true); - }, Err(err) => { warn!(?err, "ingest error"); self.reconnect.update_next_time(true); @@ -267,10 +262,6 @@ impl Beaconer { .await; } - fn disconnect(&mut self) { - self.service.disconnect(); - } - pub async fn mk_beacon( region_params: Arc, entropy_uri: Uri, diff --git a/src/error.rs b/src/error.rs index e389bf06..a87074de 100644 --- a/src/error.rs +++ b/src/error.rs @@ -174,6 +174,10 @@ impl Error { Error::Service(ServiceError::NoSession) } + pub fn no_stream() -> Error { + Error::Service(ServiceError::Stream) + } + pub fn gateway_service_check(age: u64, max_age: u64) -> Error { Error::Service(ServiceError::Check { age, max_age }) } diff --git a/src/keypair.rs b/src/keypair.rs index 39e75125..dcea5d02 100644 --- a/src/keypair.rs +++ b/src/keypair.rs @@ -1,4 +1,4 @@ -use crate::{error, Error, Result}; +use crate::{DecodeError, Error, Result}; #[cfg(feature = "ecc608")] use helium_crypto::ecc608; #[cfg(feature = "tpm")] @@ -29,10 +29,10 @@ pub trait Verify { macro_rules! uri_error { ($format:expr) => { - error::DecodeError::keypair_uri(format!($format)) + DecodeError::keypair_uri(format!($format)) }; ($format:expr, $( $arg:expr ),+ ) => { - error::DecodeError::keypair_uri(format!($format, $( $arg ),+)) + DecodeError::keypair_uri(format!($format, $( $arg ),+)) }; } @@ -74,7 +74,7 @@ impl FromStr for Keypair { }, #[cfg(feature = "ecc608")] Some("ecc") => { - let args = KeypairArgs::from_uri(&url).map_err(error::DecodeError::keypair_uri)?; + let args = KeypairArgs::from_uri(&url).map_err(DecodeError::keypair_uri)?; let bus_address = url.port_u16().unwrap_or(96); let slot = args.get::("slot", 0)?; @@ -101,7 +101,7 @@ impl FromStr for Keypair { } #[cfg(feature = "tpm")] Some("tpm") => { - let args = KeypairArgs::from_uri(&url).map_err(error::DecodeError::keypair_uri)?; + let args = KeypairArgs::from_uri(&url).map_err(DecodeError::keypair_uri)?; let network = args.get("network", Network::MainNet)?; let path = url.path(); diff --git a/src/lib.rs b/src/lib.rs index db491cd2..dce3c23c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,7 +19,7 @@ mod base64; pub(crate) use crate::base64::Base64; pub use beacon::{Region, RegionParams}; -pub use error::{Error, Result}; +pub use error::{DecodeError, Error, Result}; pub use keyed_uri::KeyedUri; pub use keypair::{Keypair, PublicKey, Sign, Verify}; pub use packet::{PacketDown, PacketUp}; diff --git a/src/packet.rs b/src/packet.rs index a6ddf776..8d5b86ee 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -1,4 +1,4 @@ -use crate::{error::DecodeError, Error, PublicKey, Region, Result}; +use crate::{DecodeError, Error, PublicKey, Region, Result}; use helium_proto::services::{ poc_lora, router::{PacketRouterPacketDownV1, PacketRouterPacketUpV1}, diff --git a/src/packet_router/mod.rs b/src/packet_router/mod.rs index 1e156861..007aaba1 100644 --- a/src/packet_router/mod.rs +++ b/src/packet_router/mod.rs @@ -91,7 +91,7 @@ impl PacketRouter { message = self.messages.recv() => match message { Some(Message::Uplink{packet, received}) => if self.handle_uplink(packet, received).await.is_err() { - self.disconnect(); + self.service.disconnect(); warn!("router disconnected"); self.reconnect.update_next_time(true); }, @@ -110,8 +110,8 @@ impl PacketRouter { self.reconnect.update_next_time(reconnect_result.is_err()); }, router_message = self.service.recv() => match router_message { - Ok(Some(envelope_down_v1::Data::Packet(message))) => self.handle_downlink(message).await, - Ok(Some(envelope_down_v1::Data::SessionOffer(message))) => { + Ok(envelope_down_v1::Data::Packet(message)) => self.handle_downlink(message).await, + Ok(envelope_down_v1::Data::SessionOffer(message)) => { let session_result = self.handle_session_offer(message).await; if session_result.is_ok() { // (Re)set retry count to max to maximize time to @@ -119,14 +119,10 @@ impl PacketRouter { self.reconnect.retry_count = self.reconnect.max_retries; } else { // Failed fto handle session offer, disconnect - self.disconnect(); + self.service.disconnect(); } self.reconnect.update_next_time(session_result.is_err()); }, - Ok(None) => { - warn!("router disconnected"); - self.reconnect.update_next_time(true); - }, Err(err) => { warn!(?err, "router error"); self.reconnect.update_next_time(true); @@ -165,10 +161,6 @@ impl PacketRouter { .await } - fn disconnect(&mut self) { - self.service.disconnect(); - } - async fn send_waiting_packets(&mut self) -> Result { while let (removed, Some(packet)) = self.store.pop_front(STORE_GC_INTERVAL) { if removed > 0 { diff --git a/src/service/conduit.rs b/src/service/conduit.rs index 4d660286..bbffcf7c 100644 --- a/src/service/conduit.rs +++ b/src/service/conduit.rs @@ -108,7 +108,7 @@ impl> ConduitService { } } - pub async fn recv(&mut self) -> Result> { + pub async fn recv(&mut self) -> Result { // Since recv is usually called from a select loop we don't try a // connect every time it is called since the rate for attempted // connections in failure setups would be as high as the loop rate of @@ -116,13 +116,17 @@ impl> ConduitService { // send at a later time to reconnect the conduit. if self.conduit.is_none() { futures::future::pending::<()>().await; - return Ok(None); + return Err(Error::no_stream()); } match self.conduit.as_mut().unwrap().recv().await { - Ok(msg) if msg.is_some() => Ok(msg), - other => { + Ok(Some(msg)) => Ok(msg), + Ok(None) => { self.disconnect(); - other + Err(Error::no_stream()) + } + Err(err) => { + self.disconnect(); + Err(err) } } } diff --git a/src/service/packet_router.rs b/src/service/packet_router.rs index 05e9c3d7..394cf05a 100644 --- a/src/service/packet_router.rs +++ b/src/service/packet_router.rs @@ -1,8 +1,7 @@ use crate::{ - error::DecodeError, impl_sign, service::conduit::{ConduitClient, ConduitService}, - Error, Keypair, PublicKey, Result, Sign, + DecodeError, Error, Keypair, PublicKey, Result, Sign, }; use helium_proto::{ services::{ @@ -112,14 +111,10 @@ impl PacketRouterService { self.0.send(msg).await } - pub async fn recv(&mut self) -> Result> { - match self.0.recv().await { - Ok(Some(msg)) => match msg.data { - Some(data) => Ok(Some(data)), - None => Err(DecodeError::invalid_envelope()), - }, - Ok(None) => Ok(None), - Err(err) => Err(err), - } + pub async fn recv(&mut self) -> Result { + self.0.recv().await.and_then(|msg| match msg.data { + Some(data) => Ok(data), + None => Err(DecodeError::invalid_envelope()), + }) } } diff --git a/src/service/poc.rs b/src/service/poc.rs index 5a56478a..466eab1a 100644 --- a/src/service/poc.rs +++ b/src/service/poc.rs @@ -1,8 +1,7 @@ use crate::{ - error::DecodeError, impl_sign, service::conduit::{ConduitClient, ConduitService}, - Keypair, PublicKey, Result, Sign, + DecodeError, Keypair, PublicKey, Result, Sign, }; use helium_proto::{ services::{ @@ -92,15 +91,11 @@ impl PocIotService { self.0.send(msg).await } - pub async fn recv(&mut self) -> Result> { - match self.0.recv().await { - Ok(Some(msg)) => match msg.response { - Some(data) => Ok(Some(data)), - None => Err(DecodeError::invalid_envelope()), - }, - Ok(None) => Ok(None), - Err(err) => Err(err), - } + pub async fn recv(&mut self) -> Result { + self.0.recv().await.and_then(|msg| match msg.response { + Some(data) => Ok(data), + None => Err(DecodeError::invalid_envelope()), + }) } pub async fn submit_beacon(&mut self, mut req: LoraBeaconReportReqV1) -> Result {