kwasm-net fixes and optimizations.

This commit is contained in:
Heyang Zhou 2019-05-13 10:15:03 -07:00
parent 3bcdfb4263
commit 884a7e1713
2 changed files with 51 additions and 44 deletions

View File

@ -1,7 +1,6 @@
#![feature(wasi_ext)]
use kwasm_net::{Epoll, Tcp4Listener, TcpStream, schedule};
use std::io::{Read, Write};
use std::sync::Arc;
fn do_echo(stream: Arc<TcpStream>, buf: Vec<u8>) {
@ -40,7 +39,7 @@ fn main() {
listener.accept_async(epoll.clone(), |stream| {
match stream {
Ok(stream) => {
do_echo(stream, Vec::with_capacity(4096 * 4));
do_echo(stream, Vec::with_capacity(16384));
Ok(())
},
Err(code) => {

View File

@ -2,12 +2,8 @@
use std::fs::File;
use std::os::wasi::io::FromRawFd;
use std::io::{Read, Write};
use std::net::{Ipv4Addr, AddrParseError};
use std::ops::{Deref, DerefMut};
use std::task::{Waker, RawWaker, RawWakerVTable, Poll};
use std::future::Future;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::cell::RefCell;
const AF_INET: i32 = 2;
@ -55,11 +51,13 @@ extern "C" {
thread_local! {
static GLOBAL_EPOLL: RefCell<Option<Arc<Epoll>>> = RefCell::new(None);
static ASYNC_STATE_POOL: RefCell<Vec<Box<AsyncState>>> = RefCell::new(Vec::new());
}
#[derive(Default)]
struct AsyncState {
callback: Box<FnOnce()>,
_epoll: Arc<Epoll>,
callback: Option<Box<FnOnce()>>,
_epoll: Option<Arc<Epoll>>,
}
pub struct Epoll {
@ -81,21 +79,20 @@ impl Epoll {
GLOBAL_EPOLL.with(|x| {
*x.borrow_mut() = Some(self.clone());
});
let mut events: Vec<EpollEvent> = vec! [ EpollEvent::default(); 16 ];
let mut events: Vec<EpollEvent> = vec! [ EpollEvent::default(); 32 ];
loop {
let events_len = events.len();
let n_ready = unsafe {
_epoll_wait(self.fd, events.as_mut_ptr(), events_len, -1)
};
let n_ready = _epoll_wait(self.fd, events.as_mut_ptr(), events_len, -1);
assert!(n_ready >= 0);
//println!("n_ready = {}", n_ready);
/*if n_ready > 1 {
println!("n_ready = {}", n_ready);
}*/
for ev in &events[..n_ready as usize] {
if ev.events & (EPOLLIN | EPOLLOUT) != 0 {
//println!("Free event {:x} {:?}", ev.events, ev.data as usize as *mut AsyncState);
let state = unsafe {
Box::from_raw(ev.data as usize as *mut AsyncState)
};
(state.callback)();
let mut state = Box::from_raw(ev.data as usize as *mut AsyncState);
(state.callback.take().unwrap())();
put_async_state(state);
//println!("After callback");
} else {
println!("unknown event(s): 0x{:x}", ev.events);
@ -119,22 +116,32 @@ pub enum EpollDirection {
Out
}
fn get_async_state() -> Box<AsyncState> {
ASYNC_STATE_POOL.with(|pool| pool.borrow_mut().pop().unwrap_or_else(|| Box::new(AsyncState::default())))
}
fn put_async_state(mut x: Box<AsyncState>) {
x.callback = None;
x._epoll = None;
ASYNC_STATE_POOL.with(|pool| pool.borrow_mut().push(x));
}
pub fn schedule<F: FnOnce() + 'static>(f: F) {
let epoll = GLOBAL_EPOLL.with(|x| x.borrow().as_ref().unwrap().clone());
let epfd = epoll.fd;
let imm_fd = unsafe { _get_immediate_fd() };
assert!(imm_fd >= 0);
let state = Box::new(AsyncState {
callback: Box::new(move || {
assert!(unsafe {
_epoll_ctl(epfd, EPOLL_CTL_DEL, imm_fd, ::std::ptr::null())
} >= 0);
unsafe { File::from_raw_fd(imm_fd as _) };
f();
}),
_epoll: epoll,
});
let mut state = get_async_state();
state.callback = Some(Box::new(move || {
assert!(unsafe {
_epoll_ctl(epfd, EPOLL_CTL_DEL, imm_fd, ::std::ptr::null())
} >= 0);
unsafe { File::from_raw_fd(imm_fd as _) };
f();
}));
state._epoll = Some(epoll);
let ev = EpollEvent {
events: EPOLLIN | EPOLLET | EPOLLONESHOT,
data: Box::into_raw(state) as usize as _,
@ -168,10 +175,9 @@ fn __get_async_io_payload<T: 'static, P: FnMut(i32) -> Result<T, i32> + 'static,
//println!("async io payload (after poll_action)");
match ret {
Err(x) if x == -EAGAIN || x == -EWOULDBLOCK => {
let state = Box::new(AsyncState {
callback: __get_async_io_payload(epoll.clone(), fd, direction, poll_action, on_ready, true),
_epoll: epoll,
});
let mut state = get_async_state();
state.callback = Some(__get_async_io_payload(epoll.clone(), fd, direction, poll_action, on_ready, true));
state._epoll = Some(epoll);
let direction_flag = match direction {
EpollDirection::In => EPOLLIN,
EpollDirection::Out => EPOLLOUT,
@ -219,7 +225,7 @@ struct InAddr {
s_addr: u32,
}
#[repr(C)]
#[repr(C, packed)]
#[derive(Copy, Clone, Default)]
struct EpollEvent {
events: u32,
@ -246,8 +252,8 @@ pub enum SocketError {
}
pub struct Tcp4Listener {
addr: Ipv4Addr,
port: u16,
_addr: Ipv4Addr,
_port: u16,
fd: i32,
}
@ -270,14 +276,14 @@ impl Tcp4Listener {
if fd < 0 {
return Err(SocketError::SocketCreate);
}
if(unsafe {
if unsafe {
_bind(fd, &sa, ::std::mem::size_of::<SockaddrIn>())
} < 0) {
} < 0 {
return Err(SocketError::Bind);
}
if(unsafe {
if unsafe {
_listen(fd, backlog as _)
} < 0) {
} < 0 {
return Err(SocketError::Listen);
}
@ -288,8 +294,8 @@ impl Tcp4Listener {
}
Ok(Tcp4Listener {
addr: addr,
port: port,
_addr: addr,
_port: port,
fd: fd,
})
}
@ -316,9 +322,11 @@ impl Tcp4Listener {
Err(conn)
}
}, move |x| {
if let Ok(()) = cb(x) {
self.accept_async(ep2, cb);
}
schedule(|| {
if let Ok(()) = cb(x) {
self.accept_async(ep2, cb);
}
});
}))();
}
}