Author: Pedro Lucas Porcellis <porcellis@eletrotupi.com>
first working version of an actual server
cmd/beterrabad/client.ha | 77 +++++++++++ cmd/beterrabad/executor.ha | 86 +++++++++++++ cmd/beterrabad/main.ha | 143 +-------------------- cmd/beterrabad/socket.ha | 258 ++++++++++++++++++++++++++++++++++++++++
diff --git a/cmd/beterrabad/client.ha b/cmd/beterrabad/client.ha new file mode 100644 index 0000000000000000000000000000000000000000..3303ebf7aba954419d688ef00013a8085973a883 --- /dev/null +++ b/cmd/beterrabad/client.ha @@ -0,0 +1,77 @@ +use fmt; +use io; +use strings; +use unix::poll; +use unix::poll::{event}; + +type state = enum { + READ, + WRITE, + WRITE_ERROR, +}; + +type client = struct { + server: *server, + sock: io::file, + state: state, + pollfd: *poll::pollfd, + rbuf: []u8, + wbuf: []u8, +}; + +// Immediately disconnects a client, without sending them an error message. +fn disconnect(c: *client) void = { + io::close(c.sock)!; + free(c.rbuf); + free(c.wbuf); + + let serv = c.server; + let i = (c: uintptr - serv.clients: *[*]client: uintptr): size / size(client); + delete(serv.clients[i]); + delete(serv.pollfd[i + POLLFD_RESERVED]); + for (i < len(serv.clients); i += 1) { + serv.clients[i].pollfd = &serv.pollfd[i + POLLFD_RESERVED]; + }; + + serv.disconnected = true; +}; + +// Prepares an error message to send to a client. They will be disconnected once +// the error message is sent. +fn disconnect_error( + client: *client, + fmt: str, + args: fmt::field... +) void = { + free(client.wbuf); + const msg = fmt::asprintf(fmt, args...); + client.wbuf = strings::toutf8(msg); + client.state = state::WRITE_ERROR; + client.pollfd.events = event::POLLOUT | event::POLLHUP; +}; + +// Writes data to the client. Takes ownership over the buffer. +fn write(client: *client, buf: []u8) void = { + assert(client.state != state::WRITE + && client.state != state::WRITE_ERROR); + client.wbuf = buf; + client.state = state::WRITE; + client.pollfd.events = event::POLLOUT | event::POLLHUP; +}; + +// Writes data to the client. Duplicates the buffer. +fn writebuf(client: *client, buf: []u8) void = { + write(client, alloc(buf...)); +}; + +// Writes a formatted string to the client. +fn writefmt( + client: *client, + fmt: str, + args: fmt::field... +) void = { + let buf = fmt::asprintf(fmt, args...); + let buf = strings::toutf8(buf); + append(buf, '\n'); + write(client, buf); +}; diff --git a/cmd/beterrabad/executor.ha b/cmd/beterrabad/executor.ha new file mode 100644 index 0000000000000000000000000000000000000000..6917b14606d569697df145ae27dfaefc507709b6 --- /dev/null +++ b/cmd/beterrabad/executor.ha @@ -0,0 +1,86 @@ +use log; +use fmt; +use os; +use os::exec; +use beterraba; + +fn start_service(name: str) beterraba::service = { + let dummy_serv: beterraba::servdef = beterraba::servdef { + name = "Dummy", + desc = "Dummy", + cmd = "bash", + args = "-c ~/test.sh", + ... + }; + + let service = beterraba::service { + name = "Dummy", + desc = "DUmmy", + definition = dummy_serv, + status = beterraba::status::STOPPED, + ... + }; + + boot(&service); + + return service; +}; + +// Check if the process is still alive and bail out if it's ok +fn peek(service: *beterraba::service) void = { + // TODO: Move this kind of thing into a better structured log facility + log::printfln("Peeking on {}", service.name); + + match (exec::peek(&service.process)) { + case let s: exec::status => + let status = exec::exit(&s); + + match (status) { + case exec::exit_status => + service.status = beterraba::status::STOPPED; + case exec::signaled => + // TODO: Add the exit msg + service.status = beterraba::status::CRASHED; + }; + case void => + log::println("Still running... Moving on"); + return; + case let err: exec::error => + service.status = beterraba::status::CRASHED; + }; +}; + +// Try to recover a service which might have crashed +fn recover(service: *beterraba::service) void = { + fmt::printfln("Implement!")!; +}; + +fn boot(service: *beterraba::service) void = { + log::printfln("Booting up {}", service.name); + + match (exec::fork()) { + case let childpid: int => + service.status = beterraba::status::STARTED; + service.process = childpid; + log::printfln("Starting process {}", service.name); + case let err: exec::error => + log::fatal("Couldn't fork {}, error", exec::strerror(err)); + case void => + let cmd = exec::cmd("bash", "-c", "~/test.sh"); + + match (cmd) { + case let cmddef: exec::command => + exec::exec(&cmddef); + case exec::nocmd => + fmt::printfln("Couldn't build cmd {}", service.definition.cmd)!; + case exec::error => + fmt::printfln("Couldn't execute cmd {}, error", service.definition.cmd)!; + case => + fmt::println("Something went terrible wrong, good luck!")!; + }; + }; + + //let cmd = exec::cmd(service.definition.cmd, service.definition.args); + let cmd = exec::cmd("bash", "-c", "~/test.sh"); + +}; diff --git a/cmd/beterrabad/main.ha b/cmd/beterrabad/main.ha index 03ca71b51481f484397c2196127ec4b45780e76d..1be097f72b9da20f075df57c70051a1165159ee0 100644 --- a/cmd/beterrabad/main.ha +++ b/cmd/beterrabad/main.ha @@ -6,143 +6,24 @@ use log; use io; use strings; use path; +use unix; use unix::signal; use fmt; +use rt; export fn main() void = { - fmt::println("Hello world")!; - // TODO: Write opts and flesh cli a little better - - // TODO: Lookup files on the default systemd place and allow user to config - // where are those files - const servdir = "/home/eletrotupi/.config/beterraba"; - let servnames: []str = []; - let services: []beterraba::service = []; - defer free(servnames); - defer free(services); - - const it = os::iter(servdir)!; - defer os::finish(it); - - // TODO: Deal with a SIGTERM gracefully and clean up resources - //signal::block(signal::SIGTERM, signal::SIGINT); - //const sigfd = signal::signalfd(signal::SIGINT, signal::SIGTERM)!; - //defer io::close(sigfd)!; - - for (true) { - match (fs::next(it)) { - case let ent: fs::dirent => - if (ent.name == "." || ent.name == "..") { - continue; - }; - - if (strings::hassuffix(ent.name, ".service")) { - append(servnames, strings::dup(ent.name)); - }; - - case void => - break; - }; - }; - - for (let i = 0z; i < len(servnames); i += 1) { - const servpath = path::join(servdir, servnames[i]); - const servfile = os::open(servpath)!; - - append(services, beterraba::parse(servfile)); - }; - - // TODO: We'll prolly need a socket in order to execute commands later - // on and add some sort of control binary (as rc-service does). But in - // order to do that I need to actually grok how sockets/poll(2) works. + signal::block(signal::SIGINT, signal::SIGTERM); + const sigfd = signal::signalfd(signal::SIGINT, signal::SIGTERM)!; + defer io::close(sigfd)!; - //const sock = bind(&services, sigfd); - //defer shutdown(&sock); - //for (dispatch(&sock)) void; + const sock = bind(sigfd); - log::println("beterrabad service running"); + defer shutdown(&sock); - // Boot all services up - for (let i = 0z; i < len(services); i += 1) { - // TODO: Only start things up if we have any services at all, - // otherwise, bail out - boot(&services[i]); - }; + const flags = rt::fcntl(sock.sock, rt::F_GETFL, 0)!; + rt::fcntl(sock.sock, rt::F_SETFL, flags | rt::O_CLOEXEC)!; - // Keep checking them 'til you die - for (true) { - for (let i = 0z; i < len(services); i += 1) { - switch (services[i].status) { - case beterraba::status::STOPPED => - log::printfln("Service is stopped"); - case beterraba::status::STARTED => - log::printfln("Service is started"); - peek(&services[i]); - case beterraba::status::CRASHED => - log::printfln("Service is crashed"); - recover(&services[i]); - }; - }; - }; - - log::println("beterrabad service shutdown"); -}; - -// Check if the process is still alive and bail out if it's ok -fn peek(service: *beterraba::service) void = { - // TODO: Move this kind of thing into a better structured log facility - log::printfln("Peeking on {}", service.name); - - match (exec::peek(&service.process)) { - case let s: exec::status => - let status = exec::exit(&s); - - match (status) { - case exec::exit_status => - service.status = beterraba::status::STOPPED; - case exec::signaled => - // TODO: Add the exit msg - service.status = beterraba::status::CRASHED; - }; - case void => - log::println("Still running... Moving on"); - return; - case let err: exec::error => - service.status = beterraba::status::CRASHED; - }; -}; - -// Try to recover a service which might have crashed -fn recover(service: *beterraba::service) void = { - fmt::printfln("Implement!")!; -}; - -fn boot(service: *beterraba::service) void = { - log::printfln("Booting up {}", service.name); - - match (exec::fork()) { - case let childpid: int => - service.status = beterraba::status::STARTED; - service.process = childpid; - log::printfln("Starting process {}", service.name); - case let err: exec::error => - log::fatal("Couldn't fork {}, error", exec::strerror(err)); - case void => - let cmd = exec::cmd("bash", "-c", "~/test.sh"); - - match (cmd) { - case let cmddef: exec::command => - exec::exec(&cmddef); - case exec::nocmd => - fmt::printfln("Couldn't build cmd {}", service.definition.cmd)!; - case exec::error => - fmt::printfln("Couldn't execute cmd {}, error", service.definition.cmd)!; - case => - fmt::println("Something went terrible wrong, good luck!")!; - }; - }; - - //let cmd = exec::cmd(service.definition.cmd, service.definition.args); - let cmd = exec::cmd("bash", "-c", "~/test.sh"); - + log::println("beterrabad running"); + for (dispatch(&sock)) void; + log::println("beterrabad terminated"); }; diff --git a/cmd/beterrabad/socket.ha b/cmd/beterrabad/socket.ha new file mode 100644 index 0000000000000000000000000000000000000000..bcb845a6b769b001c303b294d167e0feebcd87f9 --- /dev/null +++ b/cmd/beterrabad/socket.ha @@ -0,0 +1,258 @@ +use bytes; +use bufio; +use dirs; +use encoding::utf8; +use errors; +use fmt; +use fs; +use io; +use log; +use net::unix; +use net; +use os; +use path; +use beterraba; +use strings; +use unix::poll::{event}; +use unix::poll; +use unix::signal; + +def CLIENT_MAXBUF: size = 16777216; // 16 MiB +// TODO: Might not even use this, as a client will connect and then drop +def MAX_CLIENTS: size = 256; +def POLLFD_RESERVED: size = 2; + +type servererror = !(io::error | fs::error); + +type server = struct { + sock: net::socket, + signalfd: io::file, + pollfd: []poll::pollfd, + clients: []client, + disconnected: bool, // XXX: Bit of a hack here + terminate: bool, +}; + +fn bind( + signalfd: io::file, +) server = { + const statedir = match (dirs::runtime()) { + case let err: fs::error => + fmt::fatalf("Unable to initialize socket: {}", fs::strerror(err)); + case let dir: str => + yield dir; + }; + + let buf = path::init(); + path::set(&buf, statedir, "beterrabad")!; + const sockpath = path::string(&buf); + const sock = match (unix::listen(sockpath, net::sockflags::NOCLOEXEC)) { + case let l: net::socket => + yield l; + case let err: net::error => + log::fatal(net::strerror(err)); + }; + os::chmod(sockpath, 0o700)!; + + let pollfd = alloc([poll::pollfd { + fd = sock, + events = event::POLLIN, + ... + }, poll::pollfd { + fd = signalfd, + events = event::POLLIN, + ... + }]); + + return server { + sock = sock, + signalfd = signalfd, + pollfd = pollfd, + ... + }; +}; + +fn shutdown(serv: *server) void = { + for (let i = 0z; i < len(serv.clients); i += 1) { + io::close(serv.clients[i].sock)!; + free(serv.clients[i].rbuf); + free(serv.clients[i].wbuf); + }; + free(serv.clients); + free(serv.pollfd); + + let buf = path::init(); + path::set(&buf, dirs::runtime()!, "beterrabad")!; + os::remove(path::string(&buf))!; + + net::close(serv.sock)!; +}; + +fn dispatch(serv: *server) bool = { + if (serv.terminate) { + return false; + }; + + match (poll::poll(serv.pollfd, poll::INDEF)) { + case uint => + if (serv.pollfd[0].revents & event::POLLIN != 0) { + accept(serv); + }; + if (serv.pollfd[1].revents & event::POLLIN != 0) { + signal::read(serv.signalfd)!; + return false; + }; + for (let i = POLLFD_RESERVED; i < len(serv.pollfd); i += 1) { + dispatch_client(serv, &serv.clients[i - POLLFD_RESERVED]); + if (serv.disconnected) { + // Restart loop on client disconnect + serv.disconnected = false; + i = POLLFD_RESERVED; + }; + }; + case errors::interrupted => + yield; + case let err: errors::error => + log::fatal("poll:", errors::strerror(err)); + }; + + return true; +}; + +fn accept(serv: *server) void = { + // TODO: O_NONBLOCK + const sock = match (net::accept(serv.sock)) { + case let sock: net::socket => + yield sock; + case let err: net::error => + log::fatal(net::strerror(err)); + }; + if (len(serv.clients) >= MAX_CLIENTS) { + log::println("Max clients exceeded; dropping client"); + io::close(sock)!; + return; + }; + + append(serv.pollfd, poll::pollfd { + fd = sock, + events = event::POLLIN | event::POLLHUP, + ... + }); + const pollfd = &serv.pollfd[len(serv.pollfd) - 1]; + append(serv.clients, client { + server = serv, + sock = sock, + pollfd = pollfd, + ... + }); +}; + +fn dispatch_client(serv: *server, client: *client) void = { + const pollfd = client.pollfd; + if (pollfd.revents & event::POLLERR != 0) { + disconnect(client); + return; + }; + if (pollfd.revents & event::POLLHUP != 0) { + disconnect(client); + return; + }; + if (pollfd.revents & event::POLLIN != 0) { + client_readable(serv, client); + }; + if (pollfd.revents & event::POLLOUT != 0) { + client_writable(serv, client); + }; +}; + +fn client_readable(serv: *server, client: *client) void = { + let buf: [os::BUFSIZ]u8 = [0...]; + const z = match (io::read(client.sock, buf)) { + case let z: size => + yield z; + case io::EOF => + disconnect(client); + return; + case let err: io::error => + disconnect(client); + return; + }; + append(client.rbuf, buf[..z]...); + if (len(client.rbuf) >= CLIENT_MAXBUF) { + disconnect_error(client, "error Buffer exceeded\n"); + return; + }; + + for (true) { + let i = match (bytes::index(client.rbuf, '\n')) { + case let i: size => + yield i; + case void => + return; + }; + + const line = match (strings::try_fromutf8(client.rbuf[..i])) { + case let s: str => + yield s; + case utf8::invalid => + disconnect_error(client, "error Invalid UTF-8\n"); + return; + }; + defer delete(client.rbuf[..i+1]); + match (exec(serv, client, line)) { + case let err: servererror => + log::printfln("Error processing user command: {}", + strerror(err)); + case void => + yield; + }; + if (serv.disconnected) { + break; + }; + }; +}; + +fn client_writable(serv: *server, client: *client) void = { + const z = match (io::write(client.sock, client.wbuf)) { + case let z: size => + yield z; + case errors::again => + return; + case let err: io::error => + disconnect(client); + return; + }; + delete(client.wbuf[..z]); + if (len(client.wbuf) != 0) { + return; + }; + + switch (client.state) { + case state::WRITE => + client.state = state::READ; + client.pollfd.events = event::POLLIN | event::POLLHUP; + case state::WRITE_ERROR => + disconnect(client); + case => abort(); + }; +}; + +fn exec(serv: *server, client: *client, line: str) (servererror | void) = { + log::printfln("Received {}", line); + + let buf = bufio::dynamic(io::mode::WRITE); + + switch (line) { + case "start test" => + start_service("test"); + case "ping" => + fmt::fprintf(&buf, "pong\n")?; + }; + + fmt::fprintln(&buf, "end")?; + writebuf(client, bufio::buffer(&buf)); +}; + +fn strerror(err: servererror) str = { + return "Error"; +};