# Source code for superdatabase3000.superdatabase3000

"""
This is the main module of superdatabase3000.

You'll find here the client/server classes.
"""

import sys
import collections

from superdatabase3000.signal import ExitSignalWatcher
from superdatabase3000.socket import SocketClient, SocketServer
from superdatabase3000.hdf import HdfStoreManager


# Container for every argument a query can carry; each field is optional.
# namedtuple(..., defaults=...) only exists on Python >= 3.7, so the
# all-None defaults are installed on __new__ by hand instead.
Args = collections.namedtuple("Args", "table where columns start stop df")
Args.__new__.__defaults__ = tuple(None for _ in Args._fields)


def append_doc_of(fun):
    """A decorator to append the documentation of a function to another.

    Parameters
    ----------
    fun : callable
        the object whose ``__doc__`` is appended

    Returns
    -------
    callable
        a decorator that appends ``fun.__doc__`` to the ``__doc__`` of the
        function it decorates and returns that same function
    """
    def decorator(this_fun):
        extra_doc = fun.__doc__
        # __doc__ is None for undocumented functions and under ``python -OO``;
        # concatenating None would raise TypeError, so guard both sides.
        if extra_doc:
            this_fun.__doc__ = (this_fun.__doc__ or "") + extra_doc
        return this_fun

    return decorator


class DbClient():
    """A client to interact with your favorite database."""

    def __init__(self, sock_filename=None):
        """
        Parameters
        ----------
        sock_filename : str
            path to the DbServer socket
            (default: "/tmp/superdatabase3000.sock")

        Example
        -------
        Assuming a server is already launched

        >>> client = DbClient("/tmp/db.sock")
        >>> df = client.select("/toto")
        """
        self.socket = SocketClient(sock_filename)

    def __del__(self):
        """Close the socket."""
        # __del__ also runs when __init__ raised before `socket` was set;
        # guard the attribute so finalisation does not raise AttributeError.
        if getattr(self, "socket", None) is not None:
            del self.socket

    def _query(self, method, args):
        """Send a query to the server and return its reply.

        Parameters
        ----------
        method : str
            name of the query, sent under the "method" key
        args : Args
            query arguments; fields left to None are not sent

        Returns
        -------
        whatever ``self.socket.recv()`` returns after the query is sent
        """
        args_dic = args._asdict()
        # Table names are sent with a leading "/"; add it when missing.
        # (Indexing [0] means an empty table name raises IndexError.)
        if args_dic["table"][0] != "/":
            args_dic["table"] = "/" + args_dic["table"]
        self.socket.send({
            "method": method,
            "args": {
                key: value
                for key, value in args_dic.items()
                if value is not None
            },
        })
        return self.socket.recv()

    # The docstrings below are intentionally empty (but present):
    # append_doc_of concatenates the HdfStoreManager documentation onto them.

    @append_doc_of(HdfStoreManager.select)
    def select(self, table, where=None, columns=None, start=None, stop=None):
        """"""
        return self._query("select", Args(table, where, columns, start, stop))

    @append_doc_of(HdfStoreManager.insert)
    def insert(self, table, df):
        """"""
        return self._query("insert", Args(table, df=df))

    @append_doc_of(HdfStoreManager.delete)
    def delete(self, table, where):
        """"""
        return self._query("delete", Args(table, where))

    @append_doc_of(HdfStoreManager.drop)
    def drop(self, table):
        """"""
        return self._query("drop", Args(table))


class DbServer():
    """A server running your favorite database."""

    def __init__(self, sock_filename=None, hdf_filename=None):
        """
        Parameters
        ----------
        sock_filename : str
            path to the DbServer socket
            (default: "/tmp/superdatabase3000.sock")
        hdf_filename : str
            path to the DbServer hdf store
            (default: "~/.superdatabase3000.hdf")

        Example
        -------
        >>> server = DbServer(
        ...     sock_filename="/tmp/db.sock",
        ...     hdf_filename="/tmp/db.h5"
        ... )
        >>> server.read_loop()
        """
        self.socket = SocketServer(sock_filename)
        self.hdf = HdfStoreManager(hdf_filename)
        self.sig = ExitSignalWatcher()
        self.sig.catch()
        self.hdf.maintain()  # TODO: not sure when to do this

    @append_doc_of(SocketServer.poll_events)
    def read_loop(self):
        """Poll events in an infinite loop."""
        while not self.sig.EXIT:
            self.socket.poll_events(self._on_msg)
        # Read the exit code before __del__ removes the signal watcher.
        exit_code = self.sig.EXIT
        self.__del__()
        sys.exit(exit_code)

    def _on_msg(self, client_sock_fd, msg):
        """Callback to send hdf query result to the client.

        Parameters
        ----------
        client_sock_fd
            identifier of the client socket the reply is sent to
        msg : dict
            query with a "method" key (name of a method defined on the
            store's class) and an "args" key (keyword arguments for it)

        Raises
        ------
        KeyError
            if "method" is not a public method name of the store's class
        """
        method_name = msg["method"]
        # The name comes from a socket peer: refuse dunder/private names so a
        # client cannot invoke __init__, __del__ or internal helpers. Rejected
        # names raise KeyError, exactly like an unknown method does.
        if not isinstance(method_name, str) or method_name.startswith("_"):
            raise KeyError(method_name)
        hdf_method = type(self.hdf).__dict__[method_name]
        query = hdf_method(self.hdf, **msg["args"])
        self.socket.send_to(client_sock_fd, query)
        if method_name != "select":  # insert/delete/drop
            self.hdf.flush()

    def __del__(self):
        """Close what needs to be."""
        # Every attribute is guarded: __del__ is called explicitly by
        # read_loop and may run again (or after a failed __init__) later.
        if getattr(self, "hdf", None) is not None:
            del self.hdf
        if getattr(self, "socket", None) is not None:
            del self.socket
        if getattr(self, "sig", None) is not None:
            self.sig.restore()
            del self.sig