// Mirror of https://github.com/fluencelabs/wasmer (synced 2024-12-15 07:05:41 +00:00; 467 lines, 13 KiB, Rust).
#![cfg(all(target_arch = "wasm32", target_os = "wasi"))]
|
|
#![feature(wasi_ext)]
|
|
|
|
use std::cell::RefCell;
|
|
use std::fs::File;
|
|
use std::net::{AddrParseError, Ipv4Addr};
|
|
use std::os::wasi::io::FromRawFd;
|
|
use std::sync::{Arc, Mutex};
|
|
|
|
// Raw numeric constants handed to the `net` host imports declared below.
// NOTE(review): the values look like the Linux ABI numbers — confirm against the host side.

/// Address family passed to `_socket` and stored in `SockaddrIn::sin_family`.
const AF_INET: i32 = 0x2;
/// Socket type passed to `_socket`.
const SOCK_STREAM: i32 = 0x1;

/// File status flag OR-ed into the descriptor flags (and passed to `_accept4`).
const O_NONBLOCK: u32 = 0o4000;
/// `_fcntl` command used to read the descriptor flags.
const F_GETFL: i32 = 3;
/// `_fcntl` command used to write the descriptor flags.
const F_SETFL: i32 = 4;

/// Event bit used for `EpollDirection::In`.
const EPOLLIN: u32 = 0x1;
/// Event bit used for `EpollDirection::Out`.
const EPOLLOUT: u32 = 0x4;
/// Event bit requested on every registration made by this file.
const EPOLLONESHOT: u32 = 1 << 30;
/// Event bit requested on every registration made by this file.
const EPOLLET: u32 = 1 << 31;

/// Error number; host calls report it negated (`-EAGAIN`) when an operation is not ready yet.
const EAGAIN: i32 = 11;
/// Alias of `EAGAIN`.
const EWOULDBLOCK: i32 = EAGAIN;

/// `_epoll_ctl` operation: register a descriptor.
const EPOLL_CTL_ADD: i32 = 1;
/// `_epoll_ctl` operation: deregister a descriptor.
const EPOLL_CTL_DEL: i32 = 2;
|
|
|
|
#[link(wasm_import_module = "net")]
|
|
extern "C" {
|
|
fn _socket(family: i32, _type: i32, proto: i32) -> i32;
|
|
fn _bind(fd: i32, sa: *const SockaddrIn, sa_len: usize) -> i32;
|
|
fn _listen(fd: i32, backlog: i32) -> i32;
|
|
fn _accept4(fd: i32, sa: *mut SockaddrIn, sa_len: *mut usize, flags: u32) -> i32;
|
|
fn _sendto(
|
|
fd: i32,
|
|
buf: *const u8,
|
|
buf_len: usize,
|
|
flags: u32,
|
|
addr: *const SockaddrIn,
|
|
addr_len: usize,
|
|
) -> i32;
|
|
fn _recvfrom(
|
|
fd: i32,
|
|
buf: *mut u8,
|
|
buf_len: usize,
|
|
flags: u32,
|
|
addr: *mut SockaddrIn,
|
|
addr_len: *mut usize,
|
|
) -> i32;
|
|
fn _eventfd_sem(initial: u32) -> i32;
|
|
fn _epoll_create() -> i32;
|
|
fn _epoll_ctl(epfd: i32, op: i32, fd: i32, event: *const EpollEvent) -> i32;
|
|
fn _epoll_wait(epfd: i32, events: *mut EpollEvent, maxevents: usize, timeout: i32) -> i32;
|
|
fn _fcntl(fd: i32, cmd: i32, arg: u32) -> i32;
|
|
}
|
|
|
|
thread_local! {
|
|
static GLOBAL_EPOLL: RefCell<Option<Arc<Epoll>>> = RefCell::new(None);
|
|
static ASYNC_STATE_POOL: RefCell<Vec<Box<AsyncState>>> = RefCell::new(Vec::new());
|
|
}
|
|
|
|
#[derive(Default)]
|
|
struct AsyncState {
|
|
callback: Option<Box<FnOnce()>>,
|
|
_epoll: Option<Arc<Epoll>>,
|
|
}
|
|
|
|
/// An epoll instance together with a queue of callbacks to run immediately.
pub struct Epoll {
    /// Descriptor returned by `_epoll_create`.
    fd: i32,
    /// Callbacks queued through `schedule`; `run` drains them before each wait.
    imm_queue: Mutex<Vec<Box<dyn FnOnce()>>>,
}
|
|
|
|
impl Epoll {
|
|
pub fn new() -> Epoll {
|
|
let fd = unsafe { _epoll_create() };
|
|
assert!(fd >= 0);
|
|
Epoll {
|
|
fd: fd,
|
|
imm_queue: Mutex::new(Vec::new()),
|
|
}
|
|
}
|
|
|
|
pub fn schedule<F: FnOnce() + 'static>(&self, f: F) {
|
|
self.imm_queue.lock().unwrap().push(Box::new(f));
|
|
}
|
|
|
|
pub unsafe fn run(self: Arc<Self>) -> ! {
|
|
GLOBAL_EPOLL.with(|x| {
|
|
*x.borrow_mut() = Some(self.clone());
|
|
});
|
|
let mut events: Vec<EpollEvent> = vec![EpollEvent::default(); 32];
|
|
loop {
|
|
loop {
|
|
let imm_queue =
|
|
::std::mem::replace(&mut *self.imm_queue.lock().unwrap(), Vec::new());
|
|
if imm_queue.len() == 0 {
|
|
break;
|
|
}
|
|
for f in imm_queue {
|
|
f();
|
|
}
|
|
}
|
|
let events_len = events.len();
|
|
let n_ready = _epoll_wait(self.fd, events.as_mut_ptr(), events_len, -1);
|
|
assert!(n_ready >= 0);
|
|
/*if n_ready > 1 {
|
|
println!("n_ready = {}", n_ready);
|
|
}*/
|
|
for ev in &events[..n_ready as usize] {
|
|
if ev.events & (EPOLLIN | EPOLLOUT) != 0 {
|
|
//println!("Free event {:x} {:?}", ev.events, ev.data as usize as *mut AsyncState);
|
|
let mut state = Box::from_raw(ev.data as usize as *mut AsyncState);
|
|
(state.callback.take().unwrap())();
|
|
put_async_state(state);
|
|
//println!("After callback");
|
|
} else {
|
|
println!("unknown event(s): 0x{:x}", ev.events);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Drop for Epoll {
|
|
fn drop(&mut self) {
|
|
unsafe {
|
|
File::from_raw_fd(self.fd as _);
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Which readiness an asynchronous operation waits for.
#[derive(Clone, Copy, Debug)]
pub enum EpollDirection {
    /// Registered with `EPOLLIN`.
    In,
    /// Registered with `EPOLLOUT`.
    Out,
}
|
|
|
|
fn get_async_state() -> Box<AsyncState> {
|
|
ASYNC_STATE_POOL.with(|pool| {
|
|
pool.borrow_mut()
|
|
.pop()
|
|
.unwrap_or_else(|| Box::new(AsyncState::default()))
|
|
})
|
|
}
|
|
|
|
/// Clears `x` (callback first, then the epoll reference) and returns the
/// allocation to this thread's pool for reuse.
fn put_async_state(mut x: Box<AsyncState>) {
    drop(x.callback.take());
    drop(x._epoll.take());
    ASYNC_STATE_POOL.with(|pool| pool.borrow_mut().push(x));
}
|
|
|
|
pub fn schedule<F: FnOnce() + 'static>(f: F) {
|
|
//println!("schedule");
|
|
let epoll = GLOBAL_EPOLL.with(|x| x.borrow().as_ref().unwrap().clone());
|
|
epoll.schedule(f);
|
|
}
|
|
|
|
fn get_async_io_payload<
|
|
T: 'static,
|
|
P: FnMut(i32) -> Result<T, i32> + 'static,
|
|
F: FnOnce(Result<T, i32>) + 'static,
|
|
>(
|
|
epoll: Arc<Epoll>,
|
|
fd: i32,
|
|
direction: EpollDirection,
|
|
poll_action: P,
|
|
on_ready: F,
|
|
) -> Box<FnOnce()> {
|
|
__get_async_io_payload(epoll, fd, direction, poll_action, on_ready, false)
|
|
}
|
|
|
|
fn __get_async_io_payload<
|
|
T: 'static,
|
|
P: FnMut(i32) -> Result<T, i32> + 'static,
|
|
F: FnOnce(Result<T, i32>) + 'static,
|
|
>(
|
|
epoll: Arc<Epoll>,
|
|
fd: i32,
|
|
direction: EpollDirection,
|
|
mut poll_action: P,
|
|
on_ready: F,
|
|
registered: bool,
|
|
) -> Box<FnOnce()> {
|
|
let epfd = epoll.fd;
|
|
Box::new(move || {
|
|
//println!("async io payload");
|
|
let ret = poll_action(fd);
|
|
//println!("async io payload (after poll_action)");
|
|
match ret {
|
|
Err(x) if x == -EAGAIN || x == -EWOULDBLOCK => {
|
|
let mut state = get_async_state();
|
|
state.callback = Some(__get_async_io_payload(
|
|
epoll.clone(),
|
|
fd,
|
|
direction,
|
|
poll_action,
|
|
on_ready,
|
|
true,
|
|
));
|
|
state._epoll = Some(epoll);
|
|
let direction_flag = match direction {
|
|
EpollDirection::In => EPOLLIN,
|
|
EpollDirection::Out => EPOLLOUT,
|
|
};
|
|
let ev = EpollEvent {
|
|
events: direction_flag | EPOLLET | EPOLLONESHOT,
|
|
data: Box::into_raw(state) as usize as _,
|
|
};
|
|
//println!("Alloc event {:?}", ev.data as usize as *mut AsyncState);
|
|
let ret = unsafe { _epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) };
|
|
assert!(ret >= 0);
|
|
}
|
|
x => {
|
|
if registered {
|
|
assert!(
|
|
unsafe { _epoll_ctl(epfd, EPOLL_CTL_DEL, fd, ::std::ptr::null(),) } >= 0
|
|
);
|
|
}
|
|
on_ready(x); // fast path
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
#[repr(C)]
|
|
#[derive(Default, Copy, Clone)]
|
|
struct SockaddrIn {
|
|
sin_family: u16, // e.g. AF_INET
|
|
sin_port: u16, // e.g. htons(3490)
|
|
sin_addr: InAddr,
|
|
sin_zero: [u8; 8],
|
|
}
|
|
|
|
/// IPv4 address as stored inside `SockaddrIn`.
///
/// `Default` is derived because `SockaddrIn` derives `Default` and embeds
/// this type; without it that derive does not compile.
#[repr(C)]
#[derive(Copy, Clone, Default)]
struct InAddr {
    /// The four address octets reinterpreted as a native-endian `u32`.
    s_addr: u32,
}
|
|
|
|
/// Event record exchanged with `_epoll_ctl` and `_epoll_wait`.
///
/// The layout is packed, so fields must be copied out by value rather than
/// borrowed.
#[repr(C, packed)]
#[derive(Clone, Copy, Default)]
struct EpollEvent {
    /// Bit set of `EPOLL*` flags.
    events: u32,
    /// User data; this file stores a raw `AsyncState` pointer here.
    data: u64,
}
|
|
|
|
/// Returns `x` with its two bytes swapped.
///
/// Used to build `sin_port`; the swap is unconditional, independent of the
/// target's endianness.
fn invert_byteorder_u16(x: u16) -> u16 {
    x.swap_bytes()
}
|
|
|
|
/// Errors reported while setting up a socket.
#[derive(Debug)]
pub enum SocketError {
    /// The textual address could not be parsed as an IPv4 address.
    AddrParse(AddrParseError),
    /// `_socket` returned a negative value.
    SocketCreate,
    /// `_bind` returned a negative value.
    Bind,
    /// `_listen` returned a negative value.
    Listen,
    /// Accepting a connection failed.
    Accept,
    /// Any other failure, described by a free-form message.
    Message(String),
}

impl ::std::fmt::Display for SocketError {
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        match *self {
            SocketError::AddrParse(ref err) => write!(f, "invalid address: {}", err),
            SocketError::SocketCreate => f.write_str("failed to create socket"),
            SocketError::Bind => f.write_str("failed to bind socket"),
            SocketError::Listen => f.write_str("failed to listen on socket"),
            SocketError::Accept => f.write_str("failed to accept connection"),
            SocketError::Message(ref msg) => f.write_str(msg),
        }
    }
}

impl ::std::error::Error for SocketError {
    fn source(&self) -> Option<&(dyn ::std::error::Error + 'static)> {
        match *self {
            SocketError::AddrParse(ref err) => Some(err),
            _ => None,
        }
    }
}
|
|
|
|
/// A non-blocking IPv4 TCP listening socket.
pub struct Tcp4Listener {
    /// Address the socket was bound to (stored, not read by this file).
    _addr: Ipv4Addr,
    /// Port the socket was bound to, in host byte order (stored, not read by
    /// this file).
    _port: u16,
    /// Descriptor of the listening socket.
    fd: i32,
}
|
|
|
|
impl Tcp4Listener {
|
|
pub fn new<A: AsRef<str>>(
|
|
addr: A,
|
|
port: u16,
|
|
backlog: u32,
|
|
) -> Result<Tcp4Listener, SocketError> {
|
|
let addr: Ipv4Addr = addr.as_ref().parse().map_err(SocketError::AddrParse)?;
|
|
let sa = SockaddrIn {
|
|
sin_family: AF_INET as _,
|
|
sin_port: invert_byteorder_u16(port),
|
|
sin_addr: InAddr {
|
|
s_addr: unsafe { ::std::mem::transmute(addr.octets()) },
|
|
},
|
|
sin_zero: [0; 8],
|
|
};
|
|
let fd = unsafe { _socket(AF_INET, SOCK_STREAM, 0) };
|
|
if fd < 0 {
|
|
return Err(SocketError::SocketCreate);
|
|
}
|
|
if unsafe { _bind(fd, &sa, ::std::mem::size_of::<SockaddrIn>()) } < 0 {
|
|
return Err(SocketError::Bind);
|
|
}
|
|
if unsafe { _listen(fd, backlog as _) } < 0 {
|
|
return Err(SocketError::Listen);
|
|
}
|
|
|
|
unsafe {
|
|
let mut socket_flags = _fcntl(fd, F_GETFL, 0) as u32;
|
|
socket_flags |= O_NONBLOCK;
|
|
assert!(_fcntl(fd, F_SETFL, socket_flags) >= 0);
|
|
}
|
|
|
|
Ok(Tcp4Listener {
|
|
_addr: addr,
|
|
_port: port,
|
|
fd: fd,
|
|
})
|
|
}
|
|
|
|
pub fn accept_async<F: Fn(Result<Arc<TcpStream>, i32>) -> Result<(), ()> + 'static>(
|
|
self: Arc<Self>,
|
|
ep: Arc<Epoll>,
|
|
cb: F,
|
|
) {
|
|
let ep2 = ep.clone();
|
|
(get_async_io_payload(
|
|
ep.clone(),
|
|
self.fd,
|
|
EpollDirection::In,
|
|
move |fd| -> Result<Arc<TcpStream>, i32> {
|
|
let mut incoming_sa: SockaddrIn = SockaddrIn::default();
|
|
let mut real_len: usize = ::std::mem::size_of::<SockaddrIn>();
|
|
let conn = unsafe { _accept4(fd, &mut incoming_sa, &mut real_len, O_NONBLOCK) };
|
|
if conn >= 0 {
|
|
unsafe {
|
|
let mut socket_flags = _fcntl(conn, F_GETFL, 0) as u32;
|
|
socket_flags |= O_NONBLOCK;
|
|
assert!(_fcntl(conn, F_SETFL, socket_flags) >= 0);
|
|
}
|
|
Ok(Arc::new(TcpStream {
|
|
fd: conn,
|
|
epoll: ep.clone(),
|
|
}))
|
|
} else {
|
|
Err(conn)
|
|
}
|
|
},
|
|
move |x| {
|
|
schedule(|| {
|
|
if let Ok(()) = cb(x) {
|
|
self.accept_async(ep2, cb);
|
|
}
|
|
});
|
|
},
|
|
))();
|
|
}
|
|
}
|
|
|
|
pub struct TcpStream {
|
|
fd: i32,
|
|
epoll: Arc<Epoll>,
|
|
}
|
|
|
|
impl TcpStream {
|
|
pub fn __write_async(
|
|
self: Arc<Self>,
|
|
data: Vec<u8>,
|
|
offset: usize,
|
|
cb: impl FnOnce(Result<(usize, Vec<u8>), i32>) + 'static,
|
|
) {
|
|
let mut data = Some(data);
|
|
|
|
(get_async_io_payload(
|
|
self.epoll.clone(),
|
|
self.fd,
|
|
EpollDirection::Out,
|
|
move |fd| -> Result<(usize, Vec<u8>), i32> {
|
|
let _data = data.as_ref().unwrap();
|
|
let _data = &_data[offset..];
|
|
let ret =
|
|
unsafe { _sendto(fd, _data.as_ptr(), _data.len(), 0, ::std::ptr::null(), 0) };
|
|
if ret >= 0 {
|
|
Ok((ret as usize, data.take().unwrap()))
|
|
} else {
|
|
Err(ret)
|
|
}
|
|
},
|
|
move |x| {
|
|
drop(self);
|
|
cb(x);
|
|
},
|
|
))();
|
|
}
|
|
|
|
pub fn write_async(
|
|
self: Arc<Self>,
|
|
data: Vec<u8>,
|
|
cb: impl FnOnce(Result<(usize, Vec<u8>), i32>) + 'static,
|
|
) {
|
|
self.__write_async(data, 0, cb)
|
|
}
|
|
|
|
pub fn write_all_async(
|
|
self: Arc<Self>,
|
|
data: Vec<u8>,
|
|
cb: impl FnOnce(Result<Vec<u8>, i32>) + 'static,
|
|
) {
|
|
fn inner(
|
|
me: Arc<TcpStream>,
|
|
data: Vec<u8>,
|
|
offset: usize,
|
|
cb: impl FnOnce(Result<Vec<u8>, i32>) + 'static,
|
|
) {
|
|
let me2 = me.clone();
|
|
me.__write_async(data, offset, move |result| match result {
|
|
Ok((len, data)) => {
|
|
let new_offset = offset + len;
|
|
if new_offset == data.len() {
|
|
cb(Ok(data));
|
|
} else {
|
|
inner(me2, data, new_offset, cb);
|
|
}
|
|
}
|
|
Err(code) => {
|
|
cb(Err(code));
|
|
}
|
|
})
|
|
}
|
|
inner(self, data, 0, cb);
|
|
}
|
|
|
|
pub fn read_async(
|
|
self: Arc<Self>,
|
|
out: Vec<u8>,
|
|
cb: impl FnOnce(Result<Vec<u8>, i32>) + 'static,
|
|
) {
|
|
let mut out = Some(out);
|
|
(get_async_io_payload(
|
|
self.epoll.clone(),
|
|
self.fd,
|
|
EpollDirection::In,
|
|
move |fd| -> Result<Vec<u8>, i32> {
|
|
let _out = out.as_mut().unwrap();
|
|
let out_cap = _out.capacity();
|
|
let ret = unsafe {
|
|
_recvfrom(
|
|
fd,
|
|
_out.as_mut_ptr(),
|
|
out_cap,
|
|
0,
|
|
::std::ptr::null_mut(),
|
|
::std::ptr::null_mut(),
|
|
)
|
|
};
|
|
if ret >= 0 {
|
|
assert!(ret as usize <= out_cap);
|
|
unsafe {
|
|
_out.set_len(ret as usize);
|
|
}
|
|
Ok(out.take().unwrap())
|
|
} else {
|
|
Err(ret)
|
|
}
|
|
},
|
|
move |x| {
|
|
drop(self);
|
|
cb(x);
|
|
},
|
|
))();
|
|
}
|
|
}
|
|
|
|
impl Drop for TcpStream {
|
|
fn drop(&mut self) {
|
|
unsafe {
|
|
File::from_raw_fd(self.fd as _);
|
|
}
|
|
}
|
|
}
|