ref: master
cmd/beterrabad/socket.ha
use beterraba; use bufio; use dirs; use encoding::utf8; use errors; use fmt; use fs; use io; use log; use net::unix; use net; use os; use path; use strings; use unix::poll::{event}; use unix::poll; use unix::signal; type servererror = !(io::error | fs::error); type server = struct { sock: net::socket, signalfd: io::file, pollfd: []poll::pollfd, clients: []client, services: []beterraba::service, disconnected: bool }; type client = struct { server: *server, sock: io::file, pollfd: *poll::pollfd, state: state, wbuf: []u8, rbuf: []u8 }; type state = enum { READ, WRITE, WRITE_ERROR, }; fn bind(fd: io::file) server = { let runtime = match(dirs::runtime()) { case let dir: str => yield dir; case let err: fs::error => log::fatalf("Some error on trying to find runtime dir {}", fs::strerror(err)); }; let pathbuf = path::init(); path::add(&pathbuf, runtime, "beterrabad")!; let sockpath = path::string(&pathbuf); const sock = match(unix::listen(sockpath, net::sockflags::NOCLOEXEC)) { case let err: net::error => log::fatalf("Could not create socket: ", net::strerror(err)); case let unixsock: net::socket => yield unixsock; }; let pollfd = alloc([poll::pollfd { fd = sock, events = event::POLLIN, ... }, poll::pollfd { fd = fd, events = event::POLLIN, ... }]); return server { sock = sock, pollfd = pollfd, signalfd = fd, ... }; }; fn dispatch(s: *server) bool = { match(poll::poll(s.pollfd, poll::INDEF)) { case uint => { if (s.pollfd[0].revents & event::POLLIN != 0) { accept(s); }; if (s.pollfd[1].revents & event::POLLIN != 0) { signal::read(s.signalfd)!; return false; }; for (let i = 2z; i < len(s.pollfd); i += 1) { dispatch_client(s, &s.clients[i - 2]); if (s.disconnected) { // Restart loop on client disconnect s.disconnected = false; i = 2z; }; }; }; case let err: errors::error => log::fatal("poll:", errors::strerror(err)); }; return true; }; fn dispatch_client(s: *server, client: *client) void = { let cpollfd = client.pollfd; if (cpollfd.revents & event::POLLERR != 0) { disconnect_client(client); return; }; if (cpollfd.revents & event::POLLHUP != 0) { disconnect_client(client); return; }; if (cpollfd.revents & event::POLLIN != 0) { read_client(s, client); }; if (cpollfd.revents & event::POLLOUT != 0) { write_client(s, client); }; }; fn write_client(s: *server, client: *client) void = { let sz = match (io::write(client.sock, client.wbuf)) { case let z: size => yield z; case errors::again => return; case let err: io::error => log::printfln("Couldn't write to client sock due to {}", io::strerror(err)); disconnect_client(client); return; }; // Clean up the buffer delete(client.wbuf[..sz]); switch (client.state) { case state::WRITE => client.state = state::READ; client.pollfd.events = event::POLLIN | event::POLLHUP; case state::WRITE_ERROR => disconnect_client(client); case => abort(); }; }; fn read_client(s: *server, client: *client) void = { let bufline = match (bufio::scanline(client.sock)) { case let l: []u8 => yield l; case io::EOF => disconnect_client(client); case io::error => disconnect_client(client); }; let line = match (strings::fromutf8(bufline as []u8)) { case let s: str => yield s; case utf8::invalid => log::fatal("invalid utf-8"); }; match (exec(line, s, client)) { case void => void; case servererror => log::fatal("Fudeu"); }; }; fn exec(line: str, server: *server, client: *client) (servererror | void) = { let buf = bufio::dynamic(io::mode::WRITE); let sline = strings::cut(line, " "); let cmd = sline.0; let args = sline.1; // TODO: Flesh out command execution here switch (cmd) { case "start" => let status = start_service(args, server); fmt::fprintf(&buf, status)?; case "status" => let status = status_service(args, server); fmt::fprintf(&buf, status)?; case "started" => let status = notify_service("started", args, server); fmt::fprintf(&buf, status)?; case "crashed" => let status = notify_service("crashed", args, server); fmt::fprintf(&buf, status)?; case "stopped" => let status = notify_service("stopped", args, server); fmt::fprintf(&buf, status)?; case "list" => let status = list_services(args, server); fmt::fprintf(&buf, status)?; case "ping" => fmt::fprintf(&buf, "pong")?; case => fmt::fprintf(&buf, "unknown command")?; }; fmt::fprintln(&buf, "\nend")?; writebuf(client, bufio::buffer(&buf)); }; // Writes data to the client. Takes ownership over the buffer. fn commit_write(client: *client, buf: []u8) void = { assert(client.state != state::WRITE && client.state != state::WRITE_ERROR); client.wbuf = buf; client.state = state::WRITE; client.pollfd.events = event::POLLOUT | event::POLLHUP; }; // Writes data to the client. Duplicates the buffer. fn writebuf(client: *client, buf: []u8) void = { commit_write(client, alloc(buf...)); }; // TODO: Check if we reached max client connections first fn accept(s: *server) void = { let clientsock = match(net::accept(s.sock)) { case let sock: net::socket => yield sock; case let err: net::error => log::fatalf("Couldn't grab a client sock due to {}", net::strerror(err)); }; append(s.pollfd, poll::pollfd { fd = clientsock, events = event::POLLIN | event::POLLHUP, ... }); let pollfd = &s.pollfd[len(s.pollfd) - 1]; append(s.clients, client { server = s, sock = clientsock, pollfd = pollfd, ... }); }; // Immediately disconnects a client, without sending them an error message. fn disconnect_client(c: *client) void = { io::close(c.sock)!; free(c.rbuf); free(c.wbuf); let serv = c.server; let i = (c: uintptr - serv.clients: *[*]client: uintptr): size / size(client); delete(serv.clients[i]); delete(serv.pollfd[i + 2z]); for (i < len(serv.clients); i += 1) { serv.clients[i].pollfd = &serv.pollfd[i + 2z]; }; serv.disconnected = true; }; // TODO: Walk over every connected client and disconnect them & free the clients fn shutdown(s: *server) void = { for (let i = 0z; i < len(s.clients); i += 1) { net::close(s.clients[i].sock)!; free(s.clients[i].wbuf); free(s.clients[i].rbuf); }; free(s.clients); free(s.pollfd); let pathbuf = path::init(); path::add(&pathbuf, dirs::runtime()!, "beterrabad")!; os::remove(path::string(&pathbuf))!; match(net::close(s.sock)) { case let err: net::error => log::printfln( "There was some error trying to close the socket, {}", net::strerror(err) ); case void => void; }; };