diff --git a/lib/cli.py b/lib/cli.py
index f9a9628e0e14a3a0961d92d00da924e618a1cd29..b08506a42309a9ac11384b1233575c8854e981b2 100644
--- a/lib/cli.py
+++ b/lib/cli.py
@@ -1035,7 +1035,6 @@ class JobExecutor(object):
 
     """
     self.queue.append((name, ops))
-
 
   def SubmitPending(self):
     """Submit all pending jobs.
diff --git a/lib/constants.py b/lib/constants.py
index 13dad7ad9c3a164f871936372e7bba73747baa30..e664a8d3b2adc4d658267faf62015f74f2f2a744 100644
--- a/lib/constants.py
+++ b/lib/constants.py
@@ -468,13 +468,19 @@ JOB_STATUS_CANCELED = "canceled"
 JOB_STATUS_SUCCESS = "success"
 JOB_STATUS_ERROR = "error"
 
+# OpCode status
+# not yet finalized
 OP_STATUS_QUEUED = "queued"
 OP_STATUS_WAITLOCK = "waiting"
 OP_STATUS_CANCELING = "canceling"
 OP_STATUS_RUNNING = "running"
+# finalized
 OP_STATUS_CANCELED = "canceled"
 OP_STATUS_SUCCESS = "success"
 OP_STATUS_ERROR = "error"
+OPS_FINALIZED = frozenset([OP_STATUS_CANCELED,
+                           OP_STATUS_SUCCESS,
+                           OP_STATUS_ERROR])
 
 # Execution log types
 ELOG_MESSAGE = "message"
diff --git a/lib/jqueue.py b/lib/jqueue.py
index 74139b7b5aac54bfc78d2eecec128531f4a3fd47..9aa0baba7a51e558a3ecf262133c23982b3e2d65 100644
--- a/lib/jqueue.py
+++ b/lib/jqueue.py
@@ -80,6 +80,10 @@ class _QueuedOpCode(object):
   @ivar stop_timestamp: timestamp for the end of the execution
 
   """
+  __slots__ = ["input", "status", "result", "log",
+               "start_timestamp", "end_timestamp",
+               "__weakref__"]
+
   def __init__(self, op):
     """Constructor for the _QuededOpCode.
 
@@ -152,6 +156,11 @@ class _QueuedJob(object):
   @ivar change: a Condition variable we use for waiting for job changes
 
   """
+  __slots__ = ["queue", "id", "ops", "run_op_index", "log_serial",
+               "received_timestamp", "start_timestamp", "end_timestamp",
+               "change",
+               "__weakref__"]
+
   def __init__(self, queue, job_id, ops):
     """Constructor for the _QueuedJob.
 
@@ -304,6 +313,26 @@ class _QueuedJob(object):
 
     return entries
 
+  def MarkUnfinishedOps(self, status, result):
+    """Mark unfinished opcodes with a given status and result.
+
+    This is a utility function for marking all running or waiting to
+    be run opcodes with a given status. Opcodes which are already
+    finalized are not changed.
+
+    @param status: a given opcode status
+    @param result: the opcode result
+
+    """
+    not_marked = True
+    for op in self.ops:
+      if op.status in constants.OPS_FINALIZED:
+        assert not_marked, "Finalized opcodes found after non-finalized ones"
+        continue
+      op.status = status
+      op.result = result
+      not_marked = False
+
 
 class _JobQueueWorker(workerpool.BaseWorker):
   """The actual job workers.
@@ -353,6 +382,15 @@ class _JobQueueWorker(workerpool.BaseWorker):
         count = len(job.ops)
         for idx, op in enumerate(job.ops):
           op_summary = op.input.Summary()
+          if op.status == constants.OP_STATUS_SUCCESS:
+            # this is a job that was partially completed before master
+            # daemon shutdown, so it can be expected that some opcodes
+            # are already completed successfully (if any did error
+            # out, then the whole job should have been aborted and not
+            # resubmitted for processing)
+            logging.info("Op %s/%s: opcode %s already processed, skipping",
+                         idx + 1, count, op_summary)
+            continue
           try:
             logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                          op_summary)
@@ -446,7 +484,7 @@
       queue.acquire()
       try:
         try:
-          job.run_op_idx = -1
+          job.run_op_index = -1
           job.end_timestamp = TimeStampNow()
           queue.UpdateJobUnlocked(job)
         finally:
@@ -575,9 +613,8 @@
                           constants.JOB_STATUS_CANCELING):
             logging.warning("Unfinished job %s found: %s", job.id, job)
             try:
-              for op in job.ops:
-                op.status = constants.OP_STATUS_ERROR
-                op.result = "Unclean master daemon shutdown"
+              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
+                                    "Unclean master daemon shutdown")
             finally:
               self.UpdateJobUnlocked(job)
 
@@ -1145,8 +1182,7 @@
     elif job_status == constants.JOB_STATUS_WAITLOCK:
       # The worker will notice the new status and cancel the job
       try:
-        for op in job.ops:
-          op.status = constants.OP_STATUS_CANCELING
+        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
       finally:
         self.UpdateJobUnlocked(job)
       return (True, "Job %s will be canceled" % job.id)
@@ -1157,9 +1193,8 @@
 
     """
     try:
-      for op in job.ops:
-        op.status = constants.OP_STATUS_CANCELED
-        op.result = "Job canceled by request"
+      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
+                            "Job canceled by request")
     finally:
       self.UpdateJobUnlocked(job)
 
diff --git a/lib/rpc.py b/lib/rpc.py
index b02cc1afb084bf93856c57b45c61965c0d4f5fdb..e28db0e6a90d86f828c7403eb46772c7ad9f7b2c 100644
--- a/lib/rpc.py
+++ b/lib/rpc.py
@@ -301,7 +301,7 @@ class RpcRunner(object):
   def _ConnectList(self, client, node_list, call):
     """Helper for computing node addresses.
 
-    @type client: L{Client}
+    @type client: L{ganeti.rpc.Client}
     @param client: a C{Client} instance
     @type node_list: list
     @param node_list: the node list we should connect
@@ -331,7 +331,7 @@ class RpcRunner(object):
   def _ConnectNode(self, client, node, call):
     """Helper for computing one node's address.
 
-    @type client: L{Client}
+    @type client: L{ganeti.rpc.Client}
     @param client: a C{Client} instance
     @type node: str
     @param node: the node we should connect
diff --git a/scripts/gnt-debug b/scripts/gnt-debug
index d3bf05450c3c08cc6ad94de10d744dbc4d3273cd..3a12b2b7aaa9c1ec111a6d0f00c7ec081706243f 100755
--- a/scripts/gnt-debug
+++ b/scripts/gnt-debug
@@ -71,19 +71,14 @@ def GenericOpCodes(opts, args):
 
   """
   cl = cli.GetClient()
-  job_data = []
-  job_ids = []
+  jex = cli.JobExecutor(cl=cl)
+
   for fname in args:
     op_data = simplejson.loads(open(fname).read())
     op_list = [opcodes.OpCode.LoadOpCode(val) for val in op_data]
-    job_data.append((fname, op_list))
-  for fname, op_list in job_data:
-    jid = cli.SendJob(op_list, cl=cl)
-    ToStdout("File '%s', job id: %s", fname, jid)
-    job_ids.append(jid)
-  for jid in job_ids:
-    ToStdout("Waiting for job id %s", jid)
-    cli.PollJob(jid, cl=cl)
+    jex.QueueJob("file %s" % fname, *op_list)
+
+  jex.GetResults()
   return 0
 
 
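The behavioural core of the jqueue.py changes is the new constants.OPS_FINALIZED set together with _QueuedJob.MarkUnfinishedOps: after an unclean master daemon shutdown only opcodes that have not reached a final status are overwritten, and the reworked _JobQueueWorker.RunTask then skips opcodes that already finished successfully when a partially completed job is picked up again. The following is a minimal standalone sketch of that marking logic, not the Ganeti code itself; the constants are copied from the diff above, while FakeOp and FakeJob are hypothetical stand-ins for _QueuedOpCode and _QueuedJob.

# Minimal sketch of the MarkUnfinishedOps/OPS_FINALIZED behaviour.
# FakeOp and FakeJob are illustrative stand-ins, not Ganeti classes.

OP_STATUS_QUEUED = "queued"
OP_STATUS_RUNNING = "running"
OP_STATUS_CANCELED = "canceled"
OP_STATUS_SUCCESS = "success"
OP_STATUS_ERROR = "error"

# Statuses in this set are final and must never be overwritten
OPS_FINALIZED = frozenset([OP_STATUS_CANCELED,
                           OP_STATUS_SUCCESS,
                           OP_STATUS_ERROR])


class FakeOp(object):
  def __init__(self, status):
    self.status = status
    self.result = None


class FakeJob(object):
  def __init__(self, ops):
    self.ops = ops

  def MarkUnfinishedOps(self, status, result):
    """Mark non-finalized opcodes, leaving finalized ones untouched."""
    not_marked = True
    for op in self.ops:
      if op.status in OPS_FINALIZED:
        # finalized opcodes may only appear before the first unfinished one
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False


if __name__ == "__main__":
  # A job interrupted by a master daemon shutdown: the first opcode had
  # already succeeded, the second was running, the third never started.
  job = FakeJob([FakeOp(OP_STATUS_SUCCESS),
                 FakeOp(OP_STATUS_RUNNING),
                 FakeOp(OP_STATUS_QUEUED)])
  job.MarkUnfinishedOps(OP_STATUS_ERROR, "Unclean master daemon shutdown")
  print([op.status for op in job.ops])
  # -> ['success', 'error', 'error']; on resubmission RunTask would skip
  # the first opcode because its status is already OP_STATUS_SUCCESS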