Commit 59b4eeef authored by Iustin Pop's avatar Iustin Pop
Browse files

rapi: rework error handling



Currently the rapi code doesn't have any custom error handling; any
exceptions raised are simply converted into an HTTP 500 error, without
much explanation.

This patch adds a couple of generic SubmitJob/GetClient functions that
handle some errors specially so that they are transformed into HTTP
errors, with more detailed information.

With this patch, the behaviour of rapi when the queue is full or
drained, or when the master is down is more readable.
Signed-off-by: default avatarIustin Pop <iustin@google.com>
Reviewed-by: default avatarGuido Trotter <ultrotter@google.com>
parent 030b218a
......@@ -23,14 +23,17 @@
"""
import logging
import ganeti.cli
import ganeti.opcodes
from ganeti import luxi
from ganeti import rapi
from ganeti import http
from ganeti import ssconf
from ganeti import constants
from ganeti import opcodes
from ganeti import errors
def BuildUriList(ids, uri_format, uri_fields=("name", "uri")):
......@@ -85,8 +88,8 @@ def _Tags_GET(kind, name=""):
"""
if kind == constants.TAG_INSTANCE or kind == constants.TAG_NODE:
if not name:
raise HttpBadRequest("Missing name on tag request")
cl = luxi.Client()
raise http.HttpBadRequest("Missing name on tag request")
cl = GetClient()
if kind == constants.TAG_INSTANCE:
fn = cl.QueryInstances
else:
......@@ -106,18 +109,14 @@ def _Tags_PUT(kind, tags, name=""):
"""Helper function to set tags.
"""
cl = luxi.Client()
return cl.SubmitJob([ganeti.opcodes.OpAddTags(kind=kind, name=name,
tags=tags)])
return SubmitJob([opcodes.OpAddTags(kind=kind, name=name, tags=tags)])
def _Tags_DELETE(kind, tags, name=""):
"""Helper function to delete tags.
"""
cl = luxi.Client()
return cl.SubmitJob([ganeti.opcodes.OpDelTags(kind=kind, name=name,
tags=tags)])
return SubmitJob([opcodes.OpDelTags(kind=kind, name=name, tags=tags)])
def MapBulkFields(itemslist, fields):
......@@ -163,6 +162,51 @@ def MakeParamsDict(opts, params):
return result
def SubmitJob(op, cl=None):
"""Generic wrapper for submit job, for better http compatibility.
@type op: list
@param op: the list of opcodes for the job
@type cl: None or luxi.Client
@param cl: optional luxi client to use
@rtype: string
@return: the job ID
"""
try:
if cl is None:
cl = GetClient()
return cl.SubmitJob(op)
except errors.JobQueueFull:
raise http.HttpServiceUnavailable("Job queue is full, needs archiving")
except errors.JobQueueDrainError:
raise http.HttpServiceUnavailable("Job queue is drained, cannot submit")
except luxi.NoMasterError, err:
raise http.HttpBadGateway("Master seems to unreachable: %s" % str(err))
except luxi.TimeoutError, err:
raise http.HttpGatewayTimeout("Timeout while talking to the master"
" daemon. Error: %s" % str(err))
def GetClient():
"""Geric wrapper for luxi.Client(), for better http compatiblity.
"""
try:
return luxi.Client()
except luxi.NoMasterError, err:
raise http.HttpBadGateway("Master seems to unreachable: %s" % str(err))
def FeedbackFn(ts, log_type, log_msg):
"""Feedback logging function for http case.
We don't have a stdout for printing log messages, so log them to the
http log at least.
"""
logging.info("%s: %s", log_type, log_msg)
class R_Generic(object):
"""Generic class for resources.
......
......@@ -23,13 +23,14 @@
"""
import ganeti.opcodes
from ganeti import opcodes
from ganeti import http
from ganeti import luxi
from ganeti import constants
from ganeti import cli
from ganeti.rapi import baserlib
I_FIELDS = ["name", "admin_state", "os",
"pnode", "snodes",
"disk_template",
......@@ -105,7 +106,7 @@ class R_2_info(baserlib.R_Generic):
}
"""
client = luxi.Client()
client = baserlib.GetClient()
return client.QueryClusterInfo()
......@@ -123,12 +124,15 @@ class R_2_os(baserlib.R_Generic):
Example: ["debian-etch"]
"""
op = ganeti.opcodes.OpDiagnoseOS(output_fields=["name", "valid"],
names=[])
diagnose_data = ganeti.cli.SubmitOpCode(op)
cl = baserlib.GetClient()
op = opcodes.OpDiagnoseOS(output_fields=["name", "valid"], names=[])
job_id = baserlib.SubmitJob([op], cl)
# we use custom feedback function, instead of print we log the status
result = cli.PollJob(job_id, cl, feedback_fn=baserlib.FeedbackFn)
diagnose_data = result[0]
if not isinstance(diagnose_data, list):
raise http.HttpInternalServerError(message="Can't get OS list")
raise http.HttpBadGateway(message="Can't get OS list")
return [row[0] for row in diagnose_data if row[1]]
......@@ -146,8 +150,9 @@ class R_2_jobs(baserlib.R_Generic):
"""
fields = ["id"]
cl = baserlib.GetClient()
# Convert the list of lists to the list of ids
result = [job_id for [job_id] in luxi.Client().QueryJobs(None, fields)]
result = [job_id for [job_id] in cl.QueryJobs(None, fields)]
return baserlib.BuildUriList(result, "/2/jobs/%s",
uri_fields=("id", "uri"))
......@@ -176,7 +181,7 @@ class R_2_jobs_id(baserlib.R_Generic):
"received_ts", "start_ts", "end_ts",
]
job_id = self.items[0]
result = luxi.Client().QueryJobs([job_id, ], fields)[0]
result = baserlib.GetClient().QueryJobs([job_id, ], fields)[0]
if result is None:
raise http.HttpNotFound()
return baserlib.MapFields(fields, result)
......@@ -186,7 +191,7 @@ class R_2_jobs_id(baserlib.R_Generic):
"""
job_id = self.items[0]
result = luxi.Client().CancelJob(job_id)
result = baserlib.GetClient().CancelJob(job_id)
return result
......@@ -237,7 +242,7 @@ class R_2_nodes(baserlib.R_Generic):
@return: a dictionary with 'name' and 'uri' keys for each of them
"""
client = luxi.Client()
client = baserlib.GetClient()
if self.useBulk():
bulkdata = client.QueryNodes([], N_FIELDS, False)
......@@ -260,7 +265,7 @@ class R_2_nodes_name(baserlib.R_Generic):
"""
node_name = self.items[0]
client = luxi.Client()
client = baserlib.GetClient()
result = client.QueryNodes(names=[node_name], fields=N_FIELDS,
use_locking=self.useLocking())
......@@ -326,7 +331,7 @@ class R_2_instances(baserlib.R_Generic):
@return: a dictionary with 'name' and 'uri' keys for each of them.
"""
client = luxi.Client()
client = baserlib.GetClient()
use_locking = self.useLocking()
if self.useBulk():
......@@ -368,28 +373,27 @@ class R_2_instances(baserlib.R_Generic):
"ip": fn("ip", None),
"bridge": fn("bridge", None)}]
op = ganeti.opcodes.OpCreateInstance(
mode=constants.INSTANCE_CREATE,
instance_name=fn('name'),
disks=disks,
disk_template=fn('disk_template'),
os_type=fn('os'),
pnode=fn('pnode', None),
snode=fn('snode', None),
iallocator=fn('iallocator', None),
nics=nics,
start=fn('start', True),
ip_check=fn('ip_check', True),
wait_for_sync=True,
hypervisor=fn('hypervisor', None),
hvparams=hvparams,
beparams=beparams,
file_storage_dir=fn('file_storage_dir', None),
file_driver=fn('file_driver', 'loop'),
)
job_id = ganeti.cli.SendJob([op])
return job_id
op = opcodes.OpCreateInstance(
mode=constants.INSTANCE_CREATE,
instance_name=fn('name'),
disks=disks,
disk_template=fn('disk_template'),
os_type=fn('os'),
pnode=fn('pnode', None),
snode=fn('snode', None),
iallocator=fn('iallocator', None),
nics=nics,
start=fn('start', True),
ip_check=fn('ip_check', True),
wait_for_sync=True,
hypervisor=fn('hypervisor', None),
hvparams=hvparams,
beparams=beparams,
file_storage_dir=fn('file_storage_dir', None),
file_driver=fn('file_driver', 'loop'),
)
return baserlib.SubmitJob([op])
class R_2_instances_name(baserlib.R_Generic):
......@@ -402,7 +406,7 @@ class R_2_instances_name(baserlib.R_Generic):
"""Send information about an instance.
"""
client = luxi.Client()
client = baserlib.GetClient()
instance_name = self.items[0]
result = client.QueryInstances(names=[instance_name], fields=I_FIELDS,
use_locking=self.useLocking())
......@@ -413,10 +417,9 @@ class R_2_instances_name(baserlib.R_Generic):
"""Delete an instance.
"""
op = ganeti.opcodes.OpRemoveInstance(instance_name=self.items[0],
ignore_failures=False)
job_id = ganeti.cli.SendJob([op])
return job_id
op = opcodes.OpRemoveInstance(instance_name=self.items[0],
ignore_failures=False)
return baserlib.SubmitJob([op])
class R_2_instances_name_reboot(baserlib.R_Generic):
......@@ -440,14 +443,11 @@ class R_2_instances_name_reboot(baserlib.R_Generic):
[constants.INSTANCE_REBOOT_HARD])[0]
ignore_secondaries = bool(self.queryargs.get('ignore_secondaries',
[False])[0])
op = ganeti.opcodes.OpRebootInstance(
instance_name=instance_name,
reboot_type=reboot_type,
ignore_secondaries=ignore_secondaries)
job_id = ganeti.cli.SendJob([op])
op = opcodes.OpRebootInstance(instance_name=instance_name,
reboot_type=reboot_type,
ignore_secondaries=ignore_secondaries)
return job_id
return baserlib.SubmitJob([op])
class R_2_instances_name_startup(baserlib.R_Generic):
......@@ -468,12 +468,10 @@ class R_2_instances_name_startup(baserlib.R_Generic):
"""
instance_name = self.items[0]
force_startup = bool(self.queryargs.get('force', [False])[0])
op = ganeti.opcodes.OpStartupInstance(instance_name=instance_name,
force=force_startup)
op = opcodes.OpStartupInstance(instance_name=instance_name,
force=force_startup)
job_id = ganeti.cli.SendJob([op])
return job_id
return baserlib.SubmitJob([op])
class R_2_instances_name_shutdown(baserlib.R_Generic):
......@@ -490,11 +488,9 @@ class R_2_instances_name_shutdown(baserlib.R_Generic):
"""
instance_name = self.items[0]
op = ganeti.opcodes.OpShutdownInstance(instance_name=instance_name)
job_id = ganeti.cli.SendJob([op])
op = opcodes.OpShutdownInstance(instance_name=instance_name)
return job_id
return baserlib.SubmitJob([op])
class _R_Tags(baserlib.R_Generic):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment