Make schedule() faster.

This commit is contained in:
Heyang Zhou 2019-05-14 00:04:32 -07:00
parent 4f77f4d024
commit 2963b0a2eb

View File

@ -3,7 +3,7 @@
use std::fs::File;
use std::os::wasi::io::FromRawFd;
use std::net::{Ipv4Addr, AddrParseError};
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::cell::RefCell;
const AF_INET: i32 = 2;
@ -28,7 +28,7 @@ extern "C" {
fn _accept4(fd: i32, sa: *mut SockaddrIn, sa_len: *mut usize, flags: u32) -> i32;
fn _sendto(fd: i32, buf: *const u8, buf_len: usize, flags: u32, addr: *const SockaddrIn, addr_len: usize) -> i32;
fn _recvfrom(fd: i32, buf: *mut u8, buf_len: usize, flags: u32, addr: *mut SockaddrIn, addr_len: *mut usize) -> i32;
fn _get_immediate_fd() -> i32;
fn _eventfd_sem(initial: u32) -> i32;
fn _epoll_create() -> i32;
fn _epoll_ctl(
epfd: i32,
@ -62,6 +62,7 @@ struct AsyncState {
pub struct Epoll {
fd: i32,
imm_queue: Mutex<Vec<Box<FnOnce()>>>,
}
impl Epoll {
@ -72,15 +73,29 @@ impl Epoll {
assert!(fd >= 0);
Epoll {
fd: fd,
imm_queue: Mutex::new(Vec::new()),
}
}
pub fn schedule<F: FnOnce() + 'static>(&self, f: F) {
self.imm_queue.lock().unwrap().push(Box::new(f));
}
pub unsafe fn run(self: Arc<Self>) -> ! {
GLOBAL_EPOLL.with(|x| {
*x.borrow_mut() = Some(self.clone());
});
let mut events: Vec<EpollEvent> = vec! [ EpollEvent::default(); 32 ];
loop {
loop {
let imm_queue = ::std::mem::replace(&mut *self.imm_queue.lock().unwrap(), Vec::new());
if imm_queue.len() == 0 {
break;
}
for f in imm_queue {
f();
}
}
let events_len = events.len();
let n_ready = _epoll_wait(self.fd, events.as_mut_ptr(), events_len, -1);
assert!(n_ready >= 0);
@ -127,27 +142,9 @@ fn put_async_state(mut x: Box<AsyncState>) {
}
pub fn schedule<F: FnOnce() + 'static>(f: F) {
//println!("schedule");
let epoll = GLOBAL_EPOLL.with(|x| x.borrow().as_ref().unwrap().clone());
let epfd = epoll.fd;
let imm_fd = unsafe { _get_immediate_fd() };
assert!(imm_fd >= 0);
let mut state = get_async_state();
state.callback = Some(Box::new(move || {
assert!(unsafe {
_epoll_ctl(epfd, EPOLL_CTL_DEL, imm_fd, ::std::ptr::null())
} >= 0);
unsafe { File::from_raw_fd(imm_fd as _) };
f();
}));
state._epoll = Some(epoll);
let ev = EpollEvent {
events: EPOLLIN | EPOLLET | EPOLLONESHOT,
data: Box::into_raw(state) as usize as _,
};
let ret = unsafe { _epoll_ctl(epfd, EPOLL_CTL_ADD, imm_fd, &ev) };
assert!(ret >= 0);
epoll.schedule(f);
}
fn get_async_io_payload<T: 'static, P: FnMut(i32) -> Result<T, i32> + 'static, F: FnOnce(Result<T, i32>) + 'static>(