Commit 685ee993 authored by Iustin Pop's avatar Iustin Pop
Browse files

Convert cli.SubmitOpCode to use the master

This patch converts the cli.py SubmitOpCode method to use the unix
protocol and thus execute the opcodes via the master.

The patch allows a partial burnin to work with the master. Currently the
query opcodes, since they are executed via the SubmitOpCode, are
executed inside a job too, which is suboptimal, but they work fine.

The cmd lock has been removed from the master, but the cli.py still
takes the lock. This is ok for this in-progress patch (since the master
still has only one executor thread). This will be fixed in a future
patch.

Reviewed-by: ultrotter
parent fd38ef95
#!/usr/bin/python
#!/usr/bin/python -u
#
# Copyright (C) 2006, 2007 Google Inc.
......@@ -264,7 +264,7 @@ def PoolWorker(worker_id, incoming_queue):
if item is None:
break
print "worker %s processing job %s" % (worker_id, item.data.job_id)
utils.Lock('cmd')
#utils.Lock('cmd')
try:
proc = mcpu.Processor(feedback=lambda x: None)
try:
......@@ -272,8 +272,9 @@ def PoolWorker(worker_id, incoming_queue):
except errors.GenericError, err:
print "ganeti exception %s" % err
finally:
utils.Unlock('cmd')
utils.LockCleanup()
#utils.Unlock('cmd')
#utils.LockCleanup()
pass
print "worker %s finish job %s" % (worker_id, item.data.job_id)
print "worker %s exiting" % worker_id
......
......@@ -26,6 +26,7 @@ import sys
import textwrap
import os.path
import copy
import time
from cStringIO import StringIO
from ganeti import utils
......@@ -381,6 +382,36 @@ def SubmitOpCode(op, proc=None, feedback_fn=None):
interaction functions.
"""
cl = luxi.Client()
job = opcodes.Job(op_list=[op])
jid = SubmitJob(job)
query = {
"object": "jobs",
"fields": ["status"],
"names": [jid],
}
while True:
jdata = SubmitQuery(query)
if not jdata:
# job not found, go away!
raise errors.JobLost("Job with id %s lost" % jid)
status = jdata[0][0]
if status in (opcodes.Job.STATUS_SUCCESS, opcodes.Job.STATUS_FAIL):
break
time.sleep(1)
query["fields"].extend(["op_list", "op_status", "op_result"])
jdata = SubmitQuery(query)
if not jdata:
raise errors.JobLost("Job with id %s lost" % jid)
status, op_list, op_status, op_result = jdata[0]
if status != opcodes.Job.STATUS_SUCCESS:
raise errors.OpExecError(op_result[0])
return op_result[0]
if feedback_fn is None:
feedback_fn = logger.ToStdout
if proc is None:
......
......@@ -1221,12 +1221,14 @@ def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
break
if unlock:
utils.Unlock('cmd')
#utils.Unlock('cmd')
pass
try:
time.sleep(min(60, max_time))
finally:
if unlock:
utils.Lock('cmd')
#utils.Lock('cmd')
pass
if done:
proc.LogInfo("Instance %s's disks are in sync." % instance.name)
......
......@@ -152,6 +152,15 @@ class OpCodeUnknown(GenericError):
"""
class JobLost(GenericError):
"""Submitted job lost.
The job was submitted but it cannot be found in the current job
list.
"""
class ResolverError(GenericError):
"""Host name cannot be resolved.
......
......@@ -90,7 +90,7 @@ class Burner(object):
def ExecOp(self, op):
"""Execute an opcode and manage the exec buffer."""
self.ClearFeedbackBuf()
return self.proc.ExecOpCode(op)
return cli.SubmitOpCode(op)
def ParseOptions(self):
"""Parses the command line options.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment