diff --git a/daemons/Makefile.am b/daemons/Makefile.am index 8414a7289c8d1400fd0cefa969a25c64f6ed9cef..66c2216dbbd4be29ec44137bd2ccf13fb183a810 100644 --- a/daemons/Makefile.am +++ b/daemons/Makefile.am @@ -1 +1 @@ -dist_sbin_SCRIPTS = ganeti-noded ganeti-watcher ganeti-master +dist_sbin_SCRIPTS = ganeti-noded ganeti-watcher ganeti-master ganeti-masterd diff --git a/daemons/ganeti-masterd b/daemons/ganeti-masterd new file mode 100644 index 0000000000000000000000000000000000000000..427f2a431de88c5fa775e53a291ddf9749d56d85 --- /dev/null +++ b/daemons/ganeti-masterd @@ -0,0 +1,247 @@ +#!/usr/bin/python +# + +# Copyright (C) 2006, 2007 Google Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA +# 02110-1301, USA. + + +"""Master daemon program. + +Some classes deviates from the standard style guide since the +inheritance from parent classes requires it. + +""" + + +import SocketServer +import threading +import time +import collections +import Queue +import random +import signal +import simplejson + + +from cStringIO import StringIO + +from ganeti import constants +from ganeti import mcpu +from ganeti import opcodes +from ganeti import jqueue +from ganeti import luxi +from ganeti import utils + + +class IOServer(SocketServer.UnixStreamServer): + """IO thread class. + + This class takes care of initializing the other threads, setting + signal handlers (which are processed only in this thread), and doing + cleanup at shutdown. + + """ + QUEUE_PROCESSOR_SIZE = 1 + + def __init__(self, address, rqhandler): + SocketServer.UnixStreamServer.__init__(self, address, rqhandler) + self.do_quit = False + self.queue = jqueue.QueueManager() + self.processors = [] + for i in range(self.QUEUE_PROCESSOR_SIZE): + self.processors.append(threading.Thread(target=PoolWorker, + args=(i, self.queue.new_queue))) + for t in self.processors: + t.start() + signal.signal(signal.SIGINT, self.handle_sigint) + + def process_request_thread(self, request, client_address): + """Process the request. + + This is copied from the code in ThreadingMixIn. + + """ + try: + self.finish_request(request, client_address) + self.close_request(request) + except: + self.handle_error(request, client_address) + self.close_request(request) + + def process_request(self, request, client_address): + """Start a new thread to process the request. + + This is copied from the coode in ThreadingMixIn. + + """ + t = threading.Thread(target=self.process_request_thread, + args=(request, client_address)) + t.start() + + def handle_sigint(self, signum, frame): + print "received %s in %s" % (signum, frame) + self.do_quit = True + self.server_close() + for i in range(self.QUEUE_PROCESSOR_SIZE): + self.queue.new_queue.put(None) + + def serve_forever(self): + """Handle one request at a time until told to quit.""" + while not self.do_quit: + self.handle_request() + + +class ClientRqHandler(SocketServer.BaseRequestHandler): + """Client handler""" + EOM = '\3' + READ_SIZE = 4096 + + def setup(self): + self._buffer = "" + self._msgs = collections.deque() + self._ops = ClientOps(self.server) + + def handle(self): + while True: + msg = self.read_message() + if msg is None: + print "client closed connection" + break + request = simplejson.loads(msg) + if not isinstance(request, dict): + print "wrong request received: %s" % msg + break + method = request.get('request', None) + data = request.get('data', None) + if method is None or data is None: + print "no method or data in request" + break + print "request:", method, data + result = self._ops.handle_request(method, data) + print "result:", result + self.send_message(simplejson.dumps({'success': True, 'result': result})) + + def read_message(self): + while not self._msgs: + data = self.request.recv(self.READ_SIZE) + if not data: + return None + new_msgs = (self._buffer + data).split(self.EOM) + self._buffer = new_msgs.pop() + self._msgs.extend(new_msgs) + return self._msgs.popleft() + + def send_message(self, msg): + #print "sending", msg + self.request.sendall(msg + self.EOM) + + +class ClientOps: + """Class holding high-level client operations.""" + def __init__(self, server): + self.server = server + self._cpu = None + + def _getcpu(self): + if self._cpu is None: + self._cpu = mcpu.Processor(lambda x: None) + return self._cpu + + def handle_request(self, operation, args): + print operation, args + if operation == "submit": + return self.put(args) + elif operation == "query": + path = args["object"] + if path == "instances": + return self.query(args) + else: + raise ValueError("Invalid operation") + + def put(self, args): + job = luxi.UnserializeJob(args) + rid = self.server.queue.put(job) + return rid + + def query(self, args): + path = args["object"] + fields = args["fields"] + names = args["names"] + if path == "instances": + opclass = opcodes.OpQueryInstances + else: + raise ValueError("Invalid object %s" % path) + + op = opclass(output_fields = fields, names=names) + cpu = self._getcpu() + result = cpu.ExecOpCode(op) + return result + + def query_job(self, rid): + rid = int(data) + job = self.server.queue.query(rid) + return job + + +def JobRunner(proc, job): + """Job executor. + + This functions processes a single job in the context of given + processor instance. + + """ + job.SetStatus(opcodes.Job.STATUS_RUNNING) + for op in job.data.op_list: + proc.ExecOpCode(op) + job.SetStatus(opcodes.Job.STATUS_FINISHED, result=opcodes.Job.RESULT_OK) + + +def PoolWorker(worker_id, incoming_queue): + """A worker thread function. + + This is the actual processor of a single thread of Job execution. + + """ + while True: + print "worker %s sleeping" % worker_id + item = incoming_queue.get(True) + if item is None: + break + print "worker %s processing job %s" % (worker_id, item.data.job_id) + utils.Lock('cmd') + try: + proc = mcpu.Processor(feedback=lambda x: None) + try: + JobRunner(proc, item) + except errors.GenericError, err: + print "ganeti exception %s" % err + finally: + utils.Unlock('cmd') + utils.LockCleanup() + print "worker %s finish job %s" % (worker_id, item.data.job_id) + print "worker %s exiting" % worker_id + + +def main(): + """Main function""" + + master = IOServer(constants.MASTER_SOCKET, ClientRqHandler) + master.serve_forever() + + +if __name__ == "__main__": + main()