diff --git a/lib/rapi/baserlib.py b/lib/rapi/baserlib.py index 1914885946ebf1bd276f055192df5b041ab0c603..2bca63ba07a99ff9ab05fc86718ba16d9d0011a8 100644 --- a/lib/rapi/baserlib.py +++ b/lib/rapi/baserlib.py @@ -23,14 +23,17 @@ """ +import logging + import ganeti.cli -import ganeti.opcodes from ganeti import luxi from ganeti import rapi from ganeti import http from ganeti import ssconf from ganeti import constants +from ganeti import opcodes +from ganeti import errors def BuildUriList(ids, uri_format, uri_fields=("name", "uri")): @@ -85,8 +88,8 @@ def _Tags_GET(kind, name=""): """ if kind == constants.TAG_INSTANCE or kind == constants.TAG_NODE: if not name: - raise HttpBadRequest("Missing name on tag request") - cl = luxi.Client() + raise http.HttpBadRequest("Missing name on tag request") + cl = GetClient() if kind == constants.TAG_INSTANCE: fn = cl.QueryInstances else: @@ -106,18 +109,14 @@ def _Tags_PUT(kind, tags, name=""): """Helper function to set tags. """ - cl = luxi.Client() - return cl.SubmitJob([ganeti.opcodes.OpAddTags(kind=kind, name=name, - tags=tags)]) + return SubmitJob([opcodes.OpAddTags(kind=kind, name=name, tags=tags)]) def _Tags_DELETE(kind, tags, name=""): """Helper function to delete tags. """ - cl = luxi.Client() - return cl.SubmitJob([ganeti.opcodes.OpDelTags(kind=kind, name=name, - tags=tags)]) + return SubmitJob([opcodes.OpDelTags(kind=kind, name=name, tags=tags)]) def MapBulkFields(itemslist, fields): @@ -163,6 +162,51 @@ def MakeParamsDict(opts, params): return result +def SubmitJob(op, cl=None): + """Generic wrapper for submit job, for better http compatibility. + + @type op: list + @param op: the list of opcodes for the job + @type cl: None or luxi.Client + @param cl: optional luxi client to use + @rtype: string + @return: the job ID + + """ + try: + if cl is None: + cl = GetClient() + return cl.SubmitJob(op) + except errors.JobQueueFull: + raise http.HttpServiceUnavailable("Job queue is full, needs archiving") + except errors.JobQueueDrainError: + raise http.HttpServiceUnavailable("Job queue is drained, cannot submit") + except luxi.NoMasterError, err: + raise http.HttpBadGateway("Master seems to unreachable: %s" % str(err)) + except luxi.TimeoutError, err: + raise http.HttpGatewayTimeout("Timeout while talking to the master" + " daemon. Error: %s" % str(err)) + +def GetClient(): + """Geric wrapper for luxi.Client(), for better http compatiblity. + + """ + try: + return luxi.Client() + except luxi.NoMasterError, err: + raise http.HttpBadGateway("Master seems to unreachable: %s" % str(err)) + + +def FeedbackFn(ts, log_type, log_msg): + """Feedback logging function for http case. + + We don't have a stdout for printing log messages, so log them to the + http log at least. + + """ + logging.info("%s: %s", log_type, log_msg) + + class R_Generic(object): """Generic class for resources. diff --git a/lib/rapi/rlib2.py b/lib/rapi/rlib2.py index 79b95da16db810cf2c228db8d3f373e0fcdaab06..738f86f029f1a34e9da41e2f1256529de0f7674c 100644 --- a/lib/rapi/rlib2.py +++ b/lib/rapi/rlib2.py @@ -23,13 +23,14 @@ """ -import ganeti.opcodes +from ganeti import opcodes from ganeti import http -from ganeti import luxi from ganeti import constants +from ganeti import cli from ganeti.rapi import baserlib + I_FIELDS = ["name", "admin_state", "os", "pnode", "snodes", "disk_template", @@ -105,7 +106,7 @@ class R_2_info(baserlib.R_Generic): } """ - client = luxi.Client() + client = baserlib.GetClient() return client.QueryClusterInfo() @@ -123,12 +124,15 @@ class R_2_os(baserlib.R_Generic): Example: ["debian-etch"] """ - op = ganeti.opcodes.OpDiagnoseOS(output_fields=["name", "valid"], - names=[]) - diagnose_data = ganeti.cli.SubmitOpCode(op) + cl = baserlib.GetClient() + op = opcodes.OpDiagnoseOS(output_fields=["name", "valid"], names=[]) + job_id = baserlib.SubmitJob([op], cl) + # we use custom feedback function, instead of print we log the status + result = cli.PollJob(job_id, cl, feedback_fn=baserlib.FeedbackFn) + diagnose_data = result[0] if not isinstance(diagnose_data, list): - raise http.HttpInternalServerError(message="Can't get OS list") + raise http.HttpBadGateway(message="Can't get OS list") return [row[0] for row in diagnose_data if row[1]] @@ -146,8 +150,9 @@ class R_2_jobs(baserlib.R_Generic): """ fields = ["id"] + cl = baserlib.GetClient() # Convert the list of lists to the list of ids - result = [job_id for [job_id] in luxi.Client().QueryJobs(None, fields)] + result = [job_id for [job_id] in cl.QueryJobs(None, fields)] return baserlib.BuildUriList(result, "/2/jobs/%s", uri_fields=("id", "uri")) @@ -176,7 +181,7 @@ class R_2_jobs_id(baserlib.R_Generic): "received_ts", "start_ts", "end_ts", ] job_id = self.items[0] - result = luxi.Client().QueryJobs([job_id, ], fields)[0] + result = baserlib.GetClient().QueryJobs([job_id, ], fields)[0] if result is None: raise http.HttpNotFound() return baserlib.MapFields(fields, result) @@ -186,7 +191,7 @@ class R_2_jobs_id(baserlib.R_Generic): """ job_id = self.items[0] - result = luxi.Client().CancelJob(job_id) + result = baserlib.GetClient().CancelJob(job_id) return result @@ -237,7 +242,7 @@ class R_2_nodes(baserlib.R_Generic): @return: a dictionary with 'name' and 'uri' keys for each of them """ - client = luxi.Client() + client = baserlib.GetClient() if self.useBulk(): bulkdata = client.QueryNodes([], N_FIELDS, False) @@ -260,7 +265,7 @@ class R_2_nodes_name(baserlib.R_Generic): """ node_name = self.items[0] - client = luxi.Client() + client = baserlib.GetClient() result = client.QueryNodes(names=[node_name], fields=N_FIELDS, use_locking=self.useLocking()) @@ -326,7 +331,7 @@ class R_2_instances(baserlib.R_Generic): @return: a dictionary with 'name' and 'uri' keys for each of them. """ - client = luxi.Client() + client = baserlib.GetClient() use_locking = self.useLocking() if self.useBulk(): @@ -368,28 +373,27 @@ class R_2_instances(baserlib.R_Generic): "ip": fn("ip", None), "bridge": fn("bridge", None)}] - op = ganeti.opcodes.OpCreateInstance( - mode=constants.INSTANCE_CREATE, - instance_name=fn('name'), - disks=disks, - disk_template=fn('disk_template'), - os_type=fn('os'), - pnode=fn('pnode', None), - snode=fn('snode', None), - iallocator=fn('iallocator', None), - nics=nics, - start=fn('start', True), - ip_check=fn('ip_check', True), - wait_for_sync=True, - hypervisor=fn('hypervisor', None), - hvparams=hvparams, - beparams=beparams, - file_storage_dir=fn('file_storage_dir', None), - file_driver=fn('file_driver', 'loop'), - ) - - job_id = ganeti.cli.SendJob([op]) - return job_id + op = opcodes.OpCreateInstance( + mode=constants.INSTANCE_CREATE, + instance_name=fn('name'), + disks=disks, + disk_template=fn('disk_template'), + os_type=fn('os'), + pnode=fn('pnode', None), + snode=fn('snode', None), + iallocator=fn('iallocator', None), + nics=nics, + start=fn('start', True), + ip_check=fn('ip_check', True), + wait_for_sync=True, + hypervisor=fn('hypervisor', None), + hvparams=hvparams, + beparams=beparams, + file_storage_dir=fn('file_storage_dir', None), + file_driver=fn('file_driver', 'loop'), + ) + + return baserlib.SubmitJob([op]) class R_2_instances_name(baserlib.R_Generic): @@ -402,7 +406,7 @@ class R_2_instances_name(baserlib.R_Generic): """Send information about an instance. """ - client = luxi.Client() + client = baserlib.GetClient() instance_name = self.items[0] result = client.QueryInstances(names=[instance_name], fields=I_FIELDS, use_locking=self.useLocking()) @@ -413,10 +417,9 @@ class R_2_instances_name(baserlib.R_Generic): """Delete an instance. """ - op = ganeti.opcodes.OpRemoveInstance(instance_name=self.items[0], - ignore_failures=False) - job_id = ganeti.cli.SendJob([op]) - return job_id + op = opcodes.OpRemoveInstance(instance_name=self.items[0], + ignore_failures=False) + return baserlib.SubmitJob([op]) class R_2_instances_name_reboot(baserlib.R_Generic): @@ -440,14 +443,11 @@ class R_2_instances_name_reboot(baserlib.R_Generic): [constants.INSTANCE_REBOOT_HARD])[0] ignore_secondaries = bool(self.queryargs.get('ignore_secondaries', [False])[0]) - op = ganeti.opcodes.OpRebootInstance( - instance_name=instance_name, - reboot_type=reboot_type, - ignore_secondaries=ignore_secondaries) - - job_id = ganeti.cli.SendJob([op]) + op = opcodes.OpRebootInstance(instance_name=instance_name, + reboot_type=reboot_type, + ignore_secondaries=ignore_secondaries) - return job_id + return baserlib.SubmitJob([op]) class R_2_instances_name_startup(baserlib.R_Generic): @@ -468,12 +468,10 @@ class R_2_instances_name_startup(baserlib.R_Generic): """ instance_name = self.items[0] force_startup = bool(self.queryargs.get('force', [False])[0]) - op = ganeti.opcodes.OpStartupInstance(instance_name=instance_name, - force=force_startup) + op = opcodes.OpStartupInstance(instance_name=instance_name, + force=force_startup) - job_id = ganeti.cli.SendJob([op]) - - return job_id + return baserlib.SubmitJob([op]) class R_2_instances_name_shutdown(baserlib.R_Generic): @@ -490,11 +488,9 @@ class R_2_instances_name_shutdown(baserlib.R_Generic): """ instance_name = self.items[0] - op = ganeti.opcodes.OpShutdownInstance(instance_name=instance_name) - - job_id = ganeti.cli.SendJob([op]) + op = opcodes.OpShutdownInstance(instance_name=instance_name) - return job_id + return baserlib.SubmitJob([op]) class _R_Tags(baserlib.R_Generic):