From 457ea78d6253c27f440a104691c21b1e6b6e2b51 Mon Sep 17 00:00:00 2001 From: ben kornmeier Date: Tue, 21 Jul 2020 20:19:38 -0600 Subject: [PATCH 1/3] going back to vec --- src/packet.rs | 18 +++++++++--------- src/server.rs | 22 +++++++++++----------- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/src/packet.rs b/src/packet.rs index 65dc35a..4dec6d6 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -2,18 +2,18 @@ use serde::{Deserialize, Serialize}; pub trait AsIpcPacket { fn timestamp(&self) -> &std::time::SystemTime; - fn data(&self) -> &[u8]; + fn data(&self) -> Vec; } #[derive(Debug, Deserialize, Serialize)] -pub struct IpcPacket<'a> { +pub struct IpcPacket { timestamp: std::time::SystemTime, #[serde(with = "serde_bytes")] - data: &'a [u8], + data: Vec, } -impl<'a, T: AsIpcPacket> From<&'a T> for IpcPacket<'a> { - fn from(v: &'a T) -> Self { +impl From<&T> for IpcPacket { + fn from(v: &T) -> Self { IpcPacket { timestamp: v.timestamp().clone(), data: v.data(), @@ -21,8 +21,8 @@ impl<'a, T: AsIpcPacket> From<&'a T> for IpcPacket<'a> { } } -impl<'a> From> for Packet { - fn from(v: IpcPacket<'a>) -> Self { +impl From for Packet { + fn from(v: IpcPacket) -> Self { Packet { ts: v.timestamp.clone(), data: v.data.to_vec(), @@ -50,8 +50,8 @@ impl AsIpcPacket for Packet { fn timestamp(&self) -> &std::time::SystemTime { &self.ts } - fn data(&self) -> &[u8] { - self.data.as_ref() + fn data(&self) -> Vec { + self.data.clone() } } diff --git a/src/server.rs b/src/server.rs index f33c9a4..f2a6c4f 100644 --- a/src/server.rs +++ b/src/server.rs @@ -4,20 +4,20 @@ use crate::packet::{AsIpcPacket, IpcPacket}; use ipc_channel::ipc::{IpcOneShotServer, IpcSender}; use log::*; -pub type SenderMessage<'a> = Option>>; -pub type Sender<'a> = IpcSender>; +pub type SenderMessage = Option>; +pub type Sender = IpcSender; -pub struct Server<'a> { - server: IpcOneShotServer>, +pub struct Server { + server: IpcOneShotServer, name: String, } -impl<'a> Server<'a> { +impl Server { pub fn name(&self) -> &String { &self.name } - pub fn new() -> Result, Error> { + pub fn new() -> Result { let (server, server_name) = IpcOneShotServer::new().map_err(Error::Io)?; Ok(Server { @@ -26,7 +26,7 @@ impl<'a> Server<'a> { }) } - pub fn accept(self) -> Result, Error> { + pub fn accept(self) -> Result { let (_, tx) = self.server.accept().map_err(Error::Bincode)?; info!("Accepted connection from {:?}", tx); @@ -35,12 +35,12 @@ impl<'a> Server<'a> { } } -pub struct ConnectedIpc<'a> { - connection: Sender<'a>, +pub struct ConnectedIpc { + connection: Sender, } -impl<'a> ConnectedIpc<'a> { - pub fn send(&'a self, packets: &'a [T]) -> Result<(), Error> { +impl ConnectedIpc { + pub fn send(&self, packets: Vec) -> Result<(), Error> { let ipc_packets: Vec<_> = packets.iter().map(IpcPacket::from).collect(); self.connection.send(Some(ipc_packets)).map_err(|e| { error!("Failed to send {:?}", e); From dae894eab13602c170724b135fbd2ee2c056d454 Mon Sep 17 00:00:00 2001 From: ben kornmeier Date: Wed, 22 Jul 2020 10:38:54 -0600 Subject: [PATCH 2/3] oof not sure if we want to include this code now that it has a mutex in it... maybe a seperate type of connection? --- Cargo.toml | 1 + src/errors.rs | 2 ++ src/server.rs | 30 +++++++++++++++++++++++------- 3 files changed, 26 insertions(+), 7 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 821c392..6ab2a25 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ documentation = "https://docs.rs/packet-ipc/" repository = "https://github.com/protectwise/packet-ipc" [dependencies] +blocking = "0.5" bincode = "1" crossbeam-channel = "0.3" ipc-channel = "0.14" diff --git a/src/errors.rs b/src/errors.rs index 6986cdf..9db5ce1 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -12,6 +12,8 @@ pub enum Error { Bincode(#[from] bincode::Error), #[error("Error receiving: {0:?}")] Recv(#[from] crossbeam_channel::RecvError), + #[error("Mutex was poisoned")] + Mutex(), } unsafe impl Sync for Error {} diff --git a/src/server.rs b/src/server.rs index f2a6c4f..a496656 100644 --- a/src/server.rs +++ b/src/server.rs @@ -3,6 +3,7 @@ use crate::errors::Error; use crate::packet::{AsIpcPacket, IpcPacket}; use ipc_channel::ipc::{IpcOneShotServer, IpcSender}; use log::*; +use std::sync::{Arc, Mutex}; pub type SenderMessage = Option>; pub type Sender = IpcSender; @@ -31,25 +32,40 @@ impl Server { info!("Accepted connection from {:?}", tx); + let tx = Arc::new(Mutex::new(tx)); + Ok(ConnectedIpc { connection: tx }) } } + pub struct ConnectedIpc { - connection: Sender, + connection: Arc>, } impl ConnectedIpc { - pub fn send(&self, packets: Vec) -> Result<(), Error> { + pub async fn send(&self, packets:Vec) -> Result<(), Error> { let ipc_packets: Vec<_> = packets.iter().map(IpcPacket::from).collect(); - self.connection.send(Some(ipc_packets)).map_err(|e| { - error!("Failed to send {:?}", e); - Error::Bincode(e) - }) + Self::internal_send(Arc::clone(&self.connection), ipc_packets).await + } + + async fn internal_send(sender: Arc>, ipc_packets: Vec) -> Result<(), Error> { + blocking::Unblock::new(()).with_mut(move |_| { + let sender = Arc::clone(&sender); + let sender = sender.lock().map_err(|_| Error::Mutex())?; + sender.send(Some(ipc_packets)).map_err(|e| { + error!("Failed to send {:?}", e); + Error::Bincode(e) + }); + Ok(()) + }).await + } pub fn close(&mut self) -> Result<(), Error> { - self.connection.send(None).map_err(Error::Bincode)?; + let connection = Arc::clone(&self.connection); + let connection = connection.lock().map_err(|_| Error::Mutex())?; + connection.send(None).map_err(Error::Bincode)?; Ok(()) } } From 99a37401e84bcac47a63b81df3320434fc5bcd58 Mon Sep 17 00:00:00 2001 From: ben kornmeier Date: Wed, 22 Jul 2020 10:57:31 -0600 Subject: [PATCH 3/3] fix tests --- Cargo.toml | 1 + src/server.rs | 3 +-- tests/integration_test.rs | 13 +++++++------ 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 6ab2a25..088a669 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,3 +28,4 @@ thiserror = "1" [dev-dependencies] env_logger = "0.7" +tokio-test = "0.2" diff --git a/src/server.rs b/src/server.rs index a496656..e144dad 100644 --- a/src/server.rs +++ b/src/server.rs @@ -56,8 +56,7 @@ impl ConnectedIpc { sender.send(Some(ipc_packets)).map_err(|e| { error!("Failed to send {:?}", e); Error::Bincode(e) - }); - Ok(()) + }) }).await } diff --git a/tests/integration_test.rs b/tests/integration_test.rs index 4d60a54..ec03bbe 100644 --- a/tests/integration_test.rs +++ b/tests/integration_test.rs @@ -1,4 +1,5 @@ use packet_ipc::{AsIpcPacket, Client, Error, IpcPacket, Packet, Server}; +use tokio_test::block_on; #[test] fn test_roundtrip() { @@ -45,9 +46,9 @@ fn test_packet_receive() { }); let mut server_tx = server.accept().expect("Failed to accept connection"); - + block_on( server_tx - .send(&vec![Packet::new(std::time::SystemTime::now(), vec![3u8])]) + .send(vec![Packet::new(std::time::SystemTime::now(), vec![3u8])])) .expect("Failed to send"); server_tx.close().expect("Failed to close"); @@ -86,11 +87,11 @@ fn test_multiple_packet_receive() { let mut server_tx = server.accept().expect("Failed to accept connection"); - server_tx - .send(&vec![Packet::new(std::time::SystemTime::now(), vec![3u8])]) + block_on(server_tx + .send(vec![Packet::new(std::time::SystemTime::now(), vec![3u8])])) .expect("Failed to send"); - server_tx - .send(&vec![Packet::new(std::time::SystemTime::now(), vec![4u8])]) + block_on(server_tx + .send(vec![Packet::new(std::time::SystemTime::now(), vec![4u8])])) .expect("Failed to send"); server_tx.close().expect("Failed to close");