From 685ee993992d017268113283054b8cb595814105 Mon Sep 17 00:00:00 2001 From: Iustin Pop <iustin@google.com> Date: Mon, 28 Apr 2008 13:01:49 +0000 Subject: [PATCH] Convert cli.SubmitOpCode to use the master This patch converts the cli.py SubmitOpCode method to use the unix protocol and thus execute the opcodes via the master. The patch allows a partial burnin to work with the master. Currently the query opcodes, since they are executed via the SubmitOpCode, are executed inside a job too, which is suboptimal, but they work fine. The cmd lock has been removed from the master, but the cli.py still takes the lock. This is ok for this in-progress patch (since the master still has only one executor thread). This will be fixed in a future patch. Reviewed-by: ultrotter --- daemons/ganeti-masterd | 9 +++++---- lib/cli.py | 31 +++++++++++++++++++++++++++++++ lib/cmdlib.py | 6 ++++-- lib/errors.py | 9 +++++++++ tools/burnin | 2 +- 5 files changed, 50 insertions(+), 7 deletions(-) diff --git a/daemons/ganeti-masterd b/daemons/ganeti-masterd index d67b6fd4d..05ef91d61 100644 --- a/daemons/ganeti-masterd +++ b/daemons/ganeti-masterd @@ -1,4 +1,4 @@ -#!/usr/bin/python +#!/usr/bin/python -u # # Copyright (C) 2006, 2007 Google Inc. @@ -264,7 +264,7 @@ def PoolWorker(worker_id, incoming_queue): if item is None: break print "worker %s processing job %s" % (worker_id, item.data.job_id) - utils.Lock('cmd') + #utils.Lock('cmd') try: proc = mcpu.Processor(feedback=lambda x: None) try: @@ -272,8 +272,9 @@ def PoolWorker(worker_id, incoming_queue): except errors.GenericError, err: print "ganeti exception %s" % err finally: - utils.Unlock('cmd') - utils.LockCleanup() + #utils.Unlock('cmd') + #utils.LockCleanup() + pass print "worker %s finish job %s" % (worker_id, item.data.job_id) print "worker %s exiting" % worker_id diff --git a/lib/cli.py b/lib/cli.py index 31bc0b7cc..a5af42747 100644 --- a/lib/cli.py +++ b/lib/cli.py @@ -26,6 +26,7 @@ import sys import textwrap import os.path import copy +import time from cStringIO import StringIO from ganeti import utils @@ -381,6 +382,36 @@ def SubmitOpCode(op, proc=None, feedback_fn=None): interaction functions. """ + cl = luxi.Client() + job = opcodes.Job(op_list=[op]) + jid = SubmitJob(job) + + query = { + "object": "jobs", + "fields": ["status"], + "names": [jid], + } + + while True: + jdata = SubmitQuery(query) + if not jdata: + # job not found, go away! + raise errors.JobLost("Job with id %s lost" % jid) + + status = jdata[0][0] + if status in (opcodes.Job.STATUS_SUCCESS, opcodes.Job.STATUS_FAIL): + break + time.sleep(1) + + query["fields"].extend(["op_list", "op_status", "op_result"]) + jdata = SubmitQuery(query) + if not jdata: + raise errors.JobLost("Job with id %s lost" % jid) + status, op_list, op_status, op_result = jdata[0] + if status != opcodes.Job.STATUS_SUCCESS: + raise errors.OpExecError(op_result[0]) + return op_result[0] + if feedback_fn is None: feedback_fn = logger.ToStdout if proc is None: diff --git a/lib/cmdlib.py b/lib/cmdlib.py index 05484cf8a..27ac7df76 100644 --- a/lib/cmdlib.py +++ b/lib/cmdlib.py @@ -1221,12 +1221,14 @@ def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False): break if unlock: - utils.Unlock('cmd') + #utils.Unlock('cmd') + pass try: time.sleep(min(60, max_time)) finally: if unlock: - utils.Lock('cmd') + #utils.Lock('cmd') + pass if done: proc.LogInfo("Instance %s's disks are in sync." % instance.name) diff --git a/lib/errors.py b/lib/errors.py index 95125e969..6276ac0d0 100644 --- a/lib/errors.py +++ b/lib/errors.py @@ -152,6 +152,15 @@ class OpCodeUnknown(GenericError): """ +class JobLost(GenericError): + """Submitted job lost. + + The job was submitted but it cannot be found in the current job + list. + + """ + + class ResolverError(GenericError): """Host name cannot be resolved. diff --git a/tools/burnin b/tools/burnin index 3a63d9b18..52e3996c1 100755 --- a/tools/burnin +++ b/tools/burnin @@ -90,7 +90,7 @@ class Burner(object): def ExecOp(self, op): """Execute an opcode and manage the exec buffer.""" self.ClearFeedbackBuf() - return self.proc.ExecOpCode(op) + return cli.SubmitOpCode(op) def ParseOptions(self): """Parses the command line options. -- GitLab