From a6e199d7b0d8e9d93ab77e42f23b70e0495a4a9c Mon Sep 17 00:00:00 2001 From: "github-classroom[bot]" <66690702+github-classroom[bot]@users.noreply.github.com> Date: Tue, 12 Apr 2022 17:50:12 +0000 Subject: [PATCH 01/17] Setting up GitHub Classroom Feedback From d200cb3747207bdc5dd8c3cb382bd2812f09095b Mon Sep 17 00:00:00 2001 From: jkarmowska Date: Tue, 12 Apr 2022 21:05:33 +0200 Subject: [PATCH 02/17] update readme --- README.md | 26 +++++++++----------------- 1 file changed, 9 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index 7c2efa5..980af2a 100644 --- a/README.md +++ b/README.md @@ -1,27 +1,19 @@ -# Frobnicator +# Komunikator ## Autorzy -- Andrzej Głuszak (gr 9, @agluszak na githubie) -- Linus Torvalds (Uniwersytet Helsiński, @torvalds na githubie) +- Julia Karmowska (gr 4, @jkarmowska na githubie) ## Opis -Od zawsze chcieliśmy napisać grę komputerową. -Frobnicator będzie to gra platformowa, w której chodzi o to, żeby... - -Z grubsza będziemy wzorować się na [tym tutorialu](https://dev.to/sbelzile/rust-platformer-part-1-bevy-and-ecs-2pci). +Klient i serwer czatu w Ruscie ## Funkcjonalność -- Generowanie map -- Strzelanie -- AI dla wrogów (bardziej rozbudowane niż w tutorialu) -- Możliwość zapisywania i wczytywania stanu gry -- Punktacja +- Logowanie użytkowników i ustawianie nicków +- Wysyłanie wiadomości do grupy użytkowników +- Połączenie przez TCP ## Propozycja podziału na części -W pierwszej części stworzymy grę opartą na tutorialu (z lepszym AI) i jedną zahardcodowaną planszą. - -W drugiej części dodamy do tego losowy generator map, zapisywanie/wczytywanie stanu gry oraz system punktacji. +Pierwsza część - klient dołącza do jednej grupy na początku, grupy tworzone wraz z serwerem +Druga część - klient może być w kilku grupach, może opuścić grupę, stworzyć nową, wybrać do której grupy chce wysłać wiadomość ## Biblioteki -- Bevy -- może coś do serializacji danych? (czy mógłby Pan coś polecić?) +- Tokio - do obsługi współbieżności From 706122dd5103a8c03e55af9dbb53658fd1480e9e Mon Sep 17 00:00:00 2001 From: jkarmowska Date: Sun, 15 May 2022 12:49:51 +0200 Subject: [PATCH 03/17] simple server --- chat_app/Cargo.lock | 308 +++++++++++++++++++++++++++++++++++++++++++ chat_app/Cargo.toml | 9 ++ chat_app/src/main.rs | 40 ++++++ 3 files changed, 357 insertions(+) create mode 100644 chat_app/Cargo.lock create mode 100644 chat_app/Cargo.toml create mode 100644 chat_app/src/main.rs diff --git a/chat_app/Cargo.lock b/chat_app/Cargo.lock new file mode 100644 index 0000000..afa1d5c --- /dev/null +++ b/chat_app/Cargo.lock @@ -0,0 +1,308 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "autocfg" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" + +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + +[[package]] +name = "bytes" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "chat_app" +version = "0.1.0" +dependencies = [ + "tokio", +] + +[[package]] +name = "hermit-abi" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" +dependencies = [ + "libc", +] + +[[package]] +name = "libc" +version = "0.2.125" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5916d2ae698f6de9bfb891ad7a8d65c09d232dc58cc4ac433c7da3b2fd84bc2b" + +[[package]] +name = "lock_api" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "327fa5b6a6940e4699ec49a9beae1ea4845c6bab9314e4f84ac68742139d8c53" +dependencies = [ + "autocfg", + "scopeguard", +] + +[[package]] +name = "log" +version = "0.4.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "memchr" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" + +[[package]] +name = "mio" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "713d550d9b44d89174e066b7a6217ae06234c10cb47819a88290d2b353c31799" +dependencies = [ + "libc", + "log", + "wasi", + "windows-sys", +] + +[[package]] +name = "num_cpus" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1" +dependencies = [ + "hermit-abi", + "libc", +] + +[[package]] +name = "once_cell" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87f3e037eac156d1775da914196f0f37741a274155e34a0b7e427c35d2a2ecb9" + +[[package]] +name = "parking_lot" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87f5ec2493a61ac0506c0f4199f99070cbe83857b0337006a30f3e6719b8ef58" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09a279cbf25cb0757810394fbc1e359949b59e348145c643a939a525692e6929" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-sys", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" + +[[package]] +name = "proc-macro2" +version = "1.0.38" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9027b48e9d4c9175fa2218adf3557f91c1137021739951d4932f5f8268ac48aa" +dependencies = [ + "unicode-xid", +] + +[[package]] +name = "quote" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1feb54ed693b93a84e14094943b84b7c4eae204c512b7ccb95ab0c66d278ad1" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "redox_syscall" +version = "0.2.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62f25bc4c7e55e0b0b7a1d43fb893f4fa1361d0abe38b9ce4f323c2adfe6ef42" +dependencies = [ + "bitflags", +] + +[[package]] +name = "scopeguard" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" + +[[package]] +name = "signal-hook-registry" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0" +dependencies = [ + "libc", +] + +[[package]] +name = "smallvec" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83" + +[[package]] +name = "socket2" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66d72b759436ae32898a2af0a14218dbf55efde3feeb170eb623637db85ee1e0" +dependencies = [ + "libc", + "winapi", +] + +[[package]] +name = "syn" +version = "1.0.94" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a07e33e919ebcd69113d5be0e4d70c5707004ff45188910106854f38b960df4a" +dependencies = [ + "proc-macro2", + "quote", + "unicode-xid", +] + +[[package]] +name = "tokio" +version = "1.18.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4903bf0427cf68dddd5aa6a93220756f8be0c34fcfa9f5e6191e103e15a31395" +dependencies = [ + "bytes", + "libc", + "memchr", + "mio", + "num_cpus", + "once_cell", + "parking_lot", + "pin-project-lite", + "signal-hook-registry", + "socket2", + "tokio-macros", + "winapi", +] + +[[package]] +name = "tokio-macros" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b557f72f448c511a979e2564e55d74e6c4432fc96ff4f6241bc6bded342643b7" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "unicode-xid" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "957e51f3646910546462e67d5f7599b9e4fb8acdd304b087a6494730f9eebf04" + +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "windows-sys" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2" +dependencies = [ + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_msvc" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47" + +[[package]] +name = "windows_i686_gnu" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6" + +[[package]] +name = "windows_i686_msvc" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" diff --git a/chat_app/Cargo.toml b/chat_app/Cargo.toml new file mode 100644 index 0000000..96a98d1 --- /dev/null +++ b/chat_app/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "chat_app" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +tokio = { version = "1", features = ["full"] } \ No newline at end of file diff --git a/chat_app/src/main.rs b/chat_app/src/main.rs new file mode 100644 index 0000000..9a00176 --- /dev/null +++ b/chat_app/src/main.rs @@ -0,0 +1,40 @@ +use tokio::net::TcpListener; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::sync::broadcast; + +#[tokio::main] +async fn main() { + let listener = TcpListener::bind("localhost:8080").await.unwrap(); + + let (tx, _rx) = broadcast::channel(10); //sending strings to max 10 ppl who can connect + loop { + let (mut socket, addr) = listener.accept().await.unwrap(); + let tx = tx.clone();//klonowanie tx dla kazdego klienta + let mut rx = tx.subscribe();//nowy rx dla każdego klienta + + + tokio::spawn(async move { //spawnowanie taska obsługi klienta + let (reader, mut writer) = socket.split(); //podział socketa na czytanie i pisanie + + let mut reader = BufReader::new(reader); + let mut line = String::new(); + + loop { + tokio::select! { + result = reader.read_line( & mut line) => { //pierwszy branch - otrzymanie wiadomosci do przeslania + if result.unwrap() == 0 { + break; + } + tx.send((line.clone(), addr)).unwrap(); + line.clear(); + } + result = rx.recv() => { //drugi branch - otrzymanie wiadomosci przez broadcast + let (msg, other_addr) = result.unwrap(); + if addr != other_addr{ + writer.write_all(msg.as_bytes()).await.unwrap();} + } + } + } + }); + } +} From 1c9261f71307555d95a465e0a487421b89c1ce17 Mon Sep 17 00:00:00 2001 From: jkarmowska Date: Sun, 15 May 2022 12:51:58 +0200 Subject: [PATCH 04/17] add gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2ca4ee2 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/chat_app/target/ From a1e387f30fc3a843ab8ac48795ab88cc6b122339 Mon Sep 17 00:00:00 2001 From: jkarmowska Date: Sun, 15 May 2022 16:42:34 +0200 Subject: [PATCH 05/17] restructure files --- chat_app/Cargo.lock | 80 ++++++++++++++++++++++++- chat_app/Cargo.toml | 4 +- chat_app/src/bin/client.rs | 3 + chat_app/src/{main.rs => bin/server.rs} | 3 +- chat_app/src/lib.rs | 1 + chat_app/src/types.rs | 38 ++++++++++++ 6 files changed, 126 insertions(+), 3 deletions(-) create mode 100644 chat_app/src/bin/client.rs rename chat_app/src/{main.rs => bin/server.rs} (96%) create mode 100644 chat_app/src/lib.rs create mode 100644 chat_app/src/types.rs diff --git a/chat_app/Cargo.lock b/chat_app/Cargo.lock index afa1d5c..7ee34e8 100644 --- a/chat_app/Cargo.lock +++ b/chat_app/Cargo.lock @@ -30,7 +30,34 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" name = "chat_app" version = "0.1.0" dependencies = [ + "chrono", "tokio", + "uuid", +] + +[[package]] +name = "chrono" +version = "0.4.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73" +dependencies = [ + "libc", + "num-integer", + "num-traits", + "serde", + "time", + "winapi", +] + +[[package]] +name = "getrandom" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9be70c98951c83b8d2f8f60d7065fa6d5146873094452a1008da8c2f1e4205ad" +dependencies = [ + "cfg-if", + "libc", + "wasi 0.10.2+wasi-snapshot-preview1", ] [[package]] @@ -81,10 +108,29 @@ checksum = "713d550d9b44d89174e066b7a6217ae06234c10cb47819a88290d2b353c31799" dependencies = [ "libc", "log", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "windows-sys", ] +[[package]] +name = "num-integer" +version = "0.1.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9" +dependencies = [ + "autocfg", + "num-traits", +] + +[[package]] +name = "num-traits" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "578ede34cf02f8924ab9447f50c28075b4d3e5b269972345e7e0372b38c6cdcd" +dependencies = [ + "autocfg", +] + [[package]] name = "num_cpus" version = "1.13.1" @@ -163,6 +209,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" +[[package]] +name = "serde" +version = "1.0.137" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61ea8d54c77f8315140a05f4c7237403bf38b72704d031543aa1d16abbf517d1" + [[package]] name = "signal-hook-registry" version = "1.4.0" @@ -199,6 +251,16 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "time" +version = "0.1.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "tokio" version = "1.18.2" @@ -236,6 +298,22 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "957e51f3646910546462e67d5f7599b9e4fb8acdd304b087a6494730f9eebf04" +[[package]] +name = "uuid" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7" +dependencies = [ + "getrandom", + "serde", +] + +[[package]] +name = "wasi" +version = "0.10.2+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" diff --git a/chat_app/Cargo.toml b/chat_app/Cargo.toml index 96a98d1..72a7f78 100644 --- a/chat_app/Cargo.toml +++ b/chat_app/Cargo.toml @@ -6,4 +6,6 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -tokio = { version = "1", features = ["full"] } \ No newline at end of file +tokio = { version = "1", features = ["full"] } +uuid = { version = "0.8.1", features = ["serde", "v4"] } +chrono = { version = "0.4.11", features = ["serde"] } diff --git a/chat_app/src/bin/client.rs b/chat_app/src/bin/client.rs new file mode 100644 index 0000000..cf74f49 --- /dev/null +++ b/chat_app/src/bin/client.rs @@ -0,0 +1,3 @@ +fn main() { + +} \ No newline at end of file diff --git a/chat_app/src/main.rs b/chat_app/src/bin/server.rs similarity index 96% rename from chat_app/src/main.rs rename to chat_app/src/bin/server.rs index 9a00176..4b1dadd 100644 --- a/chat_app/src/main.rs +++ b/chat_app/src/bin/server.rs @@ -1,6 +1,7 @@ -use tokio::net::TcpListener; +use tokio::net::{TcpListener}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::sync::broadcast; +use chat_app::types; #[tokio::main] async fn main() { diff --git a/chat_app/src/lib.rs b/chat_app/src/lib.rs new file mode 100644 index 0000000..dd198c6 --- /dev/null +++ b/chat_app/src/lib.rs @@ -0,0 +1 @@ +pub mod types; \ No newline at end of file diff --git a/chat_app/src/types.rs b/chat_app/src/types.rs new file mode 100644 index 0000000..c9ab9f1 --- /dev/null +++ b/chat_app/src/types.rs @@ -0,0 +1,38 @@ +use uuid::Uuid; +use chrono::{DateTime, Utc}; + +#[derive(Debug, Clone, PartialEq)] +pub struct User { + pub id: Uuid, + pub name: String, +} + +impl User { + pub fn new(id: Uuid, name: &str) -> Self { + User { + id, + name: String::from(name), + } + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct Message { + pub id: Uuid, + pub user: User, + pub body: String, + pub group: Uuid, + pub time: DateTime, +} + +impl Message { + pub fn new(id: Uuid, user: User, msg: &str, group: Uuid, time: DateTime) -> Self { + Message { + id, + user, + body: String::from(msg), + group, + time, + } + } +} \ No newline at end of file From 87e53a09bc26f35f99d8f4dde337433b8dbd2c8a Mon Sep 17 00:00:00 2001 From: jkarmowska Date: Mon, 30 May 2022 15:24:59 +0200 Subject: [PATCH 06/17] add client --- chat_app/Cargo.lock | 78 ++++++++++++++++++++++++++++++++++ chat_app/Cargo.toml | 3 ++ chat_app/src/bin/client.rs | 87 +++++++++++++++++++++++++++++++++++++- chat_app/src/bin/server.rs | 22 ++++++---- 4 files changed, 181 insertions(+), 9 deletions(-) diff --git a/chat_app/Cargo.lock b/chat_app/Cargo.lock index 7ee34e8..81baa90 100644 --- a/chat_app/Cargo.lock +++ b/chat_app/Cargo.lock @@ -31,7 +31,10 @@ name = "chat_app" version = "0.1.0" dependencies = [ "chrono", + "serde", + "serde_json", "tokio", + "tracing", "uuid", ] @@ -69,6 +72,18 @@ dependencies = [ "libc", ] +[[package]] +name = "itoa" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "112c678d4050afce233f4f2852bb2eb519230b3cf12f33585275537d7e41578d" + +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + [[package]] name = "libc" version = "0.2.125" @@ -203,6 +218,12 @@ dependencies = [ "bitflags", ] +[[package]] +name = "ryu" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3f6f92acf49d1b98f7a81226834412ada05458b7364277387724a237f062695" + [[package]] name = "scopeguard" version = "1.1.0" @@ -214,6 +235,31 @@ name = "serde" version = "1.0.137" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61ea8d54c77f8315140a05f4c7237403bf38b72704d031543aa1d16abbf517d1" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.137" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f26faba0c3959972377d3b2d306ee9f71faee9714294e41bb777f83f88578be" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.81" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b7ce2b32a1aed03c558dc61a5cd328f15aff2dbc17daad8fb8af04d2100e15c" +dependencies = [ + "itoa", + "ryu", + "serde", +] [[package]] name = "signal-hook-registry" @@ -292,6 +338,38 @@ dependencies = [ "syn", ] +[[package]] +name = "tracing" +version = "0.1.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d0ecdcb44a79f0fe9844f0c4f33a342cbcbb5117de8001e6ba0dc2351327d09" +dependencies = [ + "cfg-if", + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc6b8ad3567499f98a1db7a752b07a7c8c7c7c34c332ec00effb2b0027974b7c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f54c8ca710e81886d498c2fd3331b56c93aa248d49de2222ad2742247c60072f" +dependencies = [ + "lazy_static", +] + [[package]] name = "unicode-xid" version = "0.2.3" diff --git a/chat_app/Cargo.toml b/chat_app/Cargo.toml index 72a7f78..f727795 100644 --- a/chat_app/Cargo.toml +++ b/chat_app/Cargo.toml @@ -9,3 +9,6 @@ edition = "2021" tokio = { version = "1", features = ["full"] } uuid = { version = "0.8.1", features = ["serde", "v4"] } chrono = { version = "0.4.11", features = ["serde"] } +serde = { version = "1.0.105", features = ["derive"] } +serde_json = "1.0.50" +tracing = "0.1.34" \ No newline at end of file diff --git a/chat_app/src/bin/client.rs b/chat_app/src/bin/client.rs index cf74f49..5352de6 100644 --- a/chat_app/src/bin/client.rs +++ b/chat_app/src/bin/client.rs @@ -1,3 +1,88 @@ -fn main() { +use std::io::stdin; +use std::net::SocketAddr; +use std::sync::mpsc::Receiver; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}; +use tokio::net::{TcpStream, tcp::ReadHalf}; +use tokio::net::tcp::WriteHalf; +use tracing::{error, info}; +use tokio::sync::mpsc; +use tokio::sync::mpsc::error::SendError; +const SERVER: &str = "localhost:8080"; +const QUIT: &str = "/quit\n"; + + +//async fn receive(mut reader: BufReader, receiver: Receiver<(String, SocketAddr)>) {} + +#[tokio::main] +async fn main() { + let mut stream = TcpStream::connect(SERVER).await.unwrap(); + let (read, mut write) = stream.into_split(); + println!("Connection with server established at {}, enter {} to exit chat", &mut read.peer_addr().unwrap(), QUIT.trim()); + + println!("Wait until you will be asked to enter your username"); + + let mut reader = BufReader::new(read); + let mut writer = BufWriter::new(write); + let (sender, mut receiver) = mpsc::channel::(10);//10 wiadomości + + let mut line = String::new(); + + //reader + tokio::spawn(async move { + loop { + line.clear(); + + let received = reader.read_line(&mut line).await; + if let Ok(_received_msg) = receiver.try_recv() { //wiadomość od drugiego taska o zakończeniu + println!("You have left the chat."); + break; + } + match received { + Ok(len) => + { + if len == 0 { + break; + } + println!("{}", line); + } + Err(_) => { + println!("Connection lost! type {} to quit", QUIT.trim()); + } + } + } + }); + + //writer + let mut input = String::new(); + loop { + input.clear(); + let msg_send = stdin().read_line(&mut input); //user wpisuje wiadomość + match msg_send { + Ok(_) => { + if input == String::from(QUIT) {//user wpisał quit + let quit_msg = input.clone(); + match sender.send(quit_msg).await { //wysyłanie waidomości o zakończeniu + Ok(_) => {} + Err(_) => { + println!("Exiting "); + } + } + break; + } else { + let res = writer.write(input.as_bytes()).await; //wysyłanie do servera wiadomości + match res { + Ok(_) => { writer.flush().await.expect("Failed to flush buffer") } + Err(_) => { + println!("Connection lost! type {} to quit", QUIT.trim()); + break; + } + } + } + } + Err(_) => { + println!("Error while reading input. "); + } + } + } } \ No newline at end of file diff --git a/chat_app/src/bin/server.rs b/chat_app/src/bin/server.rs index 4b1dadd..817c431 100644 --- a/chat_app/src/bin/server.rs +++ b/chat_app/src/bin/server.rs @@ -1,35 +1,41 @@ +use std::net::SocketAddr; use tokio::net::{TcpListener}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::sync::broadcast; use chat_app::types; + +async fn manageClient(){ + +} + #[tokio::main] async fn main() { let listener = TcpListener::bind("localhost:8080").await.unwrap(); - let (tx, _rx) = broadcast::channel(10); //sending strings to max 10 ppl who can connect + let (sender, _rx) = broadcast::channel::<(String, SocketAddr) >(10); //sending strings to max 10 ppl who can connect loop { let (mut socket, addr) = listener.accept().await.unwrap(); - let tx = tx.clone();//klonowanie tx dla kazdego klienta - let mut rx = tx.subscribe();//nowy rx dla każdego klienta - + let sender = sender.clone();//klonowanie tx dla kazdego klienta + let mut receiver = sender.subscribe();//nowy rx dla każdego klienta + println!("accepted player"); tokio::spawn(async move { //spawnowanie taska obsługi klienta let (reader, mut writer) = socket.split(); //podział socketa na czytanie i pisanie - let mut reader = BufReader::new(reader); + let mut reader = BufReader::new(reader); //buffer czyta z socketa tcp od klienta let mut line = String::new(); loop { tokio::select! { result = reader.read_line( & mut line) => { //pierwszy branch - otrzymanie wiadomosci do przeslania - if result.unwrap() == 0 { + if result.unwrap() == 0 { //koniec break; } - tx.send((line.clone(), addr)).unwrap(); + sender.send((line.clone(), addr)).unwrap(); line.clear(); } - result = rx.recv() => { //drugi branch - otrzymanie wiadomosci przez broadcast + result = receiver.recv() => { //drugi branch - otrzymanie wiadomosci przez broadcast let (msg, other_addr) = result.unwrap(); if addr != other_addr{ writer.write_all(msg.as_bytes()).await.unwrap();} From 98df58d053cf83e92a43bd90d7bd59a1f8a728ad Mon Sep 17 00:00:00 2001 From: jkarmowska Date: Mon, 30 May 2022 20:26:10 +0200 Subject: [PATCH 07/17] add server --- chat_app/Cargo.lock | 19 +++++ chat_app/Cargo.toml | 3 +- chat_app/src/bin/client.rs | 9 +- chat_app/src/bin/server.rs | 169 ++++++++++++++++++++++++++++++------- 4 files changed, 161 insertions(+), 39 deletions(-) diff --git a/chat_app/Cargo.lock b/chat_app/Cargo.lock index 81baa90..6208e85 100644 --- a/chat_app/Cargo.lock +++ b/chat_app/Cargo.lock @@ -31,6 +31,7 @@ name = "chat_app" version = "0.1.0" dependencies = [ "chrono", + "dashmap", "serde", "serde_json", "tokio", @@ -52,6 +53,18 @@ dependencies = [ "winapi", ] +[[package]] +name = "dashmap" +version = "5.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3495912c9c1ccf2e18976439f4443f3fee0fd61f424ff99fde6a66b15ecb448f" +dependencies = [ + "cfg-if", + "hashbrown", + "lock_api", + "parking_lot_core", +] + [[package]] name = "getrandom" version = "0.2.6" @@ -63,6 +76,12 @@ dependencies = [ "wasi 0.10.2+wasi-snapshot-preview1", ] +[[package]] +name = "hashbrown" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db0d4cf898abf0081f964436dc980e96670a0f36863e4b83aaacdb65c9d7ccc3" + [[package]] name = "hermit-abi" version = "0.1.19" diff --git a/chat_app/Cargo.toml b/chat_app/Cargo.toml index f727795..d4c095d 100644 --- a/chat_app/Cargo.toml +++ b/chat_app/Cargo.toml @@ -11,4 +11,5 @@ uuid = { version = "0.8.1", features = ["serde", "v4"] } chrono = { version = "0.4.11", features = ["serde"] } serde = { version = "1.0.105", features = ["derive"] } serde_json = "1.0.50" -tracing = "0.1.34" \ No newline at end of file +tracing = "0.1.34" +dashmap = "5.3.4" \ No newline at end of file diff --git a/chat_app/src/bin/client.rs b/chat_app/src/bin/client.rs index 5352de6..23f091e 100644 --- a/chat_app/src/bin/client.rs +++ b/chat_app/src/bin/client.rs @@ -1,12 +1,7 @@ use std::io::stdin; -use std::net::SocketAddr; -use std::sync::mpsc::Receiver; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}; -use tokio::net::{TcpStream, tcp::ReadHalf}; -use tokio::net::tcp::WriteHalf; -use tracing::{error, info}; +use tokio::net::{TcpStream}; use tokio::sync::mpsc; -use tokio::sync::mpsc::error::SendError; const SERVER: &str = "localhost:8080"; const QUIT: &str = "/quit\n"; @@ -44,7 +39,7 @@ async fn main() { if len == 0 { break; } - println!("{}", line); + print!("{}", line); } Err(_) => { println!("Connection lost! type {} to quit", QUIT.trim()); diff --git a/chat_app/src/bin/server.rs b/chat_app/src/bin/server.rs index 817c431..f55bc96 100644 --- a/chat_app/src/bin/server.rs +++ b/chat_app/src/bin/server.rs @@ -1,47 +1,154 @@ -use std::net::SocketAddr; +use std::sync::Arc; use tokio::net::{TcpListener}; -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, }; use tokio::sync::broadcast; -use chat_app::types; +use dashmap::DashMap; +use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; +use tokio::sync::broadcast::Sender; -async fn manageClient(){ +const LOCAL: &str = "localhost:8080"; +const MAX_CLIENT_NUM: usize = 20; +const NAME_SIZE: usize = 30; +const MESSAGE_SIZE: usize = 256; +const ERROR_MSG: &str = "**************ERROR_MSG*************"; + + +async fn write_buf<'a>(writer: &mut OwnedWriteHalf, message: &str) { + writer.write(message.as_bytes()).await.expect("Failed to write msg"); + writer.flush().await.expect("Failed to flush after write"); +} + +//czeka na username +async fn request_username(reader: &mut BufReader, + writer: &mut OwnedWriteHalf, + clients: &mut Arc>, +) -> Result { + let mut buffer = String::with_capacity(NAME_SIZE); + loop { + buffer.clear(); + match reader.read_line(&mut buffer).await { + Ok(_) => { + let mut name = String::from(buffer.trim()); //ucinanie '\n' + name.truncate(NAME_SIZE); //skracanie username + if clients.contains_key(name.as_str()) { + write_buf(writer, "Name already taken, please try another!").await; + } else { + return Ok(name); //ok - poprawny username + } + } + Err(_) => { return Err(String::from(ERROR_MSG)); } + } + } +} + + +async fn receive_msg(reader: &mut BufReader, user: &str) -> Result { + let mut buffer = String::with_capacity(MESSAGE_SIZE); + match reader.read_line(&mut buffer).await { + Ok(_) => { + let mut msg = format!("{}:{}", user, buffer.as_str()); + if msg.len() > MESSAGE_SIZE { + msg.truncate(MESSAGE_SIZE - 1); + msg.push('\n'); + } + Ok(msg) + } + Err(_) => { Err(String::from(ERROR_MSG)) } + } +} + +async fn broadcast(clients: &mut Arc>, message: &str, sender:&str) { + for mut entry in clients.iter_mut() { + if entry.key()!=sender{ + write_buf(entry.value_mut(), message).await; + } + } +} + +async fn handle_connection(reader: &mut BufReader, name: &str, sender: Sender<(String, String)>) +{ + loop { + let msg = receive_msg(reader, name).await; + match msg { + Ok(_) => { + if sender.send((msg.clone().unwrap(), name.parse().unwrap())).is_ok() {} else { + println!("DEBUG: Message from {} could not be send to broadcast", &name); + } + } + Err(_) => { + format!("{} has left the chat", &name); + if sender.send((msg.unwrap(), name.parse().unwrap())).is_ok() {} else { + println!("DEBUG: Exit message from {} vould not be send to broadcast", &name); + } + break; + } + } + } } #[tokio::main] async fn main() { - let listener = TcpListener::bind("localhost:8080").await.unwrap(); + let listener = TcpListener::bind(LOCAL).await.unwrap(); + + let clients: Arc> = Arc::new(DashMap::with_capacity(MAX_CLIENT_NUM));//klienci + + let (sender, mut receiver) = broadcast::channel::<(String,String) >(10); //sending strings to max 10 ppl who can connect + let mut clients_cp = Arc::clone(&clients); + + //task który wysyła do receivera gdy client otrzyma wiadomość - + // wtedy receiver robi broadcast do wszystkich klientów + tokio::spawn(async move { + loop { + if let Ok(msg) = receiver.try_recv() { + println!("Broadcasting message : {}", msg.0); + broadcast(&mut clients_cp, &msg.0, &msg.1).await; + } + } + }); - let (sender, _rx) = broadcast::channel::<(String, SocketAddr) >(10); //sending strings to max 10 ppl who can connect + + //nowi klienci loop { - let (mut socket, addr) = listener.accept().await.unwrap(); + let ( socket, addr) = listener.accept().await.unwrap(); let sender = sender.clone();//klonowanie tx dla kazdego klienta - let mut receiver = sender.subscribe();//nowy rx dla każdego klienta - println!("accepted player"); - - tokio::spawn(async move { //spawnowanie taska obsługi klienta - let (reader, mut writer) = socket.split(); //podział socketa na czytanie i pisanie - - let mut reader = BufReader::new(reader); //buffer czyta z socketa tcp od klienta - let mut line = String::new(); - - loop { - tokio::select! { - result = reader.read_line( & mut line) => { //pierwszy branch - otrzymanie wiadomosci do przeslania - if result.unwrap() == 0 { //koniec - break; - } - sender.send((line.clone(), addr)).unwrap(); - line.clear(); - } - result = receiver.recv() => { //drugi branch - otrzymanie wiadomosci przez broadcast - let (msg, other_addr) = result.unwrap(); - if addr != other_addr{ - writer.write_all(msg.as_bytes()).await.unwrap();} + //let mut receiver = sender.subscribe();//nowy rx dla każdego klienta + println!("Incoming connection from {}", addr); + let (reader, mut writer) = socket.into_split(); //podział socketa na czytanie i pisanie + + let mut clients_mut = clients.clone(); + let len = clients_mut.len(); + if len >= MAX_CLIENT_NUM { + println!("Refusing the connection. Chat is full"); + let refuse_msg = format!("Chat is full. ({}/{}). Try again later!", MAX_CLIENT_NUM, MAX_CLIENT_NUM); + writer.write(refuse_msg.as_bytes()).await.expect("Error on write to client!"); + } else { + println!("Initializing client no. {}. Requesting username", len); + + tokio::spawn(async move { //spawnowanie taska obsługi klienta + + let mut reader = BufReader::new(reader); //buffer czyta z socketa tcp od klienta + //let mut line = String::new(); + write_buf(&mut writer, "Enter username \n").await; + if let Ok(name) = request_username(&mut reader, &mut writer, &mut clients_mut).await + { + clients_mut.insert(name.clone(), writer); + println!("Client {} has joined", name); //debug msg + + let arrival_msg = format!("User {} has joined. \n", name); + if sender.send((arrival_msg, name.clone())).is_ok(){} + else{ + println!("DEBUG: message from {} could not be broadccast", name); } + + handle_connection(&mut reader, &name, sender).await; + //koniec połączenia + clients_mut.remove(&name); + println!("Client {} has left the chat", name); + } - } - }); + }); + }; } } From 6800a919c511493548f435e5b99a42800af962ed Mon Sep 17 00:00:00 2001 From: jkarmowska Date: Mon, 30 May 2022 21:04:06 +0200 Subject: [PATCH 08/17] update readme, fix disconnecting --- README.md | 13 +++++-- chat_app/Cargo.lock | 78 -------------------------------------- chat_app/Cargo.toml | 3 -- chat_app/src/bin/client.rs | 27 +++++++------ chat_app/src/bin/server.rs | 25 +++++++----- 5 files changed, 37 insertions(+), 109 deletions(-) diff --git a/README.md b/README.md index 980af2a..f7bd9e9 100644 --- a/README.md +++ b/README.md @@ -7,13 +7,18 @@ Klient i serwer czatu w Ruscie ## Funkcjonalność -- Logowanie użytkowników i ustawianie nicków -- Wysyłanie wiadomości do grupy użytkowników -- Połączenie przez TCP +- Logowanie użytkowników i ustawianie nicków [done] +- Wysyłanie wiadomości do grupy użytkowników [done] +- Połączenie przez TCP [done] ## Propozycja podziału na części -Pierwsza część - klient dołącza do jednej grupy na początku, grupy tworzone wraz z serwerem +Pierwsza część - klient dołącza do jednej grupy na początku, grupy tworzone wraz z serwerem [na razie tylko jedna grupa] Druga część - klient może być w kilku grupach, może opuścić grupę, stworzyć nową, wybrać do której grupy chce wysłać wiadomość + +## Todo +Serializacja wiadomości +Stworzenie kilku grup i wyboru do której user chce dołączyć + ## Biblioteki - Tokio - do obsługi współbieżności diff --git a/chat_app/Cargo.lock b/chat_app/Cargo.lock index 6208e85..974c351 100644 --- a/chat_app/Cargo.lock +++ b/chat_app/Cargo.lock @@ -32,10 +32,7 @@ version = "0.1.0" dependencies = [ "chrono", "dashmap", - "serde", - "serde_json", "tokio", - "tracing", "uuid", ] @@ -91,18 +88,6 @@ dependencies = [ "libc", ] -[[package]] -name = "itoa" -version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "112c678d4050afce233f4f2852bb2eb519230b3cf12f33585275537d7e41578d" - -[[package]] -name = "lazy_static" -version = "1.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" - [[package]] name = "libc" version = "0.2.125" @@ -237,12 +222,6 @@ dependencies = [ "bitflags", ] -[[package]] -name = "ryu" -version = "1.0.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3f6f92acf49d1b98f7a81226834412ada05458b7364277387724a237f062695" - [[package]] name = "scopeguard" version = "1.1.0" @@ -254,31 +233,6 @@ name = "serde" version = "1.0.137" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61ea8d54c77f8315140a05f4c7237403bf38b72704d031543aa1d16abbf517d1" -dependencies = [ - "serde_derive", -] - -[[package]] -name = "serde_derive" -version = "1.0.137" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f26faba0c3959972377d3b2d306ee9f71faee9714294e41bb777f83f88578be" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "serde_json" -version = "1.0.81" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b7ce2b32a1aed03c558dc61a5cd328f15aff2dbc17daad8fb8af04d2100e15c" -dependencies = [ - "itoa", - "ryu", - "serde", -] [[package]] name = "signal-hook-registry" @@ -357,38 +311,6 @@ dependencies = [ "syn", ] -[[package]] -name = "tracing" -version = "0.1.34" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d0ecdcb44a79f0fe9844f0c4f33a342cbcbb5117de8001e6ba0dc2351327d09" -dependencies = [ - "cfg-if", - "pin-project-lite", - "tracing-attributes", - "tracing-core", -] - -[[package]] -name = "tracing-attributes" -version = "0.1.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc6b8ad3567499f98a1db7a752b07a7c8c7c7c34c332ec00effb2b0027974b7c" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "tracing-core" -version = "0.1.26" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f54c8ca710e81886d498c2fd3331b56c93aa248d49de2222ad2742247c60072f" -dependencies = [ - "lazy_static", -] - [[package]] name = "unicode-xid" version = "0.2.3" diff --git a/chat_app/Cargo.toml b/chat_app/Cargo.toml index d4c095d..60e3a69 100644 --- a/chat_app/Cargo.toml +++ b/chat_app/Cargo.toml @@ -9,7 +9,4 @@ edition = "2021" tokio = { version = "1", features = ["full"] } uuid = { version = "0.8.1", features = ["serde", "v4"] } chrono = { version = "0.4.11", features = ["serde"] } -serde = { version = "1.0.105", features = ["derive"] } -serde_json = "1.0.50" -tracing = "0.1.34" dashmap = "5.3.4" \ No newline at end of file diff --git a/chat_app/src/bin/client.rs b/chat_app/src/bin/client.rs index 23f091e..c089830 100644 --- a/chat_app/src/bin/client.rs +++ b/chat_app/src/bin/client.rs @@ -11,8 +11,8 @@ const QUIT: &str = "/quit\n"; #[tokio::main] async fn main() { - let mut stream = TcpStream::connect(SERVER).await.unwrap(); - let (read, mut write) = stream.into_split(); + let stream = TcpStream::connect(SERVER).await.unwrap(); + let (read, write) = stream.into_split(); println!("Connection with server established at {}, enter {} to exit chat", &mut read.peer_addr().unwrap(), QUIT.trim()); println!("Wait until you will be asked to enter your username"); @@ -39,7 +39,7 @@ async fn main() { if len == 0 { break; } - print!("{}", line); + print!("{}", line); //wypisywanie otrzymanej wiadomości } Err(_) => { println!("Connection lost! type {} to quit", QUIT.trim()); @@ -55,23 +55,22 @@ async fn main() { let msg_send = stdin().read_line(&mut input); //user wpisuje wiadomość match msg_send { Ok(_) => { - if input == String::from(QUIT) {//user wpisał quit + if input == *QUIT {//user wpisał quit let quit_msg = input.clone(); - match sender.send(quit_msg).await { //wysyłanie waidomości o zakończeniu + match sender.send(quit_msg).await { //wysyłanie wiadomości o zakończeniu Ok(_) => {} Err(_) => { - println!("Exiting "); + println!("Error on sending exit"); } } break; - } else { - let res = writer.write(input.as_bytes()).await; //wysyłanie do servera wiadomości - match res { - Ok(_) => { writer.flush().await.expect("Failed to flush buffer") } - Err(_) => { - println!("Connection lost! type {} to quit", QUIT.trim()); - break; - } + } + let res = writer.write(input.as_bytes()).await; //wysyłanie do servera wiadomości + match res { + Ok(_) => { writer.flush().await.expect("Failed to flush buffer") } + Err(_) => { + println!("Connection lost! type {} to quit", QUIT.trim()); + break; } } } diff --git a/chat_app/src/bin/server.rs b/chat_app/src/bin/server.rs index f55bc96..0bbe087 100644 --- a/chat_app/src/bin/server.rs +++ b/chat_app/src/bin/server.rs @@ -12,7 +12,7 @@ const LOCAL: &str = "localhost:8080"; const MAX_CLIENT_NUM: usize = 20; const NAME_SIZE: usize = 30; const MESSAGE_SIZE: usize = 256; -const ERROR_MSG: &str = "**************ERROR_MSG*************"; +const ERROR_MSG: &str = "**************ERROR_MSG*************\n"; async fn write_buf<'a>(writer: &mut OwnedWriteHalf, message: &str) { @@ -48,11 +48,16 @@ async fn receive_msg(reader: &mut BufReader, user: &str) -> Resul let mut buffer = String::with_capacity(MESSAGE_SIZE); match reader.read_line(&mut buffer).await { Ok(_) => { - let mut msg = format!("{}:{}", user, buffer.as_str()); + if buffer.is_empty() { + return Err(String::from(ERROR_MSG)); + } + + let mut msg = format!("{} : {}", user, buffer.as_str()); if msg.len() > MESSAGE_SIZE { msg.truncate(MESSAGE_SIZE - 1); msg.push('\n'); } + Ok(msg) } Err(_) => { Err(String::from(ERROR_MSG)) } @@ -78,9 +83,9 @@ async fn handle_connection(reader: &mut BufReader, name: &str, se } } Err(_) => { - format!("{} has left the chat", &name); - if sender.send((msg.unwrap(), name.parse().unwrap())).is_ok() {} else { - println!("DEBUG: Exit message from {} vould not be send to broadcast", &name); + let err_msg = format!("{} has left the chat", &name); + if sender.send((err_msg, name.parse().unwrap())).is_ok() {} else { + println!("DEBUG: Exit message from {} could not be send to broadcast", &name); } break; } @@ -97,11 +102,11 @@ async fn main() { let (sender, mut receiver) = broadcast::channel::<(String,String) >(10); //sending strings to max 10 ppl who can connect let mut clients_cp = Arc::clone(&clients); - //task który wysyła do receivera gdy client otrzyma wiadomość - - // wtedy receiver robi broadcast do wszystkich klientów + //task do którego inne wątki wysyłają wiadomości od swoich klientów - + //robi broadcast do wszystkich klientów tokio::spawn(async move { loop { - if let Ok(msg) = receiver.try_recv() { + if let Ok(msg) = receiver.recv().await { println!("Broadcasting message : {}", msg.0); broadcast(&mut clients_cp, &msg.0, &msg.1).await; } @@ -109,7 +114,7 @@ async fn main() { }); - //nowi klienci + //nowy klient loop { let ( socket, addr) = listener.accept().await.unwrap(); let sender = sender.clone();//klonowanie tx dla kazdego klienta @@ -139,7 +144,7 @@ async fn main() { let arrival_msg = format!("User {} has joined. \n", name); if sender.send((arrival_msg, name.clone())).is_ok(){} else{ - println!("DEBUG: message from {} could not be broadccast", name); + println!("DEBUG: message from {} could not be broadcast", name); } handle_connection(&mut reader, &name, sender).await; From 1675587d5fc64ba08773a147b7e55efe173c7a40 Mon Sep 17 00:00:00 2001 From: jkarmowska Date: Fri, 17 Jun 2022 10:43:19 +0200 Subject: [PATCH 09/17] add logger --- chat_app/Cargo.lock | 78 +++++++++++++++++++++++++++++++++++++- chat_app/Cargo.toml | 4 +- chat_app/src/bin/client.rs | 16 +++++--- chat_app/src/bin/server.rs | 25 +++++++----- 4 files changed, 105 insertions(+), 18 deletions(-) diff --git a/chat_app/Cargo.lock b/chat_app/Cargo.lock index 974c351..5dd3cd0 100644 --- a/chat_app/Cargo.lock +++ b/chat_app/Cargo.lock @@ -2,6 +2,17 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "atty" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" +dependencies = [ + "hermit-abi", + "libc", + "winapi", +] + [[package]] name = "autocfg" version = "1.1.0" @@ -32,6 +43,8 @@ version = "0.1.0" dependencies = [ "chrono", "dashmap", + "log", + "simple_logger", "tokio", "uuid", ] @@ -46,7 +59,18 @@ dependencies = [ "num-integer", "num-traits", "serde", - "time", + "time 0.1.43", + "winapi", +] + +[[package]] +name = "colored" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3616f750b84d8f0de8a58bda93e08e2a81ad3f523089b05f1dffecab48c6cbd" +dependencies = [ + "atty", + "lazy_static", "winapi", ] @@ -88,6 +112,18 @@ dependencies = [ "libc", ] +[[package]] +name = "itoa" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "112c678d4050afce233f4f2852bb2eb519230b3cf12f33585275537d7e41578d" + +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + [[package]] name = "libc" version = "0.2.125" @@ -160,6 +196,15 @@ dependencies = [ "libc", ] +[[package]] +name = "num_threads" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2819ce041d2ee131036f4fc9d6ae7ae125a3a40e97ba64d04fe799ad9dabbb44" +dependencies = [ + "libc", +] + [[package]] name = "once_cell" version = "1.10.0" @@ -243,6 +288,19 @@ dependencies = [ "libc", ] +[[package]] +name = "simple_logger" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c75a9723083573ace81ad0cdfc50b858aa3c366c48636edb4109d73122a0c0ea" +dependencies = [ + "atty", + "colored", + "log", + "time 0.3.9", + "winapi", +] + [[package]] name = "smallvec" version = "1.8.0" @@ -280,6 +338,24 @@ dependencies = [ "winapi", ] +[[package]] +name = "time" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2702e08a7a860f005826c6815dcac101b19b5eb330c27fe4a5928fec1d20ddd" +dependencies = [ + "itoa", + "libc", + "num_threads", + "time-macros", +] + +[[package]] +name = "time-macros" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42657b1a6f4d817cda8e7a0ace261fe0cc946cf3a80314390b22cc61ae080792" + [[package]] name = "tokio" version = "1.18.2" diff --git a/chat_app/Cargo.toml b/chat_app/Cargo.toml index 60e3a69..70ce134 100644 --- a/chat_app/Cargo.toml +++ b/chat_app/Cargo.toml @@ -9,4 +9,6 @@ edition = "2021" tokio = { version = "1", features = ["full"] } uuid = { version = "0.8.1", features = ["serde", "v4"] } chrono = { version = "0.4.11", features = ["serde"] } -dashmap = "5.3.4" \ No newline at end of file +dashmap = "5.3.4" +log = "0.4.17" +simple_logger = "2.1.0" \ No newline at end of file diff --git a/chat_app/src/bin/client.rs b/chat_app/src/bin/client.rs index c089830..72690f4 100644 --- a/chat_app/src/bin/client.rs +++ b/chat_app/src/bin/client.rs @@ -2,18 +2,22 @@ use std::io::stdin; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}; use tokio::net::{TcpStream}; use tokio::sync::mpsc; +use log::{info, warn}; +use simple_logger::SimpleLogger; const SERVER: &str = "localhost:8080"; const QUIT: &str = "/quit\n"; -//async fn receive(mut reader: BufReader, receiver: Receiver<(String, SocketAddr)>) {} + #[tokio::main] async fn main() { + SimpleLogger::new().init().unwrap(); + let stream = TcpStream::connect(SERVER).await.unwrap(); let (read, write) = stream.into_split(); - println!("Connection with server established at {}, enter {} to exit chat", &mut read.peer_addr().unwrap(), QUIT.trim()); + info!("Connection with server established at {}, enter {} to exit chat", &mut read.peer_addr().unwrap(), QUIT.trim()); println!("Wait until you will be asked to enter your username"); @@ -42,7 +46,7 @@ async fn main() { print!("{}", line); //wypisywanie otrzymanej wiadomości } Err(_) => { - println!("Connection lost! type {} to quit", QUIT.trim()); + info!("Connection lost! type {} to quit", QUIT.trim()); } } } @@ -60,7 +64,7 @@ async fn main() { match sender.send(quit_msg).await { //wysyłanie wiadomości o zakończeniu Ok(_) => {} Err(_) => { - println!("Error on sending exit"); + warn!("Error on sending exit"); } } break; @@ -69,13 +73,13 @@ async fn main() { match res { Ok(_) => { writer.flush().await.expect("Failed to flush buffer") } Err(_) => { - println!("Connection lost! type {} to quit", QUIT.trim()); + warn!("Connection lost! type {} to quit", QUIT.trim()); break; } } } Err(_) => { - println!("Error while reading input. "); + warn!("Error while reading input. "); } } } diff --git a/chat_app/src/bin/server.rs b/chat_app/src/bin/server.rs index 0bbe087..9374863 100644 --- a/chat_app/src/bin/server.rs +++ b/chat_app/src/bin/server.rs @@ -7,6 +7,10 @@ use dashmap::DashMap; use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; use tokio::sync::broadcast::Sender; +use log::{info, warn}; +use simple_logger::SimpleLogger; + + const LOCAL: &str = "localhost:8080"; const MAX_CLIENT_NUM: usize = 20; @@ -14,7 +18,6 @@ const NAME_SIZE: usize = 30; const MESSAGE_SIZE: usize = 256; const ERROR_MSG: &str = "**************ERROR_MSG*************\n"; - async fn write_buf<'a>(writer: &mut OwnedWriteHalf, message: &str) { writer.write(message.as_bytes()).await.expect("Failed to write msg"); writer.flush().await.expect("Failed to flush after write"); @@ -79,13 +82,13 @@ async fn handle_connection(reader: &mut BufReader, name: &str, se match msg { Ok(_) => { if sender.send((msg.clone().unwrap(), name.parse().unwrap())).is_ok() {} else { - println!("DEBUG: Message from {} could not be send to broadcast", &name); + warn!("Message from {} could not be send to broadcast", &name); } } Err(_) => { let err_msg = format!("{} has left the chat", &name); if sender.send((err_msg, name.parse().unwrap())).is_ok() {} else { - println!("DEBUG: Exit message from {} could not be send to broadcast", &name); + warn!("Exit message from {} could not be send to broadcast", &name); } break; } @@ -95,6 +98,8 @@ async fn handle_connection(reader: &mut BufReader, name: &str, se #[tokio::main] async fn main() { + SimpleLogger::new().init().unwrap(); + let listener = TcpListener::bind(LOCAL).await.unwrap(); let clients: Arc> = Arc::new(DashMap::with_capacity(MAX_CLIENT_NUM));//klienci @@ -107,7 +112,7 @@ async fn main() { tokio::spawn(async move { loop { if let Ok(msg) = receiver.recv().await { - println!("Broadcasting message : {}", msg.0); + info!("Broadcasting message : {}", msg.0); broadcast(&mut clients_cp, &msg.0, &msg.1).await; } } @@ -119,17 +124,17 @@ async fn main() { let ( socket, addr) = listener.accept().await.unwrap(); let sender = sender.clone();//klonowanie tx dla kazdego klienta //let mut receiver = sender.subscribe();//nowy rx dla każdego klienta - println!("Incoming connection from {}", addr); + info!("Incoming connection from {}", addr); let (reader, mut writer) = socket.into_split(); //podział socketa na czytanie i pisanie let mut clients_mut = clients.clone(); let len = clients_mut.len(); if len >= MAX_CLIENT_NUM { - println!("Refusing the connection. Chat is full"); + info!("Refusing the connection. Chat is full"); let refuse_msg = format!("Chat is full. ({}/{}). Try again later!", MAX_CLIENT_NUM, MAX_CLIENT_NUM); writer.write(refuse_msg.as_bytes()).await.expect("Error on write to client!"); } else { - println!("Initializing client no. {}. Requesting username", len); + info!("Initializing client no. {}. Requesting username", len); tokio::spawn(async move { //spawnowanie taska obsługi klienta @@ -139,18 +144,18 @@ async fn main() { if let Ok(name) = request_username(&mut reader, &mut writer, &mut clients_mut).await { clients_mut.insert(name.clone(), writer); - println!("Client {} has joined", name); //debug msg + info!("Client {} has joined", name); //debug msg let arrival_msg = format!("User {} has joined. \n", name); if sender.send((arrival_msg, name.clone())).is_ok(){} else{ - println!("DEBUG: message from {} could not be broadcast", name); + info!("DEBUG: message from {} could not be broadcast", name); } handle_connection(&mut reader, &name, sender).await; //koniec połączenia clients_mut.remove(&name); - println!("Client {} has left the chat", name); + info!("Client {} has left the chat", name); } }); From fbd1702ba9e14136edb2031d7fcae880594dbbdf Mon Sep 17 00:00:00 2001 From: jkarmowska Date: Fri, 17 Jun 2022 17:20:17 +0200 Subject: [PATCH 10/17] add messages, refactor client --- chat_app/Cargo.lock | 33 +++++++++ chat_app/Cargo.toml | 4 +- chat_app/src/bin/client.rs | 144 ++++++++++++++++++++++++++----------- chat_app/src/bin/server.rs | 30 +++++--- chat_app/src/types.rs | 51 ++++++++----- 5 files changed, 193 insertions(+), 69 deletions(-) diff --git a/chat_app/Cargo.lock b/chat_app/Cargo.lock index 5dd3cd0..89ab485 100644 --- a/chat_app/Cargo.lock +++ b/chat_app/Cargo.lock @@ -44,6 +44,8 @@ dependencies = [ "chrono", "dashmap", "log", + "serde", + "serde_json", "simple_logger", "tokio", "uuid", @@ -267,6 +269,12 @@ dependencies = [ "bitflags", ] +[[package]] +name = "ryu" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3f6f92acf49d1b98f7a81226834412ada05458b7364277387724a237f062695" + [[package]] name = "scopeguard" version = "1.1.0" @@ -278,6 +286,31 @@ name = "serde" version = "1.0.137" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61ea8d54c77f8315140a05f4c7237403bf38b72704d031543aa1d16abbf517d1" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.137" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f26faba0c3959972377d3b2d306ee9f71faee9714294e41bb777f83f88578be" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.81" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b7ce2b32a1aed03c558dc61a5cd328f15aff2dbc17daad8fb8af04d2100e15c" +dependencies = [ + "itoa", + "ryu", + "serde", +] [[package]] name = "signal-hook-registry" diff --git a/chat_app/Cargo.toml b/chat_app/Cargo.toml index 70ce134..6373f84 100644 --- a/chat_app/Cargo.toml +++ b/chat_app/Cargo.toml @@ -11,4 +11,6 @@ uuid = { version = "0.8.1", features = ["serde", "v4"] } chrono = { version = "0.4.11", features = ["serde"] } dashmap = "5.3.4" log = "0.4.17" -simple_logger = "2.1.0" \ No newline at end of file +simple_logger = "2.1.0" +serde = { version = "1.0.137", features = ["derive"] } +serde_json = "1.0.81" \ No newline at end of file diff --git a/chat_app/src/bin/client.rs b/chat_app/src/bin/client.rs index 72690f4..7259e92 100644 --- a/chat_app/src/bin/client.rs +++ b/chat_app/src/bin/client.rs @@ -1,67 +1,57 @@ use std::io::stdin; +use std::string; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}; use tokio::net::{TcpStream}; use tokio::sync::mpsc; -use log::{info, warn}; +use log::{error, info, warn}; use simple_logger::SimpleLogger; +use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; +use tokio::sync::mpsc::{Receiver, Sender}; +use chat_app::types; +use types::Message; +use serde::{Serialize, Deserialize}; +use serde_json; +use std::string::String; const SERVER: &str = "localhost:8080"; const QUIT: &str = "/quit\n"; - - - -#[tokio::main] -async fn main() { - SimpleLogger::new().init().unwrap(); - - let stream = TcpStream::connect(SERVER).await.unwrap(); - let (read, write) = stream.into_split(); - info!("Connection with server established at {}, enter {} to exit chat", &mut read.peer_addr().unwrap(), QUIT.trim()); - - println!("Wait until you will be asked to enter your username"); - - let mut reader = BufReader::new(read); - let mut writer = BufWriter::new(write); - let (sender, mut receiver) = mpsc::channel::(10);//10 wiadomości - +async fn read_msg(mut reader: BufReader, mut receiver: Receiver) { let mut line = String::new(); + loop { + line.clear(); - //reader - tokio::spawn(async move { - loop { - line.clear(); - - let received = reader.read_line(&mut line).await; - if let Ok(_received_msg) = receiver.try_recv() { //wiadomość od drugiego taska o zakończeniu - println!("You have left the chat."); - break; - } - match received { - Ok(len) => - { - if len == 0 { - break; - } - print!("{}", line); //wypisywanie otrzymanej wiadomości + let received = reader.read_line(&mut line).await; + if let Ok(_received_msg) = receiver.try_recv() { //wiadomość od drugiego taska o zakończeniu + println!("You have left the chat."); + break; + } + match received { + Ok(len) => + { + if len == 0 { + break; } - Err(_) => { - info!("Connection lost! type {} to quit", QUIT.trim()); + print!("{}", line); //wypisywanie otrzymanej wiadomości } + Err(_) => { + info!("Connection lost! type {} to quit", QUIT.trim()); } } - }); + } +} - //writer +async fn write_msg(mut writer: BufWriter, sender: Sender) { let mut input = String::new(); loop { input.clear(); let msg_send = stdin().read_line(&mut input); //user wpisuje wiadomość match msg_send { Ok(_) => { + let mes: Message = serde_json::from_str(input.as_str()).unwrap(); if input == *QUIT {//user wpisał quit let quit_msg = input.clone(); - match sender.send(quit_msg).await { //wysyłanie wiadomości o zakończeniu + match sender.send(Message::Quit {}).await { //wysyłanie wiadomości o zakończeniu Ok(_) => {} Err(_) => { warn!("Error on sending exit"); @@ -83,4 +73,78 @@ async fn main() { } } } +} + +fn get_initial_data() -> Message { + println!("Enter your username"); + + let mut username = String::new(); + let user_size = stdin().read_line(&mut username); + let mut channel_str = String::new(); + println!("Enter channel number(1 - 10)"); + let channel_size = stdin().read_line(&mut channel_str); + let channel: usize = channel_str.trim().parse().unwrap(); + Message::Hello { username, channel } +} + + +async fn send_data(writer: &mut BufWriter, reader: &mut BufReader) { + loop { + let hello = get_initial_data(); + let sent = writer.write(serde_json::to_string(&hello).unwrap().as_bytes()).await; + let mut line = String::new(); + match reader.read_line(&mut line).await { + Ok(mess) => { + let message = serde_json::from_str(line.as_str()); + match message { + Ok(Message::UsernameTaken { .. }) => { + println!("Username taken. Enter data again"); + } + Ok(Message::Ok { .. }) => { break; } + _ => error!("Unexpected message from server!"), + } + } + Err(_) => { error!("Error on receiving from server") } + } + } +} + +#[tokio::main] +async fn main() { + SimpleLogger::new().init().unwrap(); + + let stream = TcpStream::connect(SERVER).await.unwrap(); + let (read, write) = stream.into_split(); + info!("Connection with server established at {}, enter {} to exit chat", &mut read.peer_addr().unwrap(), QUIT.trim()); + + + let mut reader = BufReader::new(read); + let mut writer = BufWriter::new(write); + let (sender, mut receiver) = mpsc::channel::(10);//10 wiadomości + + send_data(&mut writer, &mut reader).await; + let mut line = String::new(); + match reader.read_line(&mut line).await { + Ok(_) => { + match serde_json::from_str(line.as_str()) { + Ok(Message::Ok {}) => {} + Ok(Message::ChatFull {}) => { + println!("Chat is full. try again later!"); + return; + } + } + } + Err(_) => { + error!("error on reading from server"); + return; + } + }; + + tokio::spawn(async move { + read_msg(reader, receiver).await; + }); + + tokio::spawn(async move { + write_msg(writer, sender).await; + }); } \ No newline at end of file diff --git a/chat_app/src/bin/server.rs b/chat_app/src/bin/server.rs index 9374863..3220e9d 100644 --- a/chat_app/src/bin/server.rs +++ b/chat_app/src/bin/server.rs @@ -1,7 +1,8 @@ use std::sync::Arc; +use std::sync::mpsc::Receiver; use tokio::net::{TcpListener}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, }; -use tokio::sync::broadcast; +use tokio::sync::{broadcast, mpsc}; use dashmap::DashMap; use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; @@ -9,7 +10,7 @@ use tokio::sync::broadcast::Sender; use log::{info, warn}; use simple_logger::SimpleLogger; - +use chat_app::types::Message; const LOCAL: &str = "localhost:8080"; @@ -17,6 +18,7 @@ const MAX_CLIENT_NUM: usize = 20; const NAME_SIZE: usize = 30; const MESSAGE_SIZE: usize = 256; const ERROR_MSG: &str = "**************ERROR_MSG*************\n"; +const CHANNEL_COUNT:usize = 10; async fn write_buf<'a>(writer: &mut OwnedWriteHalf, message: &str) { writer.write(message.as_bytes()).await.expect("Failed to write msg"); @@ -95,7 +97,11 @@ async fn handle_connection(reader: &mut BufReader, name: &str, se } } } - +struct Channel{ + sender:Sender, + receiver: Receiver, + users: Arc>; +} #[tokio::main] async fn main() { SimpleLogger::new().init().unwrap(); @@ -104,7 +110,10 @@ async fn main() { let clients: Arc> = Arc::new(DashMap::with_capacity(MAX_CLIENT_NUM));//klienci - let (sender, mut receiver) = broadcast::channel::<(String,String) >(10); //sending strings to max 10 ppl who can connect + for i in 0..CHANNEL_COUNT{ + let (sender, mut receiver) = mpsc::channel::<(String,String) >(MAX_CLIENT_NUM); + + } let mut clients_cp = Arc::clone(&clients); //task do którego inne wątki wysyłają wiadomości od swoich klientów - @@ -131,17 +140,20 @@ async fn main() { let len = clients_mut.len(); if len >= MAX_CLIENT_NUM { info!("Refusing the connection. Chat is full"); - let refuse_msg = format!("Chat is full. ({}/{}). Try again later!", MAX_CLIENT_NUM, MAX_CLIENT_NUM); - writer.write(refuse_msg.as_bytes()).await.expect("Error on write to client!"); + //let refuse_msg = format!("Chat is full. ({}/{}). Try again later!", MAX_CLIENT_NUM, MAX_CLIENT_NUM); + writer.write(serde_json::to_string(&Message::ChatFull {}).unwrap().as_bytes()).await; } else { - info!("Initializing client no. {}. Requesting username", len); + writer.write(serde_json::to_string(&Message::Ok {}).unwrap().as_bytes()).await; + + info!("Initializing client no. {}. Waiting for username and channel number", len); tokio::spawn(async move { //spawnowanie taska obsługi klienta let mut reader = BufReader::new(reader); //buffer czyta z socketa tcp od klienta //let mut line = String::new(); - write_buf(&mut writer, "Enter username \n").await; - if let Ok(name) = request_username(&mut reader, &mut writer, &mut clients_mut).await + //write_buf(&mut writer, "Enter username \n").await; + let hello = Message::Hello { username: "".to_string(), channel: 0 }; + if let Ok(hello) = request_username(&mut reader, &mut writer, &mut clients_mut).await { clients_mut.insert(name.clone(), writer); info!("Client {} has joined", name); //debug msg diff --git a/chat_app/src/types.rs b/chat_app/src/types.rs index c9ab9f1..d53ec37 100644 --- a/chat_app/src/types.rs +++ b/chat_app/src/types.rs @@ -1,12 +1,15 @@ +use std::cmp::Ordering; +use std::collections::{BTreeMap, BTreeSet, HashMap}; use uuid::Uuid; use chrono::{DateTime, Utc}; +use serde::{Serialize, Deserialize}; +use serde_json; -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct User { pub id: Uuid, pub name: String, } - impl User { pub fn new(id: Uuid, name: &str) -> Self { User { @@ -16,23 +19,33 @@ impl User { } } -#[derive(Debug, Clone, PartialEq)] -pub struct Message { - pub id: Uuid, - pub user: User, - pub body: String, - pub group: Uuid, - pub time: DateTime, +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] + +struct Text{ + pub time : DateTime, + pub user :String, + pub text: String, } -impl Message { - pub fn new(id: Uuid, user: User, msg: &str, group: Uuid, time: DateTime) -> Self { - Message { - id, - user, - body: String::from(msg), - group, - time, - } - } + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum Message { + ClientMessage { + message:String + }, + Hello { + username: String, + channel: usize, + }, + History { + messages: BTreeMap, + }, + Quit { + }, + SwitchChannel { + new_channel: usize, + }, + UsernameTaken{}, + Ok{}, + ChatFull{} } \ No newline at end of file From 98dcf73182d97fde53bedd64dd8bb70a1f85245d Mon Sep 17 00:00:00 2001 From: jkarmowska Date: Wed, 22 Jun 2022 15:22:09 +0200 Subject: [PATCH 11/17] refactor server --- chat_app/src/bin/server.rs | 181 ++++++++++++++++++++++--------------- chat_app/src/types.rs | 17 +++- 2 files changed, 119 insertions(+), 79 deletions(-) diff --git a/chat_app/src/bin/server.rs b/chat_app/src/bin/server.rs index 3220e9d..98cf1cb 100644 --- a/chat_app/src/bin/server.rs +++ b/chat_app/src/bin/server.rs @@ -1,12 +1,13 @@ -use std::sync::Arc; -use std::sync::mpsc::Receiver; +use std::borrow::BorrowMut; +use std::ops::{Deref, DerefMut}; +use std::sync::{Arc, }; use tokio::net::{TcpListener}; -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, }; -use tokio::sync::{broadcast, mpsc}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::sync::{mpsc, Mutex}; use dashmap::DashMap; use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; -use tokio::sync::broadcast::Sender; +use tokio::sync::mpsc::{Receiver, Sender}; use log::{info, warn}; use simple_logger::SimpleLogger; @@ -18,7 +19,7 @@ const MAX_CLIENT_NUM: usize = 20; const NAME_SIZE: usize = 30; const MESSAGE_SIZE: usize = 256; const ERROR_MSG: &str = "**************ERROR_MSG*************\n"; -const CHANNEL_COUNT:usize = 10; +const CHANNEL_COUNT: usize = 10; async fn write_buf<'a>(writer: &mut OwnedWriteHalf, message: &str) { writer.write(message.as_bytes()).await.expect("Failed to write msg"); @@ -27,8 +28,8 @@ async fn write_buf<'a>(writer: &mut OwnedWriteHalf, message: &str) { //czeka na username async fn request_username(reader: &mut BufReader, - writer: &mut OwnedWriteHalf, - clients: &mut Arc>, + writer: &mut OwnedWriteHalf, + clients: &mut Arc>, ) -> Result { let mut buffer = String::with_capacity(NAME_SIZE); loop { @@ -69,11 +70,12 @@ async fn receive_msg(reader: &mut BufReader, user: &str) -> Resul } } -async fn broadcast(clients: &mut Arc>, message: &str, sender:&str) { +async fn broadcast(clients: &mut Arc>, message: Message) { for mut entry in clients.iter_mut() { - if entry.key()!=sender{ - write_buf(entry.value_mut(), message).await; - } + //if entry.key() != sender { + //todo pisanie wiadomosci!! + write_buf(entry.value_mut(), "ff").await; + //} } } @@ -83,13 +85,13 @@ async fn handle_connection(reader: &mut BufReader, name: &str, se let msg = receive_msg(reader, name).await; match msg { Ok(_) => { - if sender.send((msg.clone().unwrap(), name.parse().unwrap())).is_ok() {} else { + if sender.send((msg.clone().unwrap(), name.parse().unwrap())).await.is_ok() {} else { warn!("Message from {} could not be send to broadcast", &name); } } Err(_) => { let err_msg = format!("{} has left the chat", &name); - if sender.send((err_msg, name.parse().unwrap())).is_ok() {} else { + if sender.send((err_msg, name.parse().unwrap())).await.is_ok() {} else { warn!("Exit message from {} could not be send to broadcast", &name); } break; @@ -97,80 +99,111 @@ async fn handle_connection(reader: &mut BufReader, name: &str, se } } } -struct Channel{ - sender:Sender, - receiver: Receiver, - users: Arc>; + +struct Channel { + sender: Arc>>, + receiver: Arc>>, + users: Arc>, +} + +impl Channel { + pub fn new(sender: Arc>>, + receiver: Arc>>, + users: Arc>) -> Self { + Channel { + sender, + receiver, + users, + } + } +} + + +async fn manage_client(reader: OwnedReadHalf, mut writer:OwnedWriteHalf, channels: Arc>>) { + let mut reader = BufReader::new(reader); //buffer czyta z socketa tcp od klienta + //let mut line = String::new(); + //write_buf(&mut writer, "Enter username \n").await; + let hello = Message::Hello { username: "".to_string(), channel: 0 }; + /*if let Ok(hello) = request_username(&mut reader, &mut writer, &mut clients_mut).await + { + clients_mut.insert(name.clone(), writer); + //todo wydzielic funkcje + //todo rozważac msg ChangeChannel - wtedy wysyłac mu historie i innym ze dołączył i reszcie że wyszedł + info!("Client {} has joined", name); //debug msg + + let arrival_msg = format!("User {} has joined. \n", name); + if sender.send((arrival_msg, name.clone())).is_ok() {} else { + info!("DEBUG: message from {} could not be broadcast", name); + } + + handle_connection(&mut reader, &name, sender).await; + //koniec połączenia + clients_mut.remove(&name); + info!("Client {} has left the chat", name); + }*/ } + + +async fn manage_communication(channels_cp: Arc>>, i: usize){ + loop { + let mut guard = channels_cp.lock().await; + let mut channel = guard.deref_mut().get_mut(i).unwrap(); + let mut receiver_ptr = Arc::clone(&channel.receiver); + let mut receiver_guard = receiver_ptr.lock().await; + let receiver = receiver_guard.deref_mut(); + if let Some(msg) = receiver.recv().await { + info!("Broadcasting message : {}","XD"); + broadcast(&mut channel.users, msg).await; + } + drop(guard); + } +} + #[tokio::main] async fn main() { SimpleLogger::new().init().unwrap(); let listener = TcpListener::bind(LOCAL).await.unwrap(); - let clients: Arc> = Arc::new(DashMap::with_capacity(MAX_CLIENT_NUM));//klienci + let mut channels: Arc>> = Arc::new(Mutex::new(Vec::with_capacity(CHANNEL_COUNT))); - for i in 0..CHANNEL_COUNT{ - let (sender, mut receiver) = mpsc::channel::<(String,String) >(MAX_CLIENT_NUM); + for _i in 0..CHANNEL_COUNT { + let (sender, mut receiver) = mpsc::channel::(MAX_CLIENT_NUM); + let users: Arc> = Arc::new(DashMap::with_capacity(MAX_CLIENT_NUM));//klienci + let sender = Arc::new(Mutex::new(sender)); + let receiver = Arc::new(Mutex::new(receiver)); + let channel = Channel { sender, receiver, users }; + let mut guard = channels.lock().await; + guard.push(channel); } - let mut clients_cp = Arc::clone(&clients); - - //task do którego inne wątki wysyłają wiadomości od swoich klientów - - //robi broadcast do wszystkich klientów - tokio::spawn(async move { - loop { - if let Ok(msg) = receiver.recv().await { - info!("Broadcasting message : {}", msg.0); - broadcast(&mut clients_cp, &msg.0, &msg.1).await; - } - } - }); + + + for i in 0..CHANNEL_COUNT { + let mut channels_cp = Arc::clone(&channels); + + tokio::spawn(async move { + manage_communication(channels_cp, i) + }); + } //nowy klient loop { - let ( socket, addr) = listener.accept().await.unwrap(); - let sender = sender.clone();//klonowanie tx dla kazdego klienta - //let mut receiver = sender.subscribe();//nowy rx dla każdego klienta + let (socket, addr) = listener.accept().await.unwrap(); info!("Incoming connection from {}", addr); - let (reader, mut writer) = socket.into_split(); //podział socketa na czytanie i pisanie - - let mut clients_mut = clients.clone(); - let len = clients_mut.len(); - if len >= MAX_CLIENT_NUM { - info!("Refusing the connection. Chat is full"); - //let refuse_msg = format!("Chat is full. ({}/{}). Try again later!", MAX_CLIENT_NUM, MAX_CLIENT_NUM); - writer.write(serde_json::to_string(&Message::ChatFull {}).unwrap().as_bytes()).await; - } else { - writer.write(serde_json::to_string(&Message::Ok {}).unwrap().as_bytes()).await; - - info!("Initializing client no. {}. Waiting for username and channel number", len); - - tokio::spawn(async move { //spawnowanie taska obsługi klienta - - let mut reader = BufReader::new(reader); //buffer czyta z socketa tcp od klienta - //let mut line = String::new(); - //write_buf(&mut writer, "Enter username \n").await; - let hello = Message::Hello { username: "".to_string(), channel: 0 }; - if let Ok(hello) = request_username(&mut reader, &mut writer, &mut clients_mut).await - { - clients_mut.insert(name.clone(), writer); - info!("Client {} has joined", name); //debug msg - - let arrival_msg = format!("User {} has joined. \n", name); - if sender.send((arrival_msg, name.clone())).is_ok(){} - else{ - info!("DEBUG: message from {} could not be broadcast", name); - } - - handle_connection(&mut reader, &name, sender).await; - //koniec połączenia - clients_mut.remove(&name); - info!("Client {} has left the chat", name); + let (reader, mut writer) = socket.into_split(); - } - }); - }; - } + let mut channels_cp = Arc::clone(&channels); + + + info!("Initializing new client. Waiting for username and channel number"); + + tokio::spawn(async move { + + manage_client(reader, writer, channels_cp); + + }); + // }; + }/**/ } diff --git a/chat_app/src/types.rs b/chat_app/src/types.rs index d53ec37..85e7684 100644 --- a/chat_app/src/types.rs +++ b/chat_app/src/types.rs @@ -40,12 +40,19 @@ pub enum Message { History { messages: BTreeMap, }, - Quit { - }, + Quit, SwitchChannel { new_channel: usize, }, - UsernameTaken{}, - Ok{}, - ChatFull{} + UsernameTaken, + Ok, + ChatFull, + BroadcastMessage{ + message:String, + user:String + } +} +pub struct ServerMessage{ + user:User, + text: String, } \ No newline at end of file From 3dd6014af9c6bddad4f4c943c22910531f10460b Mon Sep 17 00:00:00 2001 From: jkarmowska Date: Wed, 22 Jun 2022 20:48:55 +0200 Subject: [PATCH 12/17] initial communication --- chat_app/Cargo.lock | 7 ++ chat_app/Cargo.toml | 3 +- chat_app/src/bin/client.rs | 22 +++-- chat_app/src/bin/server.rs | 164 +++++++++++++++++-------------------- chat_app/src/types.rs | 26 +++--- 5 files changed, 116 insertions(+), 106 deletions(-) diff --git a/chat_app/Cargo.lock b/chat_app/Cargo.lock index 89ab485..a86b286 100644 --- a/chat_app/Cargo.lock +++ b/chat_app/Cargo.lock @@ -2,6 +2,12 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "anyhow" +version = "1.0.58" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb07d2053ccdbe10e2af2995a2f116c1330396493dc1269f6a91d0ae82e19704" + [[package]] name = "atty" version = "0.2.14" @@ -41,6 +47,7 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" name = "chat_app" version = "0.1.0" dependencies = [ + "anyhow", "chrono", "dashmap", "log", diff --git a/chat_app/Cargo.toml b/chat_app/Cargo.toml index 6373f84..a7669c0 100644 --- a/chat_app/Cargo.toml +++ b/chat_app/Cargo.toml @@ -13,4 +13,5 @@ dashmap = "5.3.4" log = "0.4.17" simple_logger = "2.1.0" serde = { version = "1.0.137", features = ["derive"] } -serde_json = "1.0.81" \ No newline at end of file +serde_json = "1.0.81" +anyhow = "1.0.57" \ No newline at end of file diff --git a/chat_app/src/bin/client.rs b/chat_app/src/bin/client.rs index 7259e92..c1e2904 100644 --- a/chat_app/src/bin/client.rs +++ b/chat_app/src/bin/client.rs @@ -13,6 +13,7 @@ use serde::{Serialize, Deserialize}; use serde_json; use std::string::String; + const SERVER: &str = "localhost:8080"; const QUIT: &str = "/quit\n"; @@ -84,15 +85,21 @@ fn get_initial_data() -> Message { println!("Enter channel number(1 - 10)"); let channel_size = stdin().read_line(&mut channel_str); let channel: usize = channel_str.trim().parse().unwrap(); - Message::Hello { username, channel } + let username = username.trim(); + Message::Hello { username:username.to_string(), channel } } async fn send_data(writer: &mut BufWriter, reader: &mut BufReader) { loop { let hello = get_initial_data(); - let sent = writer.write(serde_json::to_string(&hello).unwrap().as_bytes()).await; + let mut serialized: String = serde_json::to_string(&hello).unwrap(); + serialized.push('\n'); + let sent = writer.write(serialized.as_bytes()).await; + let _ = writer.flush().await; let mut line = String::new(); + info!("sent message to server"); + match reader.read_line(&mut line).await { Ok(mess) => { let message = serde_json::from_str(line.as_str()); @@ -100,11 +107,14 @@ async fn send_data(writer: &mut BufWriter, reader: &mut BufReade Ok(Message::UsernameTaken { .. }) => { println!("Username taken. Enter data again"); } - Ok(Message::Ok { .. }) => { break; } - _ => error!("Unexpected message from server!"), + Ok(Message::Ok { .. }) => { + info!("received ok message from server"); + break; + } + _ => error!("Unexpected message from server(message): {} !", serde_json::to_string(&message.unwrap()).unwrap()), } } - Err(_) => { error!("Error on receiving from server") } + Err(_) => { error!("Error on receiving from server(ERR)") } } } } @@ -123,6 +133,7 @@ async fn main() { let (sender, mut receiver) = mpsc::channel::(10);//10 wiadomości send_data(&mut writer, &mut reader).await; + info!("sent username and channel no to server"); let mut line = String::new(); match reader.read_line(&mut line).await { Ok(_) => { @@ -132,6 +143,7 @@ async fn main() { println!("Chat is full. try again later!"); return; } + _ => error!("error deserializing message") } } Err(_) => { diff --git a/chat_app/src/bin/server.rs b/chat_app/src/bin/server.rs index 98cf1cb..da26d9b 100644 --- a/chat_app/src/bin/server.rs +++ b/chat_app/src/bin/server.rs @@ -1,15 +1,14 @@ -use std::borrow::BorrowMut; -use std::ops::{Deref, DerefMut}; -use std::sync::{Arc, }; +use std::ops::{ DerefMut}; +use std::sync::{Arc}; use tokio::net::{TcpListener}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::sync::{mpsc, Mutex}; - +use anyhow::{anyhow, Result}; use dashmap::DashMap; use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; use tokio::sync::mpsc::{Receiver, Sender}; -use log::{info, warn}; +use log::{info, warn, }; use simple_logger::SimpleLogger; use chat_app::types::Message; @@ -18,7 +17,6 @@ const LOCAL: &str = "localhost:8080"; const MAX_CLIENT_NUM: usize = 20; const NAME_SIZE: usize = 30; const MESSAGE_SIZE: usize = 256; -const ERROR_MSG: &str = "**************ERROR_MSG*************\n"; const CHANNEL_COUNT: usize = 10; async fn write_buf<'a>(writer: &mut OwnedWriteHalf, message: &str) { @@ -26,66 +24,35 @@ async fn write_buf<'a>(writer: &mut OwnedWriteHalf, message: &str) { writer.flush().await.expect("Failed to flush after write"); } -//czeka na username -async fn request_username(reader: &mut BufReader, - writer: &mut OwnedWriteHalf, - clients: &mut Arc>, -) -> Result { - let mut buffer = String::with_capacity(NAME_SIZE); - loop { - buffer.clear(); - match reader.read_line(&mut buffer).await { - Ok(_) => { - let mut name = String::from(buffer.trim()); //ucinanie '\n' - name.truncate(NAME_SIZE); //skracanie username - if clients.contains_key(name.as_str()) { - write_buf(writer, "Name already taken, please try another!").await; - } else { - return Ok(name); //ok - poprawny username - } - } - Err(_) => { return Err(String::from(ERROR_MSG)); } - } - } -} - -async fn receive_msg(reader: &mut BufReader, user: &str) -> Result { +async fn receive_msg(reader: &mut BufReader, user: &str) -> Result { let mut buffer = String::with_capacity(MESSAGE_SIZE); match reader.read_line(&mut buffer).await { Ok(_) => { if buffer.is_empty() { - return Err(String::from(ERROR_MSG)); - } - - let mut msg = format!("{} : {}", user, buffer.as_str()); - if msg.len() > MESSAGE_SIZE { - msg.truncate(MESSAGE_SIZE - 1); - msg.push('\n'); + return Err(anyhow!("Buffer empty")); } - Ok(msg) + Ok(buffer) } - Err(_) => { Err(String::from(ERROR_MSG)) } + Err(_) => { Err(anyhow!("Error reading message")) } } } async fn broadcast(clients: &mut Arc>, message: Message) { for mut entry in clients.iter_mut() { - //if entry.key() != sender { - //todo pisanie wiadomosci!! - write_buf(entry.value_mut(), "ff").await; - //} + write_buf(entry.value_mut(), serde_json::to_string(&message).unwrap().as_str()).await; } } -async fn handle_connection(reader: &mut BufReader, name: &str, sender: Sender<(String, String)>) +async fn handle_connection(reader: &mut BufReader, name: &str, sender: Sender)->Result<()> { loop { - let msg = receive_msg(reader, name).await; - match msg { + let msg = receive_msg(reader, name).await.unwrap(); + //todo tutaj powinno byc switch po typie wiadomosci od usera. + /*match msg { Ok(_) => { - if sender.send((msg.clone().unwrap(), name.parse().unwrap())).await.is_ok() {} else { + if sender.send(msg.clone()).await.is_ok() {} else { warn!("Message from {} could not be send to broadcast", &name); } } @@ -96,8 +63,10 @@ async fn handle_connection(reader: &mut BufReader, name: &str, se } break; } - } + }*/ + break; } + Ok(()) } struct Channel { @@ -106,56 +75,75 @@ struct Channel { users: Arc>, } -impl Channel { - pub fn new(sender: Arc>>, - receiver: Arc>>, - users: Arc>) -> Self { - Channel { - sender, - receiver, - users, - } + + +async fn read_username_and_channel(reader: &mut BufReader) -> Result { + let mut buffer = String::with_capacity(NAME_SIZE); + loop { + buffer.clear(); + + reader.read_line(&mut buffer).await.expect("TODO: panic message"); + + let mut message: Message = serde_json::from_str(buffer.as_str())?; + println!("Message hello: {:?}", message); + if let Message::Hello { + ref mut username, + channel + } = message { + if channel >= CHANNEL_COUNT + { return Err(anyhow!("Channel number too big")); } + } else { return Err(anyhow!("wrong message from user")); }; + return Ok(message); } } - -async fn manage_client(reader: OwnedReadHalf, mut writer:OwnedWriteHalf, channels: Arc>>) { +async fn manage_client(reader: OwnedReadHalf, mut writer: OwnedWriteHalf, channels: Arc>>) ->Result<()>{ let mut reader = BufReader::new(reader); //buffer czyta z socketa tcp od klienta - //let mut line = String::new(); - //write_buf(&mut writer, "Enter username \n").await; - let hello = Message::Hello { username: "".to_string(), channel: 0 }; - /*if let Ok(hello) = request_username(&mut reader, &mut writer, &mut clients_mut).await + info!("Client managing"); + if let Ok(Message::Hello {username, channel}) = read_username_and_channel(&mut reader).await { - clients_mut.insert(name.clone(), writer); - //todo wydzielic funkcje - //todo rozważac msg ChangeChannel - wtedy wysyłac mu historie i innym ze dołączył i reszcie że wyszedł - info!("Client {} has joined", name); //debug msg - - let arrival_msg = format!("User {} has joined. \n", name); - if sender.send((arrival_msg, name.clone())).is_ok() {} else { - info!("DEBUG: message from {} could not be broadcast", name); - } - - handle_connection(&mut reader, &name, sender).await; + info!("read ok"); + let mut channels_lock = channels.lock().await; + info!("locked mutex"); + let channel_r = channels_lock.deref_mut().get_mut(channel).unwrap(); + channel_r.users.insert(username.clone(), writer); + info!("Client {} has joined", username); //debug msg + let mut serialized = serde_json::to_string(&Message::Ok).unwrap(); + serialized.push('\n'); + channel_r.users.get_mut(username.as_str()).unwrap().value_mut().write(serialized.as_bytes()).await?; + let _ = channel_r.users.get_mut(username.as_str()).unwrap().value_mut().flush().await; + info!("sent ok"); + broadcast(&mut channel_r.users, Message::UserJoined {user:username.clone()}).await; + /*if sender.send(serde_json::to_string(&Message::UserJoined {user:username.clone()}).unwrap().as_bytes()).is_ok() {} else { + info!("DEBUG: message from {} could not be broadcast", username); + }*/ + + + //handle_connection(&mut reader, &name, sender).await; //koniec połączenia - clients_mut.remove(&name); - info!("Client {} has left the chat", name); - }*/ + channel_r.users.remove(username.as_str()); +drop(channels_lock); + info!("Client {} has left the chat", username.as_str()); + } + else { warn!("Wrong message from client") }/**/ + Ok(()) } -async fn manage_communication(channels_cp: Arc>>, i: usize){ +async fn manage_communication(channels_cp: Arc>>, i: usize) { loop { - let mut guard = channels_cp.lock().await; - let mut channel = guard.deref_mut().get_mut(i).unwrap(); - let mut receiver_ptr = Arc::clone(&channel.receiver); + let mut guard = channels_cp.lock().await; + let mut receiver_ptr = Arc::clone(&guard.deref_mut().get_mut(i).unwrap().receiver); + //let mut receiver_ptr = Arc::clone(&channel.receiver); let mut receiver_guard = receiver_ptr.lock().await; let receiver = receiver_guard.deref_mut(); + drop(guard); if let Some(msg) = receiver.recv().await { info!("Broadcasting message : {}","XD"); - broadcast(&mut channel.users, msg).await; + broadcast(&mut channels_cp.lock().await.deref_mut().get_mut(i).unwrap().users, msg).await; } - drop(guard); + //drop(guard); + drop(receiver_guard); } } @@ -165,7 +153,7 @@ async fn main() { let listener = TcpListener::bind(LOCAL).await.unwrap(); - let mut channels: Arc>> = Arc::new(Mutex::new(Vec::with_capacity(CHANNEL_COUNT))); + let channels: Arc>> = Arc::new(Mutex::new(Vec::with_capacity(CHANNEL_COUNT))); for _i in 0..CHANNEL_COUNT { let (sender, mut receiver) = mpsc::channel::(MAX_CLIENT_NUM); @@ -175,7 +163,7 @@ async fn main() { let channel = Channel { sender, receiver, users }; let mut guard = channels.lock().await; guard.push(channel); - + drop(guard); } @@ -183,7 +171,7 @@ async fn main() { let mut channels_cp = Arc::clone(&channels); tokio::spawn(async move { - manage_communication(channels_cp, i) + manage_communication(channels_cp, i).await }); } @@ -200,9 +188,7 @@ async fn main() { info!("Initializing new client. Waiting for username and channel number"); tokio::spawn(async move { - - manage_client(reader, writer, channels_cp); - + manage_client(reader, writer, channels_cp).await.unwrap(); }); // }; }/**/ diff --git a/chat_app/src/types.rs b/chat_app/src/types.rs index 85e7684..e90dcb5 100644 --- a/chat_app/src/types.rs +++ b/chat_app/src/types.rs @@ -10,6 +10,7 @@ pub struct User { pub id: Uuid, pub name: String, } + impl User { pub fn new(id: Uuid, name: &str) -> Self { User { @@ -20,10 +21,9 @@ impl User { } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] - -struct Text{ - pub time : DateTime, - pub user :String, +struct Text { + pub time: DateTime, + pub user: String, pub text: String, } @@ -31,7 +31,7 @@ struct Text{ #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum Message { ClientMessage { - message:String + message: String }, Hello { username: String, @@ -47,12 +47,16 @@ pub enum Message { UsernameTaken, Ok, ChatFull, - BroadcastMessage{ - message:String, - user:String - } + BroadcastMessage { + message: String, + user: String, + }, + UserJoined { + user: String, + }, } -pub struct ServerMessage{ - user:User, + +pub struct ServerMessage { + user: User, text: String, } \ No newline at end of file From 25aa237cfe55106db8afa85194c68c349880fd43 Mon Sep 17 00:00:00 2001 From: jkarmowska Date: Wed, 22 Jun 2022 20:51:51 +0200 Subject: [PATCH 13/17] run clippy --- chat_app/src/bin/client.rs | 20 ++++++++++---------- chat_app/src/bin/server.rs | 22 +++++++++++----------- chat_app/src/types.rs | 6 +++--- 3 files changed, 24 insertions(+), 24 deletions(-) diff --git a/chat_app/src/bin/client.rs b/chat_app/src/bin/client.rs index c1e2904..397537e 100644 --- a/chat_app/src/bin/client.rs +++ b/chat_app/src/bin/client.rs @@ -1,5 +1,5 @@ use std::io::stdin; -use std::string; + use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}; use tokio::net::{TcpStream}; use tokio::sync::mpsc; @@ -9,8 +9,8 @@ use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; use tokio::sync::mpsc::{Receiver, Sender}; use chat_app::types; use types::Message; -use serde::{Serialize, Deserialize}; -use serde_json; + + use std::string::String; @@ -49,9 +49,9 @@ async fn write_msg(mut writer: BufWriter, sender: Sender { - let mes: Message = serde_json::from_str(input.as_str()).unwrap(); + let _mes: Message = serde_json::from_str(input.as_str()).unwrap(); if input == *QUIT {//user wpisał quit - let quit_msg = input.clone(); + let _quit_msg = input.clone(); match sender.send(Message::Quit {}).await { //wysyłanie wiadomości o zakończeniu Ok(_) => {} Err(_) => { @@ -80,10 +80,10 @@ fn get_initial_data() -> Message { println!("Enter your username"); let mut username = String::new(); - let user_size = stdin().read_line(&mut username); + let _user_size = stdin().read_line(&mut username); let mut channel_str = String::new(); println!("Enter channel number(1 - 10)"); - let channel_size = stdin().read_line(&mut channel_str); + let _channel_size = stdin().read_line(&mut channel_str); let channel: usize = channel_str.trim().parse().unwrap(); let username = username.trim(); Message::Hello { username:username.to_string(), channel } @@ -95,13 +95,13 @@ async fn send_data(writer: &mut BufWriter, reader: &mut BufReade let hello = get_initial_data(); let mut serialized: String = serde_json::to_string(&hello).unwrap(); serialized.push('\n'); - let sent = writer.write(serialized.as_bytes()).await; + let _sent = writer.write(serialized.as_bytes()).await; let _ = writer.flush().await; let mut line = String::new(); info!("sent message to server"); match reader.read_line(&mut line).await { - Ok(mess) => { + Ok(_mess) => { let message = serde_json::from_str(line.as_str()); match message { Ok(Message::UsernameTaken { .. }) => { @@ -130,7 +130,7 @@ async fn main() { let mut reader = BufReader::new(read); let mut writer = BufWriter::new(write); - let (sender, mut receiver) = mpsc::channel::(10);//10 wiadomości + let (sender, receiver) = mpsc::channel::(10);//10 wiadomości send_data(&mut writer, &mut reader).await; info!("sent username and channel no to server"); diff --git a/chat_app/src/bin/server.rs b/chat_app/src/bin/server.rs index da26d9b..33ff6a0 100644 --- a/chat_app/src/bin/server.rs +++ b/chat_app/src/bin/server.rs @@ -25,7 +25,7 @@ async fn write_buf<'a>(writer: &mut OwnedWriteHalf, message: &str) { } -async fn receive_msg(reader: &mut BufReader, user: &str) -> Result { +async fn receive_msg(reader: &mut BufReader, _user: &str) -> Result { let mut buffer = String::with_capacity(MESSAGE_SIZE); match reader.read_line(&mut buffer).await { Ok(_) => { @@ -45,10 +45,10 @@ async fn broadcast(clients: &mut Arc>, message: } } -async fn handle_connection(reader: &mut BufReader, name: &str, sender: Sender)->Result<()> +async fn handle_connection(reader: &mut BufReader, name: &str, _sender: Sender)->Result<()> { loop { - let msg = receive_msg(reader, name).await.unwrap(); + let _msg = receive_msg(reader, name).await.unwrap(); //todo tutaj powinno byc switch po typie wiadomosci od usera. /*match msg { Ok(_) => { @@ -84,10 +84,10 @@ async fn read_username_and_channel(reader: &mut BufReader) -> Res reader.read_line(&mut buffer).await.expect("TODO: panic message"); - let mut message: Message = serde_json::from_str(buffer.as_str())?; + let message: Message = serde_json::from_str(buffer.as_str())?; println!("Message hello: {:?}", message); if let Message::Hello { - ref mut username, + username: _, channel } = message { if channel >= CHANNEL_COUNT @@ -97,7 +97,7 @@ async fn read_username_and_channel(reader: &mut BufReader) -> Res } } -async fn manage_client(reader: OwnedReadHalf, mut writer: OwnedWriteHalf, channels: Arc>>) ->Result<()>{ +async fn manage_client(reader: OwnedReadHalf, writer: OwnedWriteHalf, channels: Arc>>) ->Result<()>{ let mut reader = BufReader::new(reader); //buffer czyta z socketa tcp od klienta info!("Client managing"); if let Ok(Message::Hello {username, channel}) = read_username_and_channel(&mut reader).await @@ -133,7 +133,7 @@ drop(channels_lock); async fn manage_communication(channels_cp: Arc>>, i: usize) { loop { let mut guard = channels_cp.lock().await; - let mut receiver_ptr = Arc::clone(&guard.deref_mut().get_mut(i).unwrap().receiver); + let receiver_ptr = Arc::clone(&guard.deref_mut().get_mut(i).unwrap().receiver); //let mut receiver_ptr = Arc::clone(&channel.receiver); let mut receiver_guard = receiver_ptr.lock().await; let receiver = receiver_guard.deref_mut(); @@ -156,7 +156,7 @@ async fn main() { let channels: Arc>> = Arc::new(Mutex::new(Vec::with_capacity(CHANNEL_COUNT))); for _i in 0..CHANNEL_COUNT { - let (sender, mut receiver) = mpsc::channel::(MAX_CLIENT_NUM); + let (sender, receiver) = mpsc::channel::(MAX_CLIENT_NUM); let users: Arc> = Arc::new(DashMap::with_capacity(MAX_CLIENT_NUM));//klienci let sender = Arc::new(Mutex::new(sender)); let receiver = Arc::new(Mutex::new(receiver)); @@ -168,7 +168,7 @@ async fn main() { for i in 0..CHANNEL_COUNT { - let mut channels_cp = Arc::clone(&channels); + let channels_cp = Arc::clone(&channels); tokio::spawn(async move { manage_communication(channels_cp, i).await @@ -180,9 +180,9 @@ async fn main() { loop { let (socket, addr) = listener.accept().await.unwrap(); info!("Incoming connection from {}", addr); - let (reader, mut writer) = socket.into_split(); + let (reader, writer) = socket.into_split(); - let mut channels_cp = Arc::clone(&channels); + let channels_cp = Arc::clone(&channels); info!("Initializing new client. Waiting for username and channel number"); diff --git a/chat_app/src/types.rs b/chat_app/src/types.rs index e90dcb5..6f41dfa 100644 --- a/chat_app/src/types.rs +++ b/chat_app/src/types.rs @@ -1,9 +1,9 @@ -use std::cmp::Ordering; -use std::collections::{BTreeMap, BTreeSet, HashMap}; + +use std::collections::{BTreeMap}; use uuid::Uuid; use chrono::{DateTime, Utc}; use serde::{Serialize, Deserialize}; -use serde_json; + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct User { From 56e521869bc072429a238d6d7d5252feee20dc38 Mon Sep 17 00:00:00 2001 From: jkarmowska Date: Wed, 22 Jun 2022 21:20:07 +0200 Subject: [PATCH 14/17] change server confirmation --- chat_app/src/bin/client.rs | 33 +++++++++++---------------------- chat_app/src/bin/server.rs | 36 ++++++++++++++---------------------- 2 files changed, 25 insertions(+), 44 deletions(-) diff --git a/chat_app/src/bin/client.rs b/chat_app/src/bin/client.rs index 397537e..a3b4080 100644 --- a/chat_app/src/bin/client.rs +++ b/chat_app/src/bin/client.rs @@ -23,6 +23,7 @@ async fn read_msg(mut reader: BufReader, mut receiver: Receiver, sender: Sender {} Err(_) => { warn!("Error on sending exit"); @@ -60,7 +61,9 @@ async fn write_msg(mut writer: BufWriter, sender: Sender { writer.flush().await.expect("Failed to flush buffer") } Err(_) => { @@ -86,7 +89,7 @@ fn get_initial_data() -> Message { let _channel_size = stdin().read_line(&mut channel_str); let channel: usize = channel_str.trim().parse().unwrap(); let username = username.trim(); - Message::Hello { username:username.to_string(), channel } + Message::Hello { username: username.to_string(), channel } } @@ -104,14 +107,17 @@ async fn send_data(writer: &mut BufWriter, reader: &mut BufReade Ok(_mess) => { let message = serde_json::from_str(line.as_str()); match message { - Ok(Message::UsernameTaken { .. }) => { + Ok(Message::UsernameTaken) => { println!("Username taken. Enter data again"); } Ok(Message::Ok { .. }) => { info!("received ok message from server"); break; } - _ => error!("Unexpected message from server(message): {} !", serde_json::to_string(&message.unwrap()).unwrap()), + Ok(Message::ChatFull) => { + println!("Chat is full. try again later!"); + } + _ => error!("Unexpected message from server (message): {} !", serde_json::to_string(&message.unwrap()).unwrap()), } } Err(_) => { error!("Error on receiving from server(ERR)") } @@ -134,23 +140,6 @@ async fn main() { send_data(&mut writer, &mut reader).await; info!("sent username and channel no to server"); - let mut line = String::new(); - match reader.read_line(&mut line).await { - Ok(_) => { - match serde_json::from_str(line.as_str()) { - Ok(Message::Ok {}) => {} - Ok(Message::ChatFull {}) => { - println!("Chat is full. try again later!"); - return; - } - _ => error!("error deserializing message") - } - } - Err(_) => { - error!("error on reading from server"); - return; - } - }; tokio::spawn(async move { read_msg(reader, receiver).await; diff --git a/chat_app/src/bin/server.rs b/chat_app/src/bin/server.rs index 33ff6a0..4596c83 100644 --- a/chat_app/src/bin/server.rs +++ b/chat_app/src/bin/server.rs @@ -1,4 +1,4 @@ -use std::ops::{ DerefMut}; +use std::ops::{DerefMut}; use std::sync::{Arc}; use tokio::net::{TcpListener}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; @@ -8,7 +8,7 @@ use dashmap::DashMap; use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; use tokio::sync::mpsc::{Receiver, Sender}; -use log::{info, warn, }; +use log::{info, warn}; use simple_logger::SimpleLogger; use chat_app::types::Message; @@ -45,7 +45,7 @@ async fn broadcast(clients: &mut Arc>, message: } } -async fn handle_connection(reader: &mut BufReader, name: &str, _sender: Sender)->Result<()> +async fn handle_connection(reader: &mut BufReader, name: &str, _sender: Sender) -> Result<()> { loop { let _msg = receive_msg(reader, name).await.unwrap(); @@ -76,7 +76,6 @@ struct Channel { } - async fn read_username_and_channel(reader: &mut BufReader) -> Result { let mut buffer = String::with_capacity(NAME_SIZE); loop { @@ -88,7 +87,7 @@ async fn read_username_and_channel(reader: &mut BufReader) -> Res println!("Message hello: {:?}", message); if let Message::Hello { username: _, - channel + channel } = message { if channel >= CHANNEL_COUNT { return Err(anyhow!("Channel number too big")); } @@ -97,35 +96,28 @@ async fn read_username_and_channel(reader: &mut BufReader) -> Res } } -async fn manage_client(reader: OwnedReadHalf, writer: OwnedWriteHalf, channels: Arc>>) ->Result<()>{ +async fn manage_client(reader: OwnedReadHalf, writer: OwnedWriteHalf, channels: Arc>>) -> Result<()> { let mut reader = BufReader::new(reader); //buffer czyta z socketa tcp od klienta - info!("Client managing"); - if let Ok(Message::Hello {username, channel}) = read_username_and_channel(&mut reader).await + if let Ok(Message::Hello { username, channel }) = read_username_and_channel(&mut reader).await { - info!("read ok"); - let mut channels_lock = channels.lock().await; - info!("locked mutex"); + let mut channels_lock = channels.lock().await; let channel_r = channels_lock.deref_mut().get_mut(channel).unwrap(); channel_r.users.insert(username.clone(), writer); - info!("Client {} has joined", username); //debug msg + info!("Client {} has joined", username); let mut serialized = serde_json::to_string(&Message::Ok).unwrap(); serialized.push('\n'); channel_r.users.get_mut(username.as_str()).unwrap().value_mut().write(serialized.as_bytes()).await?; let _ = channel_r.users.get_mut(username.as_str()).unwrap().value_mut().flush().await; info!("sent ok"); - broadcast(&mut channel_r.users, Message::UserJoined {user:username.clone()}).await; - /*if sender.send(serde_json::to_string(&Message::UserJoined {user:username.clone()}).unwrap().as_bytes()).is_ok() {} else { - info!("DEBUG: message from {} could not be broadcast", username); - }*/ - - - //handle_connection(&mut reader, &name, sender).await; + //todo user musi odbierac userjoined po tym jak dolaczy + broadcast(&mut channel_r.users, Message::UserJoined { user: username.clone() }).await; + //todo!!!!!!!!!! + //handle_connection(&mut reader, &name, sender).await?; //koniec połączenia channel_r.users.remove(username.as_str()); -drop(channels_lock); + drop(channels_lock); info!("Client {} has left the chat", username.as_str()); - } - else { warn!("Wrong message from client") }/**/ + } else { warn!("Wrong message from client") }/**/ Ok(()) } From dd43389bf13d09746f02e54b19b2b6f7f14f2e52 Mon Sep 17 00:00:00 2001 From: jkarmowska Date: Sat, 25 Jun 2022 17:30:14 +0200 Subject: [PATCH 15/17] add multiple channels --- chat_app/src/bin/client.rs | 80 +++++++++++++++++++++------------ chat_app/src/bin/server.rs | 90 +++++++++++++++++++++----------------- chat_app/src/types.rs | 35 ++------------- 3 files changed, 106 insertions(+), 99 deletions(-) diff --git a/chat_app/src/bin/client.rs b/chat_app/src/bin/client.rs index a3b4080..2d1c606 100644 --- a/chat_app/src/bin/client.rs +++ b/chat_app/src/bin/client.rs @@ -9,19 +9,19 @@ use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; use tokio::sync::mpsc::{Receiver, Sender}; use chat_app::types; use types::Message; - - use std::string::String; const SERVER: &str = "localhost:8080"; const QUIT: &str = "/quit\n"; +const CHANNEL_COUNT: usize = 10; +const MESSAGE_COUNT: usize = 1000; async fn read_msg(mut reader: BufReader, mut receiver: Receiver) { let mut line = String::new(); loop { line.clear(); - + info!("waiting for msg from server!"); let received = reader.read_line(&mut line).await; info!("received msg from server"); if let Ok(_received_msg) = receiver.try_recv() { //wiadomość od drugiego taska o zakończeniu @@ -34,7 +34,11 @@ async fn read_msg(mut reader: BufReader, mut receiver: Receiver println!("{} : {}", user, message), + Message::UserJoined { user } => println!("User {} joined!", user), + _ => {} + } } Err(_) => { info!("Connection lost! type {} to quit", QUIT.trim()); @@ -47,28 +51,42 @@ async fn write_msg(mut writer: BufWriter, sender: Sender { - let _mes: Message = serde_json::from_str(input.as_str()).unwrap(); - if input == *QUIT {//user wpisał quit - let _quit_msg = input.clone(); + if input == *QUIT { match sender.send(Message::Quit).await { //wysyłanie wiadomości o zakończeniu Ok(_) => {} Err(_) => { warn!("Error on sending exit"); } } + let mut serialized = serde_json::to_string(&Message::Quit).unwrap(); + serialized.push('\n'); + let res = writer.write(serialized.as_bytes()).await; + match res { + Ok(_) => { writer.flush().await.expect("Failed to flush buffer") } + Err(_) => { + warn!("Connection lost! type {} to quit", QUIT.trim()); + break; + } + } break; - } - let mut serialized = serde_json::to_string(&Message::ClientMessage {message: input.trim().parse().unwrap() }).unwrap(); - serialized.push('\n'); - let res = writer.write(serialized.as_bytes()).await; //wysyłanie do servera wiadomości - match res { - Ok(_) => { writer.flush().await.expect("Failed to flush buffer") } - Err(_) => { - warn!("Connection lost! type {} to quit", QUIT.trim()); - break; + } else { + let mut serialized = serde_json::to_string(&Message::ClientMessage { message: input.trim().parse().unwrap() }).unwrap(); + serialized.push('\n'); + let res = writer.write(serialized.as_bytes()).await; //wysyłanie do servera wiadomości + match res { + Ok(_) => { + writer.flush().await.expect("Failed to flush buffer"); + info!("Sent message to server"); + } + Err(_) => { + warn!("Connection lost! type {} to quit", QUIT.trim()); + break; + } } } } @@ -84,12 +102,22 @@ fn get_initial_data() -> Message { let mut username = String::new(); let _user_size = stdin().read_line(&mut username); - let mut channel_str = String::new(); - println!("Enter channel number(1 - 10)"); - let _channel_size = stdin().read_line(&mut channel_str); - let channel: usize = channel_str.trim().parse().unwrap(); + let mut channel_res: usize = 0; + + loop { + println!("Enter channel number (1 - {})", CHANNEL_COUNT); + let mut channel_str = String::new(); + let _channel_size = stdin().read_line(&mut channel_str); + if let Ok(channel) = channel_str.trim().parse::() { + channel_res = channel + } + if channel_res < 1 || channel_res > CHANNEL_COUNT { + println!("Wrong channel number!"); + } else { break; } + } + let username = username.trim(); - Message::Hello { username: username.to_string(), channel } + Message::Hello { username: username.to_string(), channel: channel_res } } @@ -101,7 +129,6 @@ async fn send_data(writer: &mut BufWriter, reader: &mut BufReade let _sent = writer.write(serialized.as_bytes()).await; let _ = writer.flush().await; let mut line = String::new(); - info!("sent message to server"); match reader.read_line(&mut line).await { Ok(_mess) => { @@ -111,7 +138,7 @@ async fn send_data(writer: &mut BufWriter, reader: &mut BufReade println!("Username taken. Enter data again"); } Ok(Message::Ok { .. }) => { - info!("received ok message from server"); + println!("Welcome to the chat!"); break; } Ok(Message::ChatFull) => { @@ -136,10 +163,8 @@ async fn main() { let mut reader = BufReader::new(read); let mut writer = BufWriter::new(write); - let (sender, receiver) = mpsc::channel::(10);//10 wiadomości - send_data(&mut writer, &mut reader).await; - info!("sent username and channel no to server"); + let (sender, receiver) = mpsc::channel::(MESSAGE_COUNT); tokio::spawn(async move { read_msg(reader, receiver).await; @@ -148,4 +173,5 @@ async fn main() { tokio::spawn(async move { write_msg(writer, sender).await; }); + info!("main"); } \ No newline at end of file diff --git a/chat_app/src/bin/server.rs b/chat_app/src/bin/server.rs index 4596c83..88e5f3f 100644 --- a/chat_app/src/bin/server.rs +++ b/chat_app/src/bin/server.rs @@ -19,9 +19,11 @@ const NAME_SIZE: usize = 30; const MESSAGE_SIZE: usize = 256; const CHANNEL_COUNT: usize = 10; -async fn write_buf<'a>(writer: &mut OwnedWriteHalf, message: &str) { - writer.write(message.as_bytes()).await.expect("Failed to write msg"); - writer.flush().await.expect("Failed to flush after write"); +async fn write_buf<'a>(writer: &mut OwnedWriteHalf, message: &str) -> Result<()> { + writer.write(message.as_bytes()).await?; + writer.flush().await?; + info!("flushed message"); + Ok(()) } @@ -32,39 +34,52 @@ async fn receive_msg(reader: &mut BufReader, _user: &str) -> Resu if buffer.is_empty() { return Err(anyhow!("Buffer empty")); } - + info!("received message from client"); Ok(buffer) } - Err(_) => { Err(anyhow!("Error reading message")) } + Err(_) => { + Err(anyhow!("Error reading message")) + } } } -async fn broadcast(clients: &mut Arc>, message: Message) { +async fn broadcast(clients: &mut Arc>, message: Message) -> Result<()> { for mut entry in clients.iter_mut() { - write_buf(entry.value_mut(), serde_json::to_string(&message).unwrap().as_str()).await; + write_buf(entry.value_mut(), serde_json::to_string(&message).unwrap().as_str()).await?; } + Ok(()) } -async fn handle_connection(reader: &mut BufReader, name: &str, _sender: Sender) -> Result<()> +async fn handle_connection(reader: &mut BufReader, name: &str, channel_num: usize, channels: Arc>>) -> Result<()> { loop { - let _msg = receive_msg(reader, name).await.unwrap(); - //todo tutaj powinno byc switch po typie wiadomosci od usera. - /*match msg { - Ok(_) => { - if sender.send(msg.clone()).await.is_ok() {} else { - warn!("Message from {} could not be send to broadcast", &name); - } + let msg = receive_msg(reader, name).await?; + + match serde_json::from_str(msg.as_str())? { + Message::ClientMessage { message } => { + let mut guard = channels.lock().await; + let channel = guard.get_mut(channel_num).unwrap(); + let sender_guard = channel.sender.lock().await; + let message = Message::BroadcastMessage { message, user: name.to_string() }; + sender_guard.send(message).await?; + info!("Broadcast client message"); + drop(sender_guard); + drop(guard); } - Err(_) => { - let err_msg = format!("{} has left the chat", &name); - if sender.send((err_msg, name.parse().unwrap())).await.is_ok() {} else { - warn!("Exit message from {} could not be send to broadcast", &name); - } + Message::Quit => { + let mut guard = channels.lock().await; + let channel = guard.get_mut(channel_num).unwrap(); + let sender_guard = channel.sender.lock().await; + let message = Message::UserQuit { user: name.to_string() }; + sender_guard.send(message).await?; + drop(sender_guard); + drop(guard); break; } - }*/ - break; + _ => { + break; + } + } } Ok(()) } @@ -80,11 +95,10 @@ async fn read_username_and_channel(reader: &mut BufReader) -> Res let mut buffer = String::with_capacity(NAME_SIZE); loop { buffer.clear(); - reader.read_line(&mut buffer).await.expect("TODO: panic message"); let message: Message = serde_json::from_str(buffer.as_str())?; - println!("Message hello: {:?}", message); + if let Message::Hello { username: _, channel @@ -108,33 +122,31 @@ async fn manage_client(reader: OwnedReadHalf, writer: OwnedWriteHalf, channels: serialized.push('\n'); channel_r.users.get_mut(username.as_str()).unwrap().value_mut().write(serialized.as_bytes()).await?; let _ = channel_r.users.get_mut(username.as_str()).unwrap().value_mut().flush().await; - info!("sent ok"); - //todo user musi odbierac userjoined po tym jak dolaczy - broadcast(&mut channel_r.users, Message::UserJoined { user: username.clone() }).await; - //todo!!!!!!!!!! - //handle_connection(&mut reader, &name, sender).await?; - //koniec połączenia - channel_r.users.remove(username.as_str()); + broadcast(&mut channel_r.users, Message::UserJoined { user: username.clone() }).await?; drop(channels_lock); + + info!("Broadcast user joined"); + handle_connection(&mut reader, &username, channel, Arc::clone(&channels)).await?; + + //koniec połączenia + //channel_r.users.remove(username.as_str()); info!("Client {} has left the chat", username.as_str()); } else { warn!("Wrong message from client") }/**/ Ok(()) } -async fn manage_communication(channels_cp: Arc>>, i: usize) { +async fn manage_communication(channels_cp: Arc>>, i: usize) -> Result<()> { loop { let mut guard = channels_cp.lock().await; let receiver_ptr = Arc::clone(&guard.deref_mut().get_mut(i).unwrap().receiver); - //let mut receiver_ptr = Arc::clone(&channel.receiver); let mut receiver_guard = receiver_ptr.lock().await; let receiver = receiver_guard.deref_mut(); drop(guard); if let Some(msg) = receiver.recv().await { - info!("Broadcasting message : {}","XD"); - broadcast(&mut channels_cp.lock().await.deref_mut().get_mut(i).unwrap().users, msg).await; + info!("Broadcasting message to channel {}",i); + broadcast(&mut channels_cp.lock().await.deref_mut().get_mut(i).unwrap().users, msg).await?; } - //drop(guard); drop(receiver_guard); } } @@ -149,7 +161,7 @@ async fn main() { for _i in 0..CHANNEL_COUNT { let (sender, receiver) = mpsc::channel::(MAX_CLIENT_NUM); - let users: Arc> = Arc::new(DashMap::with_capacity(MAX_CLIENT_NUM));//klienci + let users: Arc> = Arc::new(DashMap::with_capacity(MAX_CLIENT_NUM)); let sender = Arc::new(Mutex::new(sender)); let receiver = Arc::new(Mutex::new(receiver)); let channel = Channel { sender, receiver, users }; @@ -168,7 +180,6 @@ async fn main() { } - //nowy klient loop { let (socket, addr) = listener.accept().await.unwrap(); info!("Incoming connection from {}", addr); @@ -182,6 +193,5 @@ async fn main() { tokio::spawn(async move { manage_client(reader, writer, channels_cp).await.unwrap(); }); - // }; - }/**/ + } } diff --git a/chat_app/src/types.rs b/chat_app/src/types.rs index 6f41dfa..330500f 100644 --- a/chat_app/src/types.rs +++ b/chat_app/src/types.rs @@ -1,33 +1,6 @@ - use std::collections::{BTreeMap}; -use uuid::Uuid; -use chrono::{DateTime, Utc}; use serde::{Serialize, Deserialize}; - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct User { - pub id: Uuid, - pub name: String, -} - -impl User { - pub fn new(id: Uuid, name: &str) -> Self { - User { - id, - name: String::from(name), - } - } -} - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -struct Text { - pub time: DateTime, - pub user: String, - pub text: String, -} - - #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum Message { ClientMessage { @@ -41,6 +14,9 @@ pub enum Message { messages: BTreeMap, }, Quit, + UserQuit { + user: String, + }, SwitchChannel { new_channel: usize, }, @@ -55,8 +31,3 @@ pub enum Message { user: String, }, } - -pub struct ServerMessage { - user: User, - text: String, -} \ No newline at end of file From 37feaef93d9ed3aaab739bd24cdeabbe2f3392ad Mon Sep 17 00:00:00 2001 From: jkarmowska Date: Sat, 25 Jun 2022 17:34:54 +0200 Subject: [PATCH 16/17] run clippy --- chat_app/src/bin/client.rs | 2 +- chat_app/src/bin/server.rs | 28 +++++++++++++--------------- 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/chat_app/src/bin/client.rs b/chat_app/src/bin/client.rs index 2d1c606..a01816e 100644 --- a/chat_app/src/bin/client.rs +++ b/chat_app/src/bin/client.rs @@ -111,7 +111,7 @@ fn get_initial_data() -> Message { if let Ok(channel) = channel_str.trim().parse::() { channel_res = channel } - if channel_res < 1 || channel_res > CHANNEL_COUNT { + if !(1..=CHANNEL_COUNT).contains(&channel_res) { println!("Wrong channel number!"); } else { break; } } diff --git a/chat_app/src/bin/server.rs b/chat_app/src/bin/server.rs index 88e5f3f..2f7bf61 100644 --- a/chat_app/src/bin/server.rs +++ b/chat_app/src/bin/server.rs @@ -93,21 +93,19 @@ struct Channel { async fn read_username_and_channel(reader: &mut BufReader) -> Result { let mut buffer = String::with_capacity(NAME_SIZE); - loop { - buffer.clear(); - reader.read_line(&mut buffer).await.expect("TODO: panic message"); - - let message: Message = serde_json::from_str(buffer.as_str())?; - - if let Message::Hello { - username: _, - channel - } = message { - if channel >= CHANNEL_COUNT - { return Err(anyhow!("Channel number too big")); } - } else { return Err(anyhow!("wrong message from user")); }; - return Ok(message); - } + buffer.clear(); + reader.read_line(&mut buffer).await.expect("TODO: panic message"); + + let message: Message = serde_json::from_str(buffer.as_str())?; + + if let Message::Hello { + username: _, + channel + } = message { + if channel >= CHANNEL_COUNT + { return Err(anyhow!("Channel number too big")); } + } else { return Err(anyhow!("wrong message from user")); }; + Ok(message) } async fn manage_client(reader: OwnedReadHalf, writer: OwnedWriteHalf, channels: Arc>>) -> Result<()> { From 1fdcaa2c6a59b96be4a6f84c78237e8feb0ef248 Mon Sep 17 00:00:00 2001 From: jkarmowska Date: Sun, 26 Jun 2022 10:20:05 +0200 Subject: [PATCH 17/17] add endline after sending message --- chat_app/src/bin/client.rs | 3 ++- chat_app/src/bin/server.rs | 13 +++++-------- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/chat_app/src/bin/client.rs b/chat_app/src/bin/client.rs index a01816e..f0b4592 100644 --- a/chat_app/src/bin/client.rs +++ b/chat_app/src/bin/client.rs @@ -34,7 +34,8 @@ async fn read_msg(mut reader: BufReader, mut receiver: Receiver println!("{} : {}", user, message), Message::UserJoined { user } => println!("User {} joined!", user), _ => {} diff --git a/chat_app/src/bin/server.rs b/chat_app/src/bin/server.rs index 2f7bf61..3e67466 100644 --- a/chat_app/src/bin/server.rs +++ b/chat_app/src/bin/server.rs @@ -21,8 +21,8 @@ const CHANNEL_COUNT: usize = 10; async fn write_buf<'a>(writer: &mut OwnedWriteHalf, message: &str) -> Result<()> { writer.write(message.as_bytes()).await?; + writer.write("\n".as_bytes()).await?; writer.flush().await?; - info!("flushed message"); Ok(()) } @@ -63,8 +63,6 @@ async fn handle_connection(reader: &mut BufReader, name: &str, ch let message = Message::BroadcastMessage { message, user: name.to_string() }; sender_guard.send(message).await?; info!("Broadcast client message"); - drop(sender_guard); - drop(guard); } Message::Quit => { let mut guard = channels.lock().await; @@ -72,8 +70,6 @@ async fn handle_connection(reader: &mut BufReader, name: &str, ch let sender_guard = channel.sender.lock().await; let message = Message::UserQuit { user: name.to_string() }; sender_guard.send(message).await?; - drop(sender_guard); - drop(guard); break; } _ => { @@ -94,7 +90,7 @@ struct Channel { async fn read_username_and_channel(reader: &mut BufReader) -> Result { let mut buffer = String::with_capacity(NAME_SIZE); buffer.clear(); - reader.read_line(&mut buffer).await.expect("TODO: panic message"); + reader.read_line(&mut buffer).await?; let message: Message = serde_json::from_str(buffer.as_str())?; @@ -127,7 +123,9 @@ async fn manage_client(reader: OwnedReadHalf, writer: OwnedWriteHalf, channels: handle_connection(&mut reader, &username, channel, Arc::clone(&channels)).await?; //koniec połączenia - //channel_r.users.remove(username.as_str()); + let mut channels_lock = channels.lock().await; + let channel_r = channels_lock.deref_mut().get_mut(channel).unwrap(); + channel_r.users.remove(username.as_str()); info!("Client {} has left the chat", username.as_str()); } else { warn!("Wrong message from client") }/**/ Ok(()) @@ -145,7 +143,6 @@ async fn manage_communication(channels_cp: Arc>>, i: usize) - info!("Broadcasting message to channel {}",i); broadcast(&mut channels_cp.lock().await.deref_mut().get_mut(i).unwrap().users, msg).await?; } - drop(receiver_guard); } }