From ffeffa1ddd8c415f9940cf050e0b72d2eaddeb6c Mon Sep 17 00:00:00 2001 From: Iustin Pop <iustin@google.com> Date: Tue, 1 Apr 2008 14:45:47 +0000 Subject: [PATCH] Initial tests with ganeti-masterd This patch adds a very in-progress master daemon. This needs to be launched manually, does not background itself, but can be used for opcode execution. Also parts of this code should be moved to luxi.py. Reviewed-by: ultrotter --- daemons/Makefile.am | 2 +- daemons/ganeti-masterd | 247 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 248 insertions(+), 1 deletion(-) create mode 100644 daemons/ganeti-masterd diff --git a/daemons/Makefile.am b/daemons/Makefile.am index 8414a7289..66c2216db 100644 --- a/daemons/Makefile.am +++ b/daemons/Makefile.am @@ -1 +1 @@ -dist_sbin_SCRIPTS = ganeti-noded ganeti-watcher ganeti-master +dist_sbin_SCRIPTS = ganeti-noded ganeti-watcher ganeti-master ganeti-masterd diff --git a/daemons/ganeti-masterd b/daemons/ganeti-masterd new file mode 100644 index 000000000..427f2a431 --- /dev/null +++ b/daemons/ganeti-masterd @@ -0,0 +1,247 @@ +#!/usr/bin/python +# + +# Copyright (C) 2006, 2007 Google Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA +# 02110-1301, USA. + + +"""Master daemon program. + +Some classes deviates from the standard style guide since the +inheritance from parent classes requires it. + +""" + + +import SocketServer +import threading +import time +import collections +import Queue +import random +import signal +import simplejson + + +from cStringIO import StringIO + +from ganeti import constants +from ganeti import mcpu +from ganeti import opcodes +from ganeti import jqueue +from ganeti import luxi +from ganeti import utils + + +class IOServer(SocketServer.UnixStreamServer): + """IO thread class. + + This class takes care of initializing the other threads, setting + signal handlers (which are processed only in this thread), and doing + cleanup at shutdown. + + """ + QUEUE_PROCESSOR_SIZE = 1 + + def __init__(self, address, rqhandler): + SocketServer.UnixStreamServer.__init__(self, address, rqhandler) + self.do_quit = False + self.queue = jqueue.QueueManager() + self.processors = [] + for i in range(self.QUEUE_PROCESSOR_SIZE): + self.processors.append(threading.Thread(target=PoolWorker, + args=(i, self.queue.new_queue))) + for t in self.processors: + t.start() + signal.signal(signal.SIGINT, self.handle_sigint) + + def process_request_thread(self, request, client_address): + """Process the request. + + This is copied from the code in ThreadingMixIn. + + """ + try: + self.finish_request(request, client_address) + self.close_request(request) + except: + self.handle_error(request, client_address) + self.close_request(request) + + def process_request(self, request, client_address): + """Start a new thread to process the request. + + This is copied from the coode in ThreadingMixIn. + + """ + t = threading.Thread(target=self.process_request_thread, + args=(request, client_address)) + t.start() + + def handle_sigint(self, signum, frame): + print "received %s in %s" % (signum, frame) + self.do_quit = True + self.server_close() + for i in range(self.QUEUE_PROCESSOR_SIZE): + self.queue.new_queue.put(None) + + def serve_forever(self): + """Handle one request at a time until told to quit.""" + while not self.do_quit: + self.handle_request() + + +class ClientRqHandler(SocketServer.BaseRequestHandler): + """Client handler""" + EOM = '\3' + READ_SIZE = 4096 + + def setup(self): + self._buffer = "" + self._msgs = collections.deque() + self._ops = ClientOps(self.server) + + def handle(self): + while True: + msg = self.read_message() + if msg is None: + print "client closed connection" + break + request = simplejson.loads(msg) + if not isinstance(request, dict): + print "wrong request received: %s" % msg + break + method = request.get('request', None) + data = request.get('data', None) + if method is None or data is None: + print "no method or data in request" + break + print "request:", method, data + result = self._ops.handle_request(method, data) + print "result:", result + self.send_message(simplejson.dumps({'success': True, 'result': result})) + + def read_message(self): + while not self._msgs: + data = self.request.recv(self.READ_SIZE) + if not data: + return None + new_msgs = (self._buffer + data).split(self.EOM) + self._buffer = new_msgs.pop() + self._msgs.extend(new_msgs) + return self._msgs.popleft() + + def send_message(self, msg): + #print "sending", msg + self.request.sendall(msg + self.EOM) + + +class ClientOps: + """Class holding high-level client operations.""" + def __init__(self, server): + self.server = server + self._cpu = None + + def _getcpu(self): + if self._cpu is None: + self._cpu = mcpu.Processor(lambda x: None) + return self._cpu + + def handle_request(self, operation, args): + print operation, args + if operation == "submit": + return self.put(args) + elif operation == "query": + path = args["object"] + if path == "instances": + return self.query(args) + else: + raise ValueError("Invalid operation") + + def put(self, args): + job = luxi.UnserializeJob(args) + rid = self.server.queue.put(job) + return rid + + def query(self, args): + path = args["object"] + fields = args["fields"] + names = args["names"] + if path == "instances": + opclass = opcodes.OpQueryInstances + else: + raise ValueError("Invalid object %s" % path) + + op = opclass(output_fields = fields, names=names) + cpu = self._getcpu() + result = cpu.ExecOpCode(op) + return result + + def query_job(self, rid): + rid = int(data) + job = self.server.queue.query(rid) + return job + + +def JobRunner(proc, job): + """Job executor. + + This functions processes a single job in the context of given + processor instance. + + """ + job.SetStatus(opcodes.Job.STATUS_RUNNING) + for op in job.data.op_list: + proc.ExecOpCode(op) + job.SetStatus(opcodes.Job.STATUS_FINISHED, result=opcodes.Job.RESULT_OK) + + +def PoolWorker(worker_id, incoming_queue): + """A worker thread function. + + This is the actual processor of a single thread of Job execution. + + """ + while True: + print "worker %s sleeping" % worker_id + item = incoming_queue.get(True) + if item is None: + break + print "worker %s processing job %s" % (worker_id, item.data.job_id) + utils.Lock('cmd') + try: + proc = mcpu.Processor(feedback=lambda x: None) + try: + JobRunner(proc, item) + except errors.GenericError, err: + print "ganeti exception %s" % err + finally: + utils.Unlock('cmd') + utils.LockCleanup() + print "worker %s finish job %s" % (worker_id, item.data.job_id) + print "worker %s exiting" % worker_id + + +def main(): + """Main function""" + + master = IOServer(constants.MASTER_SOCKET, ClientRqHandler) + master.serve_forever() + + +if __name__ == "__main__": + main() -- GitLab