diff --git a/Makefile.am b/Makefile.am index 6b27721a3cfbb70c4fa1e6b026815ca0c2561a53..c69cc34652ce3cfbce8f19fae6d947f9f4b08e6e 100644 --- a/Makefile.am +++ b/Makefile.am @@ -128,6 +128,7 @@ rapi_PYTHON = \ lib/rapi/__init__.py \ lib/rapi/baserlib.py \ lib/rapi/client.py \ + lib/rapi/client_utils.py \ lib/rapi/connector.py \ lib/rapi/rlib2.py diff --git a/doc/rapi.rst b/doc/rapi.rst index a8a0dc2029526f10b355d57b13673338fc3febd7..21ec4812fefcddf77b6fc97a59c322ed16c109a4 100644 --- a/doc/rapi.rst +++ b/doc/rapi.rst @@ -651,6 +651,30 @@ while by a resource we refer to an instance's disk, or NIC, etc. Cancel a not-yet-started job. + +``/2/jobs/[job_id]/wait`` ++++++++++++++++++++++++++ + +``GET`` +~~~~~~~ + +Waits for changes on a job. Takes the following body parameters in a +dict: + +``fields`` + The job fields on which to watch for changes. + +``previous_job_info`` + Previously received field values or None if not yet available. + +``previous_log_serial`` + Highest log serial number received so far or None if not yet + available. + +Returns None if no changes have been detected and a dict with two keys, +``job_info`` and ``log_entries`` otherwise. + + ``/2/nodes`` ++++++++++++ diff --git a/lib/cli.py b/lib/cli.py index b7722e20955b93196ed782f19fc9602d1e6679d8..021591f6a43882307ea5e9a5614ae3e07115fdbf 100644 --- a/lib/cli.py +++ b/lib/cli.py @@ -1250,41 +1250,31 @@ def SendJob(ops, cl=None): return job_id -def PollJob(job_id, cl=None, feedback_fn=None): - """Function to poll for the result of a job. +def GenericPollJob(job_id, cbs, report_cbs): + """Generic job-polling function. - @type job_id: job identified - @param job_id: the job to poll for results - @type cl: luxi.Client - @param cl: the luxi client to use for communicating with the master; - if None, a new client will be created + @type job_id: number + @param job_id: Job ID + @type cbs: Instance of L{JobPollCbBase} + @param cbs: Data callbacks + @type report_cbs: Instance of L{JobPollReportCbBase} + @param report_cbs: Reporting callbacks """ - if cl is None: - cl = GetClient() - prev_job_info = None prev_logmsg_serial = None status = None - notified_queued = False - notified_waitlock = False - while True: - result = cl.WaitForJobChangeOnce(job_id, ["status"], prev_job_info, - prev_logmsg_serial) + result = cbs.WaitForJobChangeOnce(job_id, ["status"], prev_job_info, + prev_logmsg_serial) if not result: # job not found, go away! raise errors.JobLost("Job with id %s lost" % job_id) - elif result == constants.JOB_NOTCHANGED: - if status is not None and not callable(feedback_fn): - if status == constants.JOB_STATUS_QUEUED and not notified_queued: - ToStderr("Job %s is waiting in queue", job_id) - notified_queued = True - elif status == constants.JOB_STATUS_WAITLOCK and not notified_waitlock: - ToStderr("Job %s is trying to acquire all necessary locks", job_id) - notified_waitlock = True + + if result == constants.JOB_NOTCHANGED: + report_cbs.ReportNotChanged(job_id, status) # Wait again continue @@ -1295,12 +1285,9 @@ def PollJob(job_id, cl=None, feedback_fn=None): if log_entries: for log_entry in log_entries: - (serial, timestamp, _, message) = log_entry - if callable(feedback_fn): - feedback_fn(log_entry[1:]) - else: - encoded = utils.SafeEncode(message) - ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)), encoded) + (serial, timestamp, log_type, message) = log_entry + report_cbs.ReportLogMessage(job_id, serial, timestamp, + log_type, message) prev_logmsg_serial = max(prev_logmsg_serial, serial) # TODO: Handle canceled and archived jobs @@ -1312,30 +1299,189 @@ def PollJob(job_id, cl=None, feedback_fn=None): prev_job_info = job_info - jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"]) + jobs = cbs.QueryJobs([job_id], ["status", "opstatus", "opresult"]) if not jobs: raise errors.JobLost("Job with id %s lost" % job_id) status, opstatus, result = jobs[0] + if status == constants.JOB_STATUS_SUCCESS: return result - elif status in (constants.JOB_STATUS_CANCELING, - constants.JOB_STATUS_CANCELED): + + if status in (constants.JOB_STATUS_CANCELING, constants.JOB_STATUS_CANCELED): raise errors.OpExecError("Job was canceled") + + has_ok = False + for idx, (status, msg) in enumerate(zip(opstatus, result)): + if status == constants.OP_STATUS_SUCCESS: + has_ok = True + elif status == constants.OP_STATUS_ERROR: + errors.MaybeRaise(msg) + + if has_ok: + raise errors.OpExecError("partial failure (opcode %d): %s" % + (idx, msg)) + + raise errors.OpExecError(str(msg)) + + # default failure mode + raise errors.OpExecError(result) + + +class JobPollCbBase: + """Base class for L{GenericPollJob} callbacks. + + """ + def __init__(self): + """Initializes this class. + + """ + + def WaitForJobChangeOnce(self, job_id, fields, + prev_job_info, prev_log_serial): + """Waits for changes on a job. + + """ + raise NotImplementedError() + + def QueryJobs(self, job_ids, fields): + """Returns the selected fields for the selected job IDs. + + @type job_ids: list of numbers + @param job_ids: Job IDs + @type fields: list of strings + @param fields: Fields + + """ + raise NotImplementedError() + + +class JobPollReportCbBase: + """Base class for L{GenericPollJob} reporting callbacks. + + """ + def __init__(self): + """Initializes this class. + + """ + + def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg): + """Handles a log message. + + """ + raise NotImplementedError() + + def ReportNotChanged(self, job_id, status): + """Called for if a job hasn't changed in a while. + + @type job_id: number + @param job_id: Job ID + @type status: string or None + @param status: Job status if available + + """ + raise NotImplementedError() + + +class _LuxiJobPollCb(JobPollCbBase): + def __init__(self, cl): + """Initializes this class. + + """ + JobPollCbBase.__init__(self) + self.cl = cl + + def WaitForJobChangeOnce(self, job_id, fields, + prev_job_info, prev_log_serial): + """Waits for changes on a job. + + """ + return self.cl.WaitForJobChangeOnce(job_id, fields, + prev_job_info, prev_log_serial) + + def QueryJobs(self, job_ids, fields): + """Returns the selected fields for the selected job IDs. + + """ + return self.cl.QueryJobs(job_ids, fields) + + +class FeedbackFnJobPollReportCb(JobPollReportCbBase): + def __init__(self, feedback_fn): + """Initializes this class. + + """ + JobPollReportCbBase.__init__(self) + + self.feedback_fn = feedback_fn + + assert callable(feedback_fn) + + def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg): + """Handles a log message. + + """ + self.feedback_fn((timestamp, log_type, log_msg)) + + def ReportNotChanged(self, job_id, status): + """Called if a job hasn't changed in a while. + + """ + # Ignore + + +class StdioJobPollReportCb(JobPollReportCbBase): + def __init__(self): + """Initializes this class. + + """ + JobPollReportCbBase.__init__(self) + + self.notified_queued = False + self.notified_waitlock = False + + def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg): + """Handles a log message. + + """ + ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)), + utils.SafeEncode(log_msg)) + + def ReportNotChanged(self, job_id, status): + """Called if a job hasn't changed in a while. + + """ + if status is None: + return + + if status == constants.JOB_STATUS_QUEUED and not self.notified_queued: + ToStderr("Job %s is waiting in queue", job_id) + self.notified_queued = True + + elif status == constants.JOB_STATUS_WAITLOCK and not self.notified_waitlock: + ToStderr("Job %s is trying to acquire all necessary locks", job_id) + self.notified_waitlock = True + + +def PollJob(job_id, cl=None, feedback_fn=None): + """Function to poll for the result of a job. + + @type job_id: job identified + @param job_id: the job to poll for results + @type cl: luxi.Client + @param cl: the luxi client to use for communicating with the master; + if None, a new client will be created + + """ + if cl is None: + cl = GetClient() + + if feedback_fn: + reporter = FeedbackFnJobPollReportCb(feedback_fn) else: - has_ok = False - for idx, (status, msg) in enumerate(zip(opstatus, result)): - if status == constants.OP_STATUS_SUCCESS: - has_ok = True - elif status == constants.OP_STATUS_ERROR: - errors.MaybeRaise(msg) - if has_ok: - raise errors.OpExecError("partial failure (opcode %d): %s" % - (idx, msg)) - else: - raise errors.OpExecError(str(msg)) - # default failure mode - raise errors.OpExecError(result) + reporter = StdioJobPollReportCb() + + return GenericPollJob(job_id, _LuxiJobPollCb(cl), reporter) def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None): diff --git a/lib/cmdlib.py b/lib/cmdlib.py index c0755c0763c01079be7fc716010a589be79c823f..a168f57baa8cfc3e9563328b5ee8dafa2f96a142 100644 --- a/lib/cmdlib.py +++ b/lib/cmdlib.py @@ -1840,7 +1840,7 @@ class LUVerifyCluster(LogicalUnit): feedback_fn("* Verifying orphan volumes") self._VerifyOrphanVolumes(node_vol_should, node_image) - feedback_fn("* Verifying oprhan instances") + feedback_fn("* Verifying orphan instances") self._VerifyOrphanInstances(instancelist, node_image) if constants.VERIFY_NPLUSONE_MEM not in self.skip_set: diff --git a/lib/luxi.py b/lib/luxi.py index 1cee564fced37fc795d04c00534b9d63f1b37893..8a9c4ff4090e6c0301be9d552fd87cc453753caa 100644 --- a/lib/luxi.py +++ b/lib/luxi.py @@ -65,6 +65,9 @@ REQ_SET_WATCHER_PAUSE = "SetWatcherPause" DEF_CTMO = 10 DEF_RWTO = 60 +# WaitForJobChange timeout +WFJC_TIMEOUT = (DEF_RWTO - 1) / 2 + class ProtocolError(errors.GenericError): """Denotes an error in the LUXI protocol""" @@ -434,11 +437,27 @@ class Client(object): return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout)) def WaitForJobChangeOnce(self, job_id, fields, - prev_job_info, prev_log_serial): - timeout = (DEF_RWTO - 1) / 2 + prev_job_info, prev_log_serial, + timeout=WFJC_TIMEOUT): + """Waits for changes on a job. + + @param job_id: Job ID + @type fields: list + @param fields: List of field names to be observed + @type prev_job_info: None or list + @param prev_job_info: Previously received job information + @type prev_log_serial: None or int/long + @param prev_log_serial: Highest log serial number previously received + @type timeout: int/float + @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will + be capped to that value) + + """ + assert timeout >= 0, "Timeout can not be negative" return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE, (job_id, fields, prev_job_info, - prev_log_serial, timeout)) + prev_log_serial, + min(WFJC_TIMEOUT, timeout))) def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial): while True: diff --git a/lib/rapi/client.py b/lib/rapi/client.py index bacd64a700993a03a834d84e5511cee214517d5d..ce1ba11469fd22d0ed5927bb703dbb92bde60c2c 100644 --- a/lib/rapi/client.py +++ b/lib/rapi/client.py @@ -22,30 +22,35 @@ """Ganeti RAPI client.""" import httplib -import httplib2 +import urllib2 +import logging import simplejson import socket import urllib -from OpenSSL import SSL -from OpenSSL import crypto +import OpenSSL +import distutils.version +GANETI_RAPI_PORT = 5080 +GANETI_RAPI_VERSION = 2 + HTTP_DELETE = "DELETE" HTTP_GET = "GET" HTTP_PUT = "PUT" HTTP_POST = "POST" +HTTP_OK = 200 +HTTP_APP_JSON = "application/json" + REPLACE_DISK_PRI = "replace_on_primary" REPLACE_DISK_SECONDARY = "replace_on_secondary" REPLACE_DISK_CHG = "replace_new_secondary" REPLACE_DISK_AUTO = "replace_auto" -VALID_REPLACEMENT_MODES = frozenset([ - REPLACE_DISK_PRI, REPLACE_DISK_SECONDARY, REPLACE_DISK_CHG, - REPLACE_DISK_AUTO - ]) -VALID_NODE_ROLES = frozenset([ - "drained", "master", "master-candidate", "offline", "regular" - ]) -VALID_STORAGE_TYPES = frozenset(["file", "lvm-pv", "lvm-vg"]) + +NODE_ROLE_DRAINED = "drained" +NODE_ROLE_MASTER_CANDIATE = "master-candidate" +NODE_ROLE_MASTER = "master" +NODE_ROLE_OFFLINE = "offline" +NODE_ROLE_REGULAR = "regular" class Error(Exception): @@ -66,119 +71,311 @@ class GanetiApiError(Error): """Generic error raised from Ganeti API. """ - pass + def __init__(self, msg, code=None): + Error.__init__(self, msg) + self.code = code + +def FormatX509Name(x509_name): + """Formats an X509 name. -class InvalidReplacementMode(Error): - """Raised when an invalid disk replacement mode is attempted. + @type x509_name: OpenSSL.crypto.X509Name """ - pass + try: + # Only supported in pyOpenSSL 0.7 and above + get_components_fn = x509_name.get_components + except AttributeError: + return repr(x509_name) + else: + return "".join("/%s=%s" % (name, value) + for name, value in get_components_fn()) + +class CertAuthorityVerify: + """Certificate verificator for SSL context. -class InvalidStorageType(Error): - """Raised when an invalid storage type is used. + Configures SSL context to verify server's certificate. """ - pass + _CAPATH_MINVERSION = "0.9" + _DEFVFYPATHS_MINVERSION = "0.9" + + _PYOPENSSL_VERSION = OpenSSL.__version__ + _PARSED_PYOPENSSL_VERSION = distutils.version.LooseVersion(_PYOPENSSL_VERSION) + + _SUPPORT_CAPATH = (_PARSED_PYOPENSSL_VERSION >= _CAPATH_MINVERSION) + _SUPPORT_DEFVFYPATHS = (_PARSED_PYOPENSSL_VERSION >= _DEFVFYPATHS_MINVERSION) + + def __init__(self, cafile=None, capath=None, use_default_verify_paths=False): + """Initializes this class. + + @type cafile: string + @param cafile: In which file we can find the certificates + @type capath: string + @param capath: In which directory we can find the certificates + @type use_default_verify_paths: bool + @param use_default_verify_paths: Whether the platform provided CA + certificates are to be used for + verification purposes + + """ + self._cafile = cafile + self._capath = capath + self._use_default_verify_paths = use_default_verify_paths + + if self._capath is not None and not self._SUPPORT_CAPATH: + raise Error(("PyOpenSSL %s has no support for a CA directory," + " version %s or above is required") % + (self._PYOPENSSL_VERSION, self._CAPATH_MINVERSION)) + + if self._use_default_verify_paths and not self._SUPPORT_DEFVFYPATHS: + raise Error(("PyOpenSSL %s has no support for using default verification" + " paths, version %s or above is required") % + (self._PYOPENSSL_VERSION, self._DEFVFYPATHS_MINVERSION)) + + @staticmethod + def _VerifySslCertCb(logger, _, cert, errnum, errdepth, ok): + """Callback for SSL certificate verification. + + @param logger: Logging object + + """ + if ok: + log_fn = logger.debug + else: + log_fn = logger.error + + log_fn("Verifying SSL certificate at depth %s, subject '%s', issuer '%s'", + errdepth, FormatX509Name(cert.get_subject()), + FormatX509Name(cert.get_issuer())) + + if not ok: + try: + # Only supported in pyOpenSSL 0.7 and above + # pylint: disable-msg=E1101 + fn = OpenSSL.crypto.X509_verify_cert_error_string + except AttributeError: + errmsg = "" + else: + errmsg = ":%s" % fn(errnum) + + logger.error("verify error:num=%s%s", errnum, errmsg) + + return ok + def __call__(self, ctx, logger): + """Configures an SSL context to verify certificates. + + @type ctx: OpenSSL.SSL.Context + @param ctx: SSL context + + """ + if self._use_default_verify_paths: + ctx.set_default_verify_paths() + + if self._cafile or self._capath: + if self._SUPPORT_CAPATH: + ctx.load_verify_locations(self._cafile, self._capath) + else: + ctx.load_verify_locations(self._cafile) -class InvalidNodeRole(Error): - """Raised when an invalid node role is used. + ctx.set_verify(OpenSSL.SSL.VERIFY_PEER, + lambda conn, cert, errnum, errdepth, ok: \ + self._VerifySslCertCb(logger, conn, cert, + errnum, errdepth, ok)) + + +class _HTTPSConnectionOpenSSL(httplib.HTTPSConnection): + """HTTPS Connection handler that verifies the SSL certificate. """ - pass + def __init__(self, *args, **kwargs): + """Initializes this class. + + """ + httplib.HTTPSConnection.__init__(self, *args, **kwargs) + self._logger = None + self._config_ssl_verification = None + + def Setup(self, logger, config_ssl_verification): + """Sets the SSL verification config function. + + @param logger: Logging object + @type config_ssl_verification: callable + + """ + assert self._logger is None + assert self._config_ssl_verification is None + + self._logger = logger + self._config_ssl_verification = config_ssl_verification + + def connect(self): + """Connect to the server specified when the object was created. + + This ensures that SSL certificates are verified. + + """ + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + ctx = OpenSSL.SSL.Context(OpenSSL.SSL.TLSv1_METHOD) + ctx.set_options(OpenSSL.SSL.OP_NO_SSLv2) + + if self._config_ssl_verification: + self._config_ssl_verification(ctx, self._logger) + + ssl = OpenSSL.SSL.Connection(ctx, sock) + ssl.connect((self.host, self.port)) + + self.sock = httplib.FakeSocket(sock, ssl) + + +class _HTTPSHandler(urllib2.HTTPSHandler): + def __init__(self, logger, config_ssl_verification): + """Initializes this class. + + @param logger: Logging object + @type config_ssl_verification: callable + @param config_ssl_verification: Function to configure SSL context for + certificate verification + + """ + urllib2.HTTPSHandler.__init__(self) + self._logger = logger + self._config_ssl_verification = config_ssl_verification + + def _CreateHttpsConnection(self, *args, **kwargs): + """Wrapper around L{_HTTPSConnectionOpenSSL} to add SSL verification. + + This wrapper is necessary provide a compatible API to urllib2. + + """ + conn = _HTTPSConnectionOpenSSL(*args, **kwargs) + conn.Setup(self._logger, self._config_ssl_verification) + return conn + + def https_open(self, req): + """Creates HTTPS connection. + + Called by urllib2. + + """ + return self.do_open(self._CreateHttpsConnection, req) + + +class _RapiRequest(urllib2.Request): + def __init__(self, method, url, headers, data): + """Initializes this class. + + """ + urllib2.Request.__init__(self, url, data=data, headers=headers) + self._method = method + + def get_method(self): + """Returns the HTTP request method. + + """ + return self._method class GanetiRapiClient(object): """Ganeti RAPI client. """ - USER_AGENT = "Ganeti RAPI Client" + _json_encoder = simplejson.JSONEncoder(sort_keys=True) - def __init__(self, master_hostname, port=5080, username=None, password=None, - ssl_cert_file=None): + def __init__(self, host, port=GANETI_RAPI_PORT, + username=None, password=None, + config_ssl_verification=None, ignore_proxy=False, + logger=logging): """Constructor. - @type master_hostname: str - @param master_hostname: the ganeti cluster master to interact with + @type host: string + @param host: the ganeti cluster master to interact with @type port: int - @param port: the port on which the RAPI is running. (default is 5080) - @type username: str + @param port: the port on which the RAPI is running (default is 5080) + @type username: string @param username: the username to connect with - @type password: str + @type password: string @param password: the password to connect with - @type ssl_cert_file: str or None - @param ssl_cert_file: path to the expected SSL certificate. if None, SSL - certificate will not be verified + @type config_ssl_verification: callable + @param config_ssl_verification: Function to configure SSL context for + certificate verification + @type ignore_proxy: bool + @param ignore_proxy: Whether to ignore proxy settings + @param logger: Logging object """ - self._master_hostname = master_hostname + self._host = host self._port = port + self._logger = logger - self._version = None - self._http = httplib2.Http() + self._base_url = "https://%s:%s" % (host, port) - # Older versions of httplib2 don't support the connection_type argument - # to request(), so we have to manually specify the connection object in the - # internal dict. - base_url = self._MakeUrl("/", prepend_version=False) - scheme, authority, _, _, _ = httplib2.parse_uri(base_url) - conn_key = "%s:%s" % (scheme, authority) - self._http.connections[conn_key] = \ - HTTPSConnectionOpenSSL(master_hostname, port, cert_file=ssl_cert_file) + handlers = [_HTTPSHandler(self._logger, config_ssl_verification)] - self._headers = { - "Accept": "text/plain", - "Content-type": "application/x-www-form-urlencoded", - "User-Agent": self.USER_AGENT} + if username is not None: + pwmgr = urllib2.HTTPPasswordMgrWithDefaultRealm() + pwmgr.add_password(None, self._base_url, username, password) + handlers.append(urllib2.HTTPBasicAuthHandler(pwmgr)) + elif password: + raise Error("Specified password without username") - if username is not None and password is not None: - self._http.add_credentials(username, password) + if ignore_proxy: + handlers.append(urllib2.ProxyHandler({})) - def _MakeUrl(self, path, query=None, prepend_version=True): - """Constructs the URL to pass to the HTTP client. + self._http = urllib2.build_opener(*handlers) # pylint: disable-msg=W0142 - @type path: str - @param path: HTTP URL path - @type query: list of two-tuples - @param query: query arguments to pass to urllib.urlencode - @type prepend_version: bool - @param prepend_version: whether to automatically fetch and prepend the - Ganeti RAPI version to the URL path + self._headers = { + "Accept": HTTP_APP_JSON, + "Content-type": HTTP_APP_JSON, + "User-Agent": self.USER_AGENT, + } + + @staticmethod + def _EncodeQuery(query): + """Encode query values for RAPI URL. - @rtype: str - @return: URL path + @type query: list of two-tuples + @param query: Query arguments + @rtype: list + @return: Query list with encoded values """ - if prepend_version: - path = "/%d%s" % (self.GetVersion(), path) + result = [] + + for name, value in query: + if value is None: + result.append((name, "")) + + elif isinstance(value, bool): + # Boolean values must be encoded as 0 or 1 + result.append((name, int(value))) - return "https://%(host)s:%(port)d%(path)s?%(query)s" % { - "host": self._master_hostname, - "port": self._port, - "path": path, - "query": urllib.urlencode(query or [])} + elif isinstance(value, (list, tuple, dict)): + raise ValueError("Invalid query data type %r" % type(value).__name__) - def _SendRequest(self, method, path, query=None, content=None, - prepend_version=True): + else: + result.append((name, value)) + + return result + + def _SendRequest(self, method, path, query, content): """Sends an HTTP request. This constructs a full URL, encodes and decodes HTTP bodies, and handles invalid responses in a pythonic way. - @type method: str + @type method: string @param method: HTTP method to use - @type path: str + @type path: string @param path: HTTP URL path @type query: list of two-tuples @param query: query arguments to pass to urllib.urlencode @type content: str or None @param content: HTTP body content - @type prepend_version: bool - @param prepend_version: whether to automatically fetch and prepend the - Ganeti RAPI version to the URL path @rtype: str @return: JSON-Decoded response @@ -187,30 +384,45 @@ class GanetiRapiClient(object): @raises GanetiApiError: If an invalid response is returned """ + assert path.startswith("/") + if content: - content = simplejson.JSONEncoder(sort_keys=True).encode(content) + encoded_content = self._json_encoder.encode(content) + else: + encoded_content = None + + # Build URL + url = [self._base_url, path] + if query: + url.append("?") + url.append(urllib.urlencode(self._EncodeQuery(query))) + + req = _RapiRequest(method, "".join(url), self._headers, encoded_content) - url = self._MakeUrl(path, query, prepend_version) try: - resp_headers, resp_content = self._http.request(url, method, - body=content, headers=self._headers) - except (crypto.Error, SSL.Error): - raise CertificateError("Invalid SSL certificate.") + resp = self._http.open(req) + encoded_response_content = resp.read() + except (OpenSSL.SSL.Error, OpenSSL.crypto.Error), err: + raise CertificateError("SSL issue: %r" % err) - if resp_content: - resp_content = simplejson.loads(resp_content) + if encoded_response_content: + response_content = simplejson.loads(encoded_response_content) + else: + response_content = None # TODO: Are there other status codes that are valid? (redirect?) - if resp_headers.status != 200: - if isinstance(resp_content, dict): + if resp.code != HTTP_OK: + if isinstance(response_content, dict): msg = ("%s %s: %s" % - (resp_content["code"], resp_content["message"], - resp_content["explain"])) + (response_content["code"], + response_content["message"], + response_content["explain"])) else: - msg = resp_content - raise GanetiApiError(msg) + msg = str(response_content) + + raise GanetiApiError(msg, code=resp.code) - return resp_content + return response_content def GetVersion(self): """Gets the Remote API version running on the cluster. @@ -219,10 +431,7 @@ class GanetiRapiClient(object): @return: Ganeti Remote API version """ - if self._version is None: - self._version = self._SendRequest(HTTP_GET, "/version", - prepend_version=False) - return self._version + return self._SendRequest(HTTP_GET, "/version", None, None) def GetOperatingSystems(self): """Gets the Operating Systems running in the Ganeti cluster. @@ -231,7 +440,8 @@ class GanetiRapiClient(object): @return: operating systems """ - return self._SendRequest(HTTP_GET, "/os") + return self._SendRequest(HTTP_GET, "/%s/os" % GANETI_RAPI_VERSION, + None, None) def GetInfo(self): """Gets info about the cluster. @@ -240,7 +450,8 @@ class GanetiRapiClient(object): @return: information about the cluster """ - return self._SendRequest(HTTP_GET, "/info") + return self._SendRequest(HTTP_GET, "/%s/info" % GANETI_RAPI_VERSION, + None, None) def GetClusterTags(self): """Gets the cluster tags. @@ -249,7 +460,8 @@ class GanetiRapiClient(object): @return: cluster tags """ - return self._SendRequest(HTTP_GET, "/tags") + return self._SendRequest(HTTP_GET, "/%s/tags" % GANETI_RAPI_VERSION, + None, None) def AddClusterTags(self, tags, dry_run=False): """Adds tags to the cluster. @@ -267,7 +479,8 @@ class GanetiRapiClient(object): if dry_run: query.append(("dry-run", 1)) - return self._SendRequest(HTTP_PUT, "/tags", query) + return self._SendRequest(HTTP_PUT, "/%s/tags" % GANETI_RAPI_VERSION, + query, None) def DeleteClusterTags(self, tags, dry_run=False): """Deletes tags from the cluster. @@ -282,7 +495,8 @@ class GanetiRapiClient(object): if dry_run: query.append(("dry-run", 1)) - self._SendRequest(HTTP_DELETE, "/tags", query) + return self._SendRequest(HTTP_DELETE, "/%s/tags" % GANETI_RAPI_VERSION, + query, None) def GetInstances(self, bulk=False): """Gets information about instances on the cluster. @@ -298,13 +512,14 @@ class GanetiRapiClient(object): if bulk: query.append(("bulk", 1)) - instances = self._SendRequest(HTTP_GET, "/instances", query) + instances = self._SendRequest(HTTP_GET, + "/%s/instances" % GANETI_RAPI_VERSION, + query, None) if bulk: return instances else: return [i["id"] for i in instances] - def GetInstanceInfo(self, instance): """Gets information about an instance. @@ -315,7 +530,9 @@ class GanetiRapiClient(object): @return: info about the instance """ - return self._SendRequest(HTTP_GET, "/instances/%s" % instance) + return self._SendRequest(HTTP_GET, + ("/%s/instances/%s" % + (GANETI_RAPI_VERSION, instance)), None, None) def CreateInstance(self, dry_run=False): """Creates a new instance. @@ -332,7 +549,8 @@ class GanetiRapiClient(object): if dry_run: query.append(("dry-run", 1)) - return self._SendRequest(HTTP_POST, "/instances", query) + return self._SendRequest(HTTP_POST, "/%s/instances" % GANETI_RAPI_VERSION, + query, None) def DeleteInstance(self, instance, dry_run=False): """Deletes an instance. @@ -348,7 +566,9 @@ class GanetiRapiClient(object): if dry_run: query.append(("dry-run", 1)) - return self._SendRequest(HTTP_DELETE, "/instances/%s" % instance, query) + return self._SendRequest(HTTP_DELETE, + ("/%s/instances/%s" % + (GANETI_RAPI_VERSION, instance)), query, None) def GetInstanceTags(self, instance): """Gets tags for an instance. @@ -360,7 +580,9 @@ class GanetiRapiClient(object): @return: tags for the instance """ - return self._SendRequest(HTTP_GET, "/instances/%s/tags" % instance) + return self._SendRequest(HTTP_GET, + ("/%s/instances/%s/tags" % + (GANETI_RAPI_VERSION, instance)), None, None) def AddInstanceTags(self, instance, tags, dry_run=False): """Adds tags to an instance. @@ -380,7 +602,9 @@ class GanetiRapiClient(object): if dry_run: query.append(("dry-run", 1)) - return self._SendRequest(HTTP_PUT, "/instances/%s/tags" % instance, query) + return self._SendRequest(HTTP_PUT, + ("/%s/instances/%s/tags" % + (GANETI_RAPI_VERSION, instance)), query, None) def DeleteInstanceTags(self, instance, tags, dry_run=False): """Deletes tags from an instance. @@ -397,7 +621,9 @@ class GanetiRapiClient(object): if dry_run: query.append(("dry-run", 1)) - self._SendRequest(HTTP_DELETE, "/instances/%s/tags" % instance, query) + return self._SendRequest(HTTP_DELETE, + ("/%s/instances/%s/tags" % + (GANETI_RAPI_VERSION, instance)), query, None) def RebootInstance(self, instance, reboot_type=None, ignore_secondaries=None, dry_run=False): @@ -422,7 +648,9 @@ class GanetiRapiClient(object): if dry_run: query.append(("dry-run", 1)) - self._SendRequest(HTTP_POST, "/instances/%s/reboot" % instance, query) + return self._SendRequest(HTTP_POST, + ("/%s/instances/%s/reboot" % + (GANETI_RAPI_VERSION, instance)), query, None) def ShutdownInstance(self, instance, dry_run=False): """Shuts down an instance. @@ -437,7 +665,9 @@ class GanetiRapiClient(object): if dry_run: query.append(("dry-run", 1)) - self._SendRequest(HTTP_PUT, "/instances/%s/shutdown" % instance, query) + return self._SendRequest(HTTP_PUT, + ("/%s/instances/%s/shutdown" % + (GANETI_RAPI_VERSION, instance)), query, None) def StartupInstance(self, instance, dry_run=False): """Starts up an instance. @@ -452,7 +682,9 @@ class GanetiRapiClient(object): if dry_run: query.append(("dry-run", 1)) - self._SendRequest(HTTP_PUT, "/instances/%s/startup" % instance, query) + return self._SendRequest(HTTP_PUT, + ("/%s/instances/%s/startup" % + (GANETI_RAPI_VERSION, instance)), query, None) def ReinstallInstance(self, instance, os, no_startup=False): """Reinstalls an instance. @@ -468,53 +700,52 @@ class GanetiRapiClient(object): query = [("os", os)] if no_startup: query.append(("nostartup", 1)) - self._SendRequest(HTTP_POST, "/instances/%s/reinstall" % instance, query) + return self._SendRequest(HTTP_POST, + ("/%s/instances/%s/reinstall" % + (GANETI_RAPI_VERSION, instance)), query, None) - def ReplaceInstanceDisks(self, instance, disks, mode="replace_auto", - remote_node=None, iallocator="hail", dry_run=False): + def ReplaceInstanceDisks(self, instance, disks=None, mode=REPLACE_DISK_AUTO, + remote_node=None, iallocator=None, dry_run=False): """Replaces disks on an instance. @type instance: str @param instance: instance whose disks to replace - @type disks: list of str - @param disks: disks to replace + @type disks: list of ints + @param disks: Indexes of disks to replace @type mode: str - @param mode: replacement mode to use. defaults to replace_auto + @param mode: replacement mode to use (defaults to replace_auto) @type remote_node: str or None @param remote_node: new secondary node to use (for use with - replace_new_secondary mdoe) + replace_new_secondary mode) @type iallocator: str or None @param iallocator: instance allocator plugin to use (for use with - replace_auto mdoe). default is hail + replace_auto mode) @type dry_run: bool @param dry_run: whether to perform a dry run @rtype: int @return: job id - @raises InvalidReplacementMode: If an invalid disk replacement mode is given - @raises GanetiApiError: If no secondary node is given with a non-auto - replacement mode is requested. - """ - if mode not in VALID_REPLACEMENT_MODES: - raise InvalidReplacementMode("%s is not a valid disk replacement mode.", - mode) + query = [ + ("mode", mode), + ] - query = [("mode", mode), ("disks", ",".join(disks))] + if disks: + query.append(("disks", ",".join(str(idx) for idx in disks))) - if mode is REPLACE_DISK_AUTO: - query.append(("iallocator", iallocator)) - elif mode is REPLACE_DISK_SECONDARY: - if remote_node is None: - raise GanetiApiError("You must supply a new secondary node.") + if remote_node: query.append(("remote_node", remote_node)) + if iallocator: + query.append(("iallocator", iallocator)) + if dry_run: query.append(("dry-run", 1)) return self._SendRequest(HTTP_POST, - "/instances/%s/replace-disks" % instance, query) + ("/%s/instances/%s/replace-disks" % + (GANETI_RAPI_VERSION, instance)), query, None) def GetJobs(self): """Gets all jobs for the cluster. @@ -523,7 +754,10 @@ class GanetiRapiClient(object): @return: job ids for the cluster """ - return [int(j["id"]) for j in self._SendRequest(HTTP_GET, "/jobs")] + return [int(j["id"]) + for j in self._SendRequest(HTTP_GET, + "/%s/jobs" % GANETI_RAPI_VERSION, + None, None)] def GetJobStatus(self, job_id): """Gets the status of a job. @@ -535,10 +769,29 @@ class GanetiRapiClient(object): @return: job status """ - return self._SendRequest(HTTP_GET, "/jobs/%d" % job_id) + return self._SendRequest(HTTP_GET, + "/%s/jobs/%s" % (GANETI_RAPI_VERSION, job_id), + None, None) - def DeleteJob(self, job_id, dry_run=False): - """Deletes a job. + def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial): + """Waits for job changes. + + @type job_id: int + @param job_id: Job ID for which to wait + + """ + body = { + "fields": fields, + "previous_job_info": prev_job_info, + "previous_log_serial": prev_log_serial, + } + + return self._SendRequest(HTTP_GET, + "/%s/jobs/%s/wait" % (GANETI_RAPI_VERSION, job_id), + None, body) + + def CancelJob(self, job_id, dry_run=False): + """Cancels a job. @type job_id: int @param job_id: id of the job to delete @@ -550,7 +803,9 @@ class GanetiRapiClient(object): if dry_run: query.append(("dry-run", 1)) - self._SendRequest(HTTP_DELETE, "/jobs/%d" % job_id, query) + return self._SendRequest(HTTP_DELETE, + "/%s/jobs/%s" % (GANETI_RAPI_VERSION, job_id), + query, None) def GetNodes(self, bulk=False): """Gets all nodes in the cluster. @@ -567,7 +822,8 @@ class GanetiRapiClient(object): if bulk: query.append(("bulk", 1)) - nodes = self._SendRequest(HTTP_GET, "/nodes", query) + nodes = self._SendRequest(HTTP_GET, "/%s/nodes" % GANETI_RAPI_VERSION, + query, None) if bulk: return nodes else: @@ -583,7 +839,9 @@ class GanetiRapiClient(object): @return: info about the node """ - return self._SendRequest(HTTP_GET, "/nodes/%s" % node) + return self._SendRequest(HTTP_GET, + "/%s/nodes/%s" % (GANETI_RAPI_VERSION, node), + None, None) def EvacuateNode(self, node, iallocator=None, remote_node=None, dry_run=False): @@ -604,10 +862,10 @@ class GanetiRapiClient(object): @raises GanetiApiError: if an iallocator and remote_node are both specified """ - query = [] if iallocator and remote_node: - raise GanetiApiError("Only one of iallocator or remote_node can be used.") + raise GanetiApiError("Only one of iallocator or remote_node can be used") + query = [] if iallocator: query.append(("iallocator", iallocator)) if remote_node: @@ -615,7 +873,9 @@ class GanetiRapiClient(object): if dry_run: query.append(("dry-run", 1)) - return self._SendRequest(HTTP_POST, "/nodes/%s/evacuate" % node, query) + return self._SendRequest(HTTP_POST, + ("/%s/nodes/%s/evacuate" % + (GANETI_RAPI_VERSION, node)), query, None) def MigrateNode(self, node, live=True, dry_run=False): """Migrates all primary instances from a node. @@ -637,7 +897,9 @@ class GanetiRapiClient(object): if dry_run: query.append(("dry-run", 1)) - return self._SendRequest(HTTP_POST, "/nodes/%s/migrate" % node, query) + return self._SendRequest(HTTP_POST, + ("/%s/nodes/%s/migrate" % + (GANETI_RAPI_VERSION, node)), query, None) def GetNodeRole(self, node): """Gets the current role for a node. @@ -649,7 +911,9 @@ class GanetiRapiClient(object): @return: the current role for a node """ - return self._SendRequest(HTTP_GET, "/nodes/%s/role" % node) + return self._SendRequest(HTTP_GET, + ("/%s/nodes/%s/role" % + (GANETI_RAPI_VERSION, node)), None, None) def SetNodeRole(self, node, role, force=False): """Sets the role for a node. @@ -664,15 +928,14 @@ class GanetiRapiClient(object): @rtype: int @return: job id - @raise InvalidNodeRole: If an invalid node role is specified - """ - if role not in VALID_NODE_ROLES: - raise InvalidNodeRole("%s is not a valid node role.", role) + query = [ + ("force", force), + ] - query = [("force", force)] - return self._SendRequest(HTTP_PUT, "/nodes/%s/role" % node, query, - content=role) + return self._SendRequest(HTTP_PUT, + ("/%s/nodes/%s/role" % + (GANETI_RAPI_VERSION, node)), query, role) def GetNodeStorageUnits(self, node, storage_type, output_fields): """Gets the storage units for a node. @@ -687,17 +950,17 @@ class GanetiRapiClient(object): @rtype: int @return: job id where results can be retrieved - @raise InvalidStorageType: If an invalid storage type is specified - """ - # TODO: Add default for storage_type & output_fields - if storage_type not in VALID_STORAGE_TYPES: - raise InvalidStorageType("%s is an invalid storage type.", storage_type) + query = [ + ("storage_type", storage_type), + ("output_fields", output_fields), + ] - query = [("storage_type", storage_type), ("output_fields", output_fields)] - return self._SendRequest(HTTP_GET, "/nodes/%s/storage" % node, query) + return self._SendRequest(HTTP_GET, + ("/%s/nodes/%s/storage" % + (GANETI_RAPI_VERSION, node)), query, None) - def ModifyNodeStorageUnits(self, node, storage_type, name, allocatable=True): + def ModifyNodeStorageUnits(self, node, storage_type, name, allocatable=None): """Modifies parameters of storage units on the node. @type node: str @@ -706,23 +969,25 @@ class GanetiRapiClient(object): @param storage_type: storage type whose units to modify @type name: str @param name: name of the storage unit - @type allocatable: bool - @param allocatable: TODO: Document me + @type allocatable: bool or None + @param allocatable: Whether to set the "allocatable" flag on the storage + unit (None=no modification, True=set, False=unset) @rtype: int @return: job id - @raise InvalidStorageType: If an invalid storage type is specified - """ - if storage_type not in VALID_STORAGE_TYPES: - raise InvalidStorageType("%s is an invalid storage type.", storage_type) - query = [ - ("storage_type", storage_type), ("name", name), - ("allocatable", allocatable) - ] - return self._SendRequest(HTTP_PUT, "/nodes/%s/storage/modify" % node, query) + ("storage_type", storage_type), + ("name", name), + ] + + if allocatable is not None: + query.append(("allocatable", allocatable)) + + return self._SendRequest(HTTP_PUT, + ("/%s/nodes/%s/storage/modify" % + (GANETI_RAPI_VERSION, node)), query, None) def RepairNodeStorageUnits(self, node, storage_type, name): """Repairs a storage unit on the node. @@ -737,14 +1002,15 @@ class GanetiRapiClient(object): @rtype: int @return: job id - @raise InvalidStorageType: If an invalid storage type is specified - """ - if storage_type not in VALID_STORAGE_TYPES: - raise InvalidStorageType("%s is an invalid storage type.", storage_type) + query = [ + ("storage_type", storage_type), + ("name", name), + ] - query = [("storage_type", storage_type), ("name", name)] - return self._SendRequest(HTTP_PUT, "/nodes/%s/storage/repair" % node, query) + return self._SendRequest(HTTP_PUT, + ("/%s/nodes/%s/storage/repair" % + (GANETI_RAPI_VERSION, node)), query, None) def GetNodeTags(self, node): """Gets the tags for a node. @@ -756,7 +1022,9 @@ class GanetiRapiClient(object): @return: tags for the node """ - return self._SendRequest(HTTP_GET, "/nodes/%s/tags" % node) + return self._SendRequest(HTTP_GET, + ("/%s/nodes/%s/tags" % + (GANETI_RAPI_VERSION, node)), None, None) def AddNodeTags(self, node, tags, dry_run=False): """Adds tags to a node. @@ -776,8 +1044,9 @@ class GanetiRapiClient(object): if dry_run: query.append(("dry-run", 1)) - return self._SendRequest(HTTP_PUT, "/nodes/%s/tags" % node, query, - content=tags) + return self._SendRequest(HTTP_PUT, + ("/%s/nodes/%s/tags" % + (GANETI_RAPI_VERSION, node)), query, tags) def DeleteNodeTags(self, node, tags, dry_run=False): """Delete tags from a node. @@ -797,48 +1066,6 @@ class GanetiRapiClient(object): if dry_run: query.append(("dry-run", 1)) - return self._SendRequest(HTTP_DELETE, "/nodes/%s/tags" % node, query) - - -class HTTPSConnectionOpenSSL(httplib.HTTPSConnection): - """HTTPS Connection handler that verifies the SSL certificate. - - """ - - # pylint: disable-msg=W0142 - def __init__(self, *args, **kwargs): - """Constructor. - - """ - httplib.HTTPSConnection.__init__(self, *args, **kwargs) - - self._ssl_cert = None - if self.cert_file: - f = open(self.cert_file, "r") - self._ssl_cert = crypto.load_certificate(crypto.FILETYPE_PEM, f.read()) - f.close() - - # pylint: disable-msg=W0613 - def _VerifySSLCertCallback(self, conn, cert, errnum, errdepth, ok): - """Verifies the SSL certificate provided by the peer. - - """ - return (self._ssl_cert.digest("sha1") == cert.digest("sha1") and - self._ssl_cert.digest("md5") == cert.digest("md5")) - - def connect(self): - """Connect to the server specified when the object was created. - - This ensures that SSL certificates are verified. - - """ - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - ctx = SSL.Context(SSL.SSLv23_METHOD) - ctx.set_options(SSL.OP_NO_SSLv2) - ctx.use_certificate(self._ssl_cert) - ctx.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT, - self._VerifySSLCertCallback) - - ssl = SSL.Connection(ctx, sock) - ssl.connect((self.host, self.port)) - self.sock = httplib.FakeSocket(sock, ssl) + return self._SendRequest(HTTP_DELETE, + ("/%s/nodes/%s/tags" % + (GANETI_RAPI_VERSION, node)), query, None) diff --git a/lib/rapi/client_utils.py b/lib/rapi/client_utils.py new file mode 100644 index 0000000000000000000000000000000000000000..51f980178ee0e50bb1c1f0ff45b13812c5b3445e --- /dev/null +++ b/lib/rapi/client_utils.py @@ -0,0 +1,98 @@ +# +# + +# Copyright (C) 2010 Google Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA +# 02110-1301, USA. + + +"""RAPI client utilities. + +""" + +from ganeti import constants +from ganeti import cli + +from ganeti.rapi import client + +# Local constant to avoid importing ganeti.http +HTTP_NOT_FOUND = 404 + + +class RapiJobPollCb(cli.JobPollCbBase): + def __init__(self, cl): + """Initializes this class. + + @param cl: RAPI client instance + + """ + cli.JobPollCbBase.__init__(self) + + self.cl = cl + + def WaitForJobChangeOnce(self, job_id, fields, + prev_job_info, prev_log_serial): + """Waits for changes on a job. + + """ + try: + result = self.cl.WaitForJobChange(job_id, fields, + prev_job_info, prev_log_serial) + except client.GanetiApiError, err: + if err.code == HTTP_NOT_FOUND: + return None + + raise + + if result is None: + return constants.JOB_NOTCHANGED + + return (result["job_info"], result["log_entries"]) + + def QueryJobs(self, job_ids, fields): + """Returns the given fields for the selected job IDs. + + @type job_ids: list of numbers + @param job_ids: Job IDs + @type fields: list of strings + @param fields: Fields + + """ + if len(job_ids) != 1: + raise NotImplementedError("Only one job supported at this time") + + try: + result = self.cl.GetJobStatus(job_ids[0]) + except client.GanetiApiError, err: + if err.code == HTTP_NOT_FOUND: + return [None] + + raise + + return [[result[name] for name in fields], ] + + +def PollJob(rapi_client, job_id, reporter): + """Function to poll for the result of a job. + + @param rapi_client: RAPI client instance + @type job_id: number + @param job_id: Job ID + @type reporter: L{cli.JobPollReportCbBase} + @param reporter: PollJob reporter instance + + """ + return cli.GenericPollJob(job_id, RapiJobPollCb(rapi_client), reporter) diff --git a/lib/rapi/connector.py b/lib/rapi/connector.py index d14d4d655d84eeb18ee111fb38effac5fbd65bac..8e81464c09976ea72229cf39187c83798ba6a11b 100644 --- a/lib/rapi/connector.py +++ b/lib/rapi/connector.py @@ -209,6 +209,8 @@ def GetHandlers(node_name_pattern, instance_name_pattern, job_id_pattern): "/2/jobs": rlib2.R_2_jobs, re.compile(r'/2/jobs/(%s)$' % job_id_pattern): rlib2.R_2_jobs_id, + re.compile(r'/2/jobs/(%s)/wait$' % job_id_pattern): + rlib2.R_2_jobs_id_wait, "/2/tags": rlib2.R_2_tags, "/2/info": rlib2.R_2_info, diff --git a/lib/rapi/rlib2.py b/lib/rapi/rlib2.py index 4c7ad4ae90e911ab2d6760c7ae9b8e05fcc1d378..d22afac8ce593e6187903880d57b7493bcbb9710 100644 --- a/lib/rapi/rlib2.py +++ b/lib/rapi/rlib2.py @@ -83,6 +83,9 @@ _NR_MAP = { "R": _NR_REGULAR, } +# Timeout for /2/jobs/[job_id]/wait. Gives job up to 10 seconds to change. +_WFJC_TIMEOUT = 10 + class R_version(baserlib.R_Generic): """/version resource. @@ -211,6 +214,55 @@ class R_2_jobs_id(baserlib.R_Generic): return result +class R_2_jobs_id_wait(baserlib.R_Generic): + """/2/jobs/[job_id]/wait resource. + + """ + # WaitForJobChange provides access to sensitive information and blocks + # machine resources (it's a blocking RAPI call), hence restricting access. + GET_ACCESS = [rapi.RAPI_ACCESS_WRITE] + + def GET(self): + """Waits for job changes. + + """ + job_id = self.items[0] + + fields = self.getBodyParameter("fields") + prev_job_info = self.getBodyParameter("previous_job_info", None) + prev_log_serial = self.getBodyParameter("previous_log_serial", None) + + if not isinstance(fields, list): + raise http.HttpBadRequest("The 'fields' parameter should be a list") + + if not (prev_job_info is None or isinstance(prev_job_info, list)): + raise http.HttpBadRequest("The 'previous_job_info' parameter should" + " be a list") + + if not (prev_log_serial is None or + isinstance(prev_log_serial, (int, long))): + raise http.HttpBadRequest("The 'previous_log_serial' parameter should" + " be a number") + + client = baserlib.GetClient() + result = client.WaitForJobChangeOnce(job_id, fields, + prev_job_info, prev_log_serial, + timeout=_WFJC_TIMEOUT) + if not result: + raise http.HttpNotFound() + + if result == constants.JOB_NOTCHANGED: + # No changes + return None + + (job_info, log_entries) = result + + return { + "job_info": job_info, + "log_entries": log_entries, + } + + class R_2_nodes(baserlib.R_Generic): """/2/nodes resource. diff --git a/lib/utils.py b/lib/utils.py index f8c7299da9e967a7881647fa49f707abd5350faa..d32d6617640b2f8cbe215119b204693e90417d6a 100644 --- a/lib/utils.py +++ b/lib/utils.py @@ -1681,21 +1681,28 @@ def EnsureDirs(dirs): raise errors.GenericError("%s is not a directory" % dir_name) -def ReadFile(file_name, size=-1): +def ReadFile(file_name, size=-1, oneline=False): """Reads a file. @type size: int @param size: Read at most size bytes (if negative, entire file) + @type oneline: bool + @param oneline: Whether to read only one line (newline char is not included) @rtype: str @return: the (possibly partial) content of the file """ f = open(file_name, "r") try: - return f.read(size) + if oneline: + data = f.readline(size).rstrip("\r\n") + else: + data = f.read(size) finally: f.close() + return data + def WriteFile(file_name, fn=None, data=None, mode=None, uid=-1, gid=-1, diff --git a/qa/ganeti-qa.py b/qa/ganeti-qa.py index 05e519ae13b46215d4751b0dbc46ecf4c7093f80..cd2b514e15ca32defd24cb6673dfc751f2f2ea51 100755 --- a/qa/ganeti-qa.py +++ b/qa/ganeti-qa.py @@ -281,10 +281,13 @@ def main(): rapi_user = "ganeti-qa" rapi_secret = utils.GenerateSecret() - qa_rapi.OpenerFactory.SetCredentials(rapi_user, rapi_secret) RunEnvTests() SetupCluster(rapi_user, rapi_secret) + + # Load RAPI certificate + qa_rapi.Setup(rapi_user, rapi_secret) + RunClusterTests() RunOsTests() diff --git a/qa/qa_cluster.py b/qa/qa_cluster.py index 0e557dd928da9c8d5351b93a080a0c8154ba905c..7c0de1976e73f2f1c707f388a7951ddc2fbd629e 100644 --- a/qa/qa_cluster.py +++ b/qa/qa_cluster.py @@ -185,49 +185,62 @@ def TestClusterRenewCrypto(): AssertNotEqual(StartSSH(master["primary"], utils.ShellQuoteArgs(cmd)).wait(), 0) - # Custom RAPI certificate - fh = tempfile.NamedTemporaryFile() + rapi_cert_backup = qa_utils.BackupFile(master["primary"], + constants.RAPI_CERT_FILE) + try: + # Custom RAPI certificate + fh = tempfile.NamedTemporaryFile() - # Ensure certificate doesn't cause "gnt-cluster verify" to complain - validity = constants.SSL_CERT_EXPIRATION_WARN * 3 + # Ensure certificate doesn't cause "gnt-cluster verify" to complain + validity = constants.SSL_CERT_EXPIRATION_WARN * 3 - utils.GenerateSelfSignedSslCert(fh.name, validity=validity) + utils.GenerateSelfSignedSslCert(fh.name, validity=validity) - tmpcert = qa_utils.UploadFile(master["primary"], fh.name) - try: + tmpcert = qa_utils.UploadFile(master["primary"], fh.name) + try: + cmd = ["gnt-cluster", "renew-crypto", "--force", + "--rapi-certificate=%s" % tmpcert] + AssertEqual(StartSSH(master["primary"], + utils.ShellQuoteArgs(cmd)).wait(), 0) + finally: + cmd = ["rm", "-f", tmpcert] + AssertEqual(StartSSH(master["primary"], + utils.ShellQuoteArgs(cmd)).wait(), 0) + + # Custom cluster domain secret + cds_fh = tempfile.NamedTemporaryFile() + cds_fh.write(utils.GenerateSecret()) + cds_fh.write("\n") + cds_fh.flush() + + tmpcds = qa_utils.UploadFile(master["primary"], cds_fh.name) + try: + cmd = ["gnt-cluster", "renew-crypto", "--force", + "--cluster-domain-secret=%s" % tmpcds] + AssertEqual(StartSSH(master["primary"], + utils.ShellQuoteArgs(cmd)).wait(), 0) + finally: + cmd = ["rm", "-f", tmpcds] + AssertEqual(StartSSH(master["primary"], + utils.ShellQuoteArgs(cmd)).wait(), 0) + + # Normal case cmd = ["gnt-cluster", "renew-crypto", "--force", - "--rapi-certificate=%s" % tmpcert] + "--new-cluster-certificate", "--new-confd-hmac-key", + "--new-rapi-certificate", "--new-cluster-domain-secret"] AssertEqual(StartSSH(master["primary"], utils.ShellQuoteArgs(cmd)).wait(), 0) - finally: - cmd = ["rm", "-f", tmpcert] - AssertEqual(StartSSH(master["primary"], - utils.ShellQuoteArgs(cmd)).wait(), 0) - - # Custom cluster domain secret - cds_fh = tempfile.NamedTemporaryFile() - cds_fh.write(utils.GenerateSecret()) - cds_fh.write("\n") - cds_fh.flush() - tmpcds = qa_utils.UploadFile(master["primary"], cds_fh.name) - try: + # Restore RAPI certificate cmd = ["gnt-cluster", "renew-crypto", "--force", - "--cluster-domain-secret=%s" % tmpcds] + "--rapi-certificate=%s" % rapi_cert_backup] AssertEqual(StartSSH(master["primary"], utils.ShellQuoteArgs(cmd)).wait(), 0) finally: - cmd = ["rm", "-f", tmpcds] + cmd = ["rm", "-f", rapi_cert_backup] AssertEqual(StartSSH(master["primary"], utils.ShellQuoteArgs(cmd)).wait(), 0) - # Normal case - cmd = ["gnt-cluster", "renew-crypto", "--force", - "--new-cluster-certificate", "--new-confd-hmac-key", - "--new-rapi-certificate", "--new-cluster-domain-secret"] - AssertEqual(StartSSH(master["primary"], - utils.ShellQuoteArgs(cmd)).wait(), 0) - def TestClusterBurnin(): """Burnin""" diff --git a/qa/qa_rapi.py b/qa/qa_rapi.py index cefed6614518f4333262ba16996381a8f1e649c3..639f62a92e58dda3faa4673f26964392c15675dc 100644 --- a/qa/qa_rapi.py +++ b/qa/qa_rapi.py @@ -22,12 +22,17 @@ """ -import urllib2 +import tempfile from ganeti import utils from ganeti import constants from ganeti import errors from ganeti import serializer +from ganeti import cli +from ganeti import rapi + +import ganeti.rapi.client +import ganeti.rapi.client_utils import qa_config import qa_utils @@ -37,58 +42,38 @@ from qa_utils import (AssertEqual, AssertNotEqual, AssertIn, AssertMatch, StartSSH) -class OpenerFactory: - """A factory singleton to construct urllib opener chain. - - This is needed because qa_config is not initialized yet at module load time - - """ - _opener = None - _rapi_user = None - _rapi_secret = None - - @classmethod - def SetCredentials(cls, rapi_user, rapi_secret): - """Set the credentials for authorized access. - - """ - cls._rapi_user = rapi_user - cls._rapi_secret = rapi_secret +_rapi_ca = None +_rapi_client = None - @classmethod - def Opener(cls): - """Construct the opener if not yet done. - """ - if not cls._opener: - if not cls._rapi_user or not cls._rapi_secret: - raise errors.ProgrammerError("SetCredentials was never called.") +def Setup(username, password): + """Configures the RAPI client. - # Create opener which doesn't try to look for proxies and does auth - master = qa_config.GetMasterNode() - host = master["primary"] - port = qa_config.get("rapi-port", default=constants.DEFAULT_RAPI_PORT) - passman = urllib2.HTTPPasswordMgrWithDefaultRealm() - passman.add_password(None, 'https://%s:%s' % (host, port), - cls._rapi_user, - cls._rapi_secret) - authhandler = urllib2.HTTPBasicAuthHandler(passman) - cls._opener = urllib2.build_opener(urllib2.ProxyHandler({}), authhandler) + """ + global _rapi_ca + global _rapi_client - return cls._opener + master = qa_config.GetMasterNode() + # Load RAPI certificate from master node + cmd = ["cat", constants.RAPI_CERT_FILE] -class RapiRequest(urllib2.Request): - """This class supports other methods beside GET/POST. + # Write to temporary file + _rapi_ca = tempfile.NamedTemporaryFile() + _rapi_ca.write(qa_utils.GetCommandOutput(master["primary"], + utils.ShellQuoteArgs(cmd))) + _rapi_ca.flush() - """ + port = qa_config.get("rapi-port", default=constants.DEFAULT_RAPI_PORT) + cfg_ssl = rapi.client.CertAuthorityVerify(cafile=_rapi_ca.name) - def __init__(self, method, url, headers, data): - urllib2.Request.__init__(self, url, data=data, headers=headers) - self._method = method + _rapi_client = rapi.client.GanetiRapiClient(master["primary"], port=port, + username=username, + password=password, + config_ssl_verification=cfg_ssl, + ignore_proxy=True) - def get_method(self): - return self._method + print "RAPI protocol version: %s" % _rapi_client.GetVersion() INSTANCE_FIELDS = ("name", "os", "pnode", "snodes", @@ -119,43 +104,12 @@ def Enabled(): def _DoTests(uris): - master = qa_config.GetMasterNode() - host = master["primary"] - port = qa_config.get("rapi-port", default=constants.DEFAULT_RAPI_PORT) results = [] for uri, verify, method, body in uris: assert uri.startswith("/") - url = "https://%s:%s%s" % (host, port, uri) - - headers = {} - - if body: - data = serializer.DumpJson(body, indent=False) - headers["Content-Type"] = "application/json" - else: - data = None - - if headers or data: - details = [] - if headers: - details.append("headers=%s" % - serializer.DumpJson(headers, indent=False).rstrip()) - if data: - details.append("data=%s" % data.rstrip()) - info = "(%s)" % (", ".join(details), ) - else: - info = "" - - print "Testing %s %s %s..." % (method, url, info) - - req = RapiRequest(method, url, headers, data) - response = OpenerFactory.Opener().open(req) - - AssertEqual(response.info()["Content-type"], "application/json") - - data = serializer.LoadJson(response.read()) + data = _rapi_client._SendRequest(method, uri, None, body) if verify is not None: if callable(verify): @@ -305,10 +259,7 @@ def _WaitForRapiJob(job_id): ("/2/jobs/%s" % job_id, _VerifyJob, "GET", None), ]) - # FIXME: Use "gnt-job watch" until RAPI supports waiting for job - cmd = ["gnt-job", "watch", str(job_id)] - AssertEqual(StartSSH(master["primary"], - utils.ShellQuoteArgs(cmd)).wait(), 0) + rapi.client_utils.PollJob(_rapi_client, job_id, cli.StdioJobPollReportCb()) def TestRapiInstanceAdd(node): diff --git a/qa/qa_utils.py b/qa/qa_utils.py index a4af6307599e462386ed7e84b27d8b60800119f0..5a2c4e7d6be64d6fd27d0f0894c802bb426af7c9 100644 --- a/qa/qa_utils.py +++ b/qa/qa_utils.py @@ -150,6 +150,7 @@ def UploadFile(node, src): Caller needs to remove the returned file on the node when it's not needed anymore. + """ # Make sure nobody else has access to it while preserving local permissions mode = os.stat(src).st_mode & 0700 @@ -171,6 +172,22 @@ def UploadFile(node, src): f.close() +def BackupFile(node, path): + """Creates a backup of a file on the node and returns the filename. + + Caller needs to remove the returned file on the node when it's not needed + anymore. + + """ + cmd = ("tmp=$(tempfile --prefix .gnt --directory=$(dirname %s)) && " + "[[ -f \"$tmp\" ]] && " + "cp %s $tmp && " + "echo $tmp") % (utils.ShellQuote(path), utils.ShellQuote(path)) + + # Return temporary filename + return GetCommandOutput(node, cmd).strip() + + def _ResolveName(cmd, key): """Helper function. diff --git a/test/ganeti.cli_unittest.py b/test/ganeti.cli_unittest.py index b768f4a03b091d5bcf795d9aded0369da8965c2f..2e5a6e3b6ff1273392e628e15927d4c031d2a09f 100755 --- a/test/ganeti.cli_unittest.py +++ b/test/ganeti.cli_unittest.py @@ -29,6 +29,8 @@ import testutils from ganeti import constants from ganeti import cli +from ganeti import errors +from ganeti import utils from ganeti.errors import OpPrereqError, ParameterError @@ -100,7 +102,7 @@ class TestIdentKeyVal(unittest.TestCase): class TestToStream(unittest.TestCase): - """Thes the ToStream functions""" + """Test the ToStream functions""" def testBasic(self): for data in ["foo", @@ -246,5 +248,175 @@ class TestGenerateTable(unittest.TestCase): None, None, "m", exp) +class _MockJobPollCb(cli.JobPollCbBase, cli.JobPollReportCbBase): + def __init__(self, tc, job_id): + self.tc = tc + self.job_id = job_id + self._wfjcr = [] + self._jobstatus = [] + self._expect_notchanged = False + self._expect_log = [] + + def CheckEmpty(self): + self.tc.assertFalse(self._wfjcr) + self.tc.assertFalse(self._jobstatus) + self.tc.assertFalse(self._expect_notchanged) + self.tc.assertFalse(self._expect_log) + + def AddWfjcResult(self, *args): + self._wfjcr.append(args) + + def AddQueryJobsResult(self, *args): + self._jobstatus.append(args) + + def WaitForJobChangeOnce(self, job_id, fields, + prev_job_info, prev_log_serial): + self.tc.assertEqual(job_id, self.job_id) + self.tc.assertEqualValues(fields, ["status"]) + self.tc.assertFalse(self._expect_notchanged) + self.tc.assertFalse(self._expect_log) + + (exp_prev_job_info, exp_prev_log_serial, result) = self._wfjcr.pop(0) + self.tc.assertEqualValues(prev_job_info, exp_prev_job_info) + self.tc.assertEqual(prev_log_serial, exp_prev_log_serial) + + if result == constants.JOB_NOTCHANGED: + self._expect_notchanged = True + elif result: + (_, logmsgs) = result + if logmsgs: + self._expect_log.extend(logmsgs) + + return result + + def QueryJobs(self, job_ids, fields): + self.tc.assertEqual(job_ids, [self.job_id]) + self.tc.assertEqualValues(fields, ["status", "opstatus", "opresult"]) + self.tc.assertFalse(self._expect_notchanged) + self.tc.assertFalse(self._expect_log) + + result = self._jobstatus.pop(0) + self.tc.assertEqual(len(fields), len(result)) + return [result] + + def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg): + self.tc.assertEqual(job_id, self.job_id) + self.tc.assertEqualValues((serial, timestamp, log_type, log_msg), + self._expect_log.pop(0)) + + def ReportNotChanged(self, job_id, status): + self.tc.assertEqual(job_id, self.job_id) + self.tc.assert_(self._expect_notchanged) + self._expect_notchanged = False + + +class TestGenericPollJob(testutils.GanetiTestCase): + def testSuccessWithLog(self): + job_id = 29609 + cbs = _MockJobPollCb(self, job_id) + + cbs.AddWfjcResult(None, None, constants.JOB_NOTCHANGED) + + cbs.AddWfjcResult(None, None, + ((constants.JOB_STATUS_QUEUED, ), None)) + + cbs.AddWfjcResult((constants.JOB_STATUS_QUEUED, ), None, + constants.JOB_NOTCHANGED) + + cbs.AddWfjcResult((constants.JOB_STATUS_QUEUED, ), None, + ((constants.JOB_STATUS_RUNNING, ), + [(1, utils.SplitTime(1273491611.0), + constants.ELOG_MESSAGE, "Step 1"), + (2, utils.SplitTime(1273491615.9), + constants.ELOG_MESSAGE, "Step 2"), + (3, utils.SplitTime(1273491625.02), + constants.ELOG_MESSAGE, "Step 3"), + (4, utils.SplitTime(1273491635.05), + constants.ELOG_MESSAGE, "Step 4"), + (37, utils.SplitTime(1273491645.0), + constants.ELOG_MESSAGE, "Step 5"), + (203, utils.SplitTime(127349155.0), + constants.ELOG_MESSAGE, "Step 6")])) + + cbs.AddWfjcResult((constants.JOB_STATUS_RUNNING, ), 203, + ((constants.JOB_STATUS_RUNNING, ), + [(300, utils.SplitTime(1273491711.01), + constants.ELOG_MESSAGE, "Step X"), + (302, utils.SplitTime(1273491815.8), + constants.ELOG_MESSAGE, "Step Y"), + (303, utils.SplitTime(1273491925.32), + constants.ELOG_MESSAGE, "Step Z")])) + + cbs.AddWfjcResult((constants.JOB_STATUS_RUNNING, ), 303, + ((constants.JOB_STATUS_SUCCESS, ), None)) + + cbs.AddQueryJobsResult(constants.JOB_STATUS_SUCCESS, + [constants.OP_STATUS_SUCCESS, + constants.OP_STATUS_SUCCESS], + ["Hello World", "Foo man bar"]) + + self.assertEqual(["Hello World", "Foo man bar"], + cli.GenericPollJob(job_id, cbs, cbs)) + cbs.CheckEmpty() + + def testJobLost(self): + job_id = 13746 + + cbs = _MockJobPollCb(self, job_id) + cbs.AddWfjcResult(None, None, constants.JOB_NOTCHANGED) + cbs.AddWfjcResult(None, None, None) + self.assertRaises(errors.JobLost, cli.GenericPollJob, job_id, cbs, cbs) + cbs.CheckEmpty() + + def testError(self): + job_id = 31088 + + cbs = _MockJobPollCb(self, job_id) + cbs.AddWfjcResult(None, None, constants.JOB_NOTCHANGED) + cbs.AddWfjcResult(None, None, ((constants.JOB_STATUS_ERROR, ), None)) + cbs.AddQueryJobsResult(constants.JOB_STATUS_ERROR, + [constants.OP_STATUS_SUCCESS, + constants.OP_STATUS_ERROR], + ["Hello World", "Error code 123"]) + self.assertRaises(errors.OpExecError, cli.GenericPollJob, job_id, cbs, cbs) + cbs.CheckEmpty() + + def testError2(self): + job_id = 22235 + + cbs = _MockJobPollCb(self, job_id) + cbs.AddWfjcResult(None, None, ((constants.JOB_STATUS_ERROR, ), None)) + encexc = errors.EncodeException(errors.LockError("problem")) + cbs.AddQueryJobsResult(constants.JOB_STATUS_ERROR, + [constants.OP_STATUS_ERROR], [encexc]) + self.assertRaises(errors.LockError, cli.GenericPollJob, job_id, cbs, cbs) + cbs.CheckEmpty() + + def testWeirdError(self): + job_id = 28847 + + cbs = _MockJobPollCb(self, job_id) + cbs.AddWfjcResult(None, None, ((constants.JOB_STATUS_ERROR, ), None)) + cbs.AddQueryJobsResult(constants.JOB_STATUS_ERROR, + [constants.OP_STATUS_RUNNING, + constants.OP_STATUS_RUNNING], + [None, None]) + self.assertRaises(errors.OpExecError, cli.GenericPollJob, job_id, cbs, cbs) + cbs.CheckEmpty() + + def testCancel(self): + job_id = 4275 + + cbs = _MockJobPollCb(self, job_id) + cbs.AddWfjcResult(None, None, constants.JOB_NOTCHANGED) + cbs.AddWfjcResult(None, None, ((constants.JOB_STATUS_CANCELING, ), None)) + cbs.AddQueryJobsResult(constants.JOB_STATUS_CANCELING, + [constants.OP_STATUS_CANCELING, + constants.OP_STATUS_CANCELING], + [None, None]) + self.assertRaises(errors.OpExecError, cli.GenericPollJob, job_id, cbs, cbs) + cbs.CheckEmpty() + + if __name__ == '__main__': testutils.GanetiTestProgram() diff --git a/test/ganeti.rapi.client_unittest.py b/test/ganeti.rapi.client_unittest.py index 2f4830ebe28ce7b93ea6821ab6739165865771cc..0d83580a9dede61bb82ee65ca9981aefcdc0b870 100755 --- a/test/ganeti.rapi.client_unittest.py +++ b/test/ganeti.rapi.client_unittest.py @@ -22,22 +22,16 @@ """Script for unittesting the RAPI client module""" -try: - import httplib2 - BaseHttp = httplib2.Http - from ganeti.rapi import client -except ImportError: - httplib2 = None - BaseHttp = object - import re import unittest import warnings from ganeti import http +from ganeti import serializer from ganeti.rapi import connector from ganeti.rapi import rlib2 +from ganeti.rapi import client import testutils @@ -56,81 +50,73 @@ def _GetPathFromUri(uri): return None -class HttpResponseMock(dict): - """Dumb mock of httplib2.Response. +class HttpResponseMock: + """Dumb mock of httplib.HTTPResponse. """ - def __init__(self, status): - self.status = status - self['status'] = status + def __init__(self, code, data): + self.code = code + self._data = data + + def read(self): + return self._data -class HttpMock(BaseHttp): - """Mock for httplib.Http. +class OpenerDirectorMock: + """Mock for urllib.OpenerDirector. """ def __init__(self, rapi): self._rapi = rapi - self._last_request = None + self.last_request = None - last_request_url = property(lambda self: self._last_request[0]) - last_request_method = property(lambda self: self._last_request[1]) - last_request_body = property(lambda self: self._last_request[2]) + def open(self, req): + self.last_request = req - def request(self, url, method, body, headers): - self._last_request = (url, method, body) - code, resp_body = self._rapi.FetchResponse(_GetPathFromUri(url), method) - return HttpResponseMock(code), resp_body + path = _GetPathFromUri(req.get_full_url()) + code, resp_body = self._rapi.FetchResponse(path, req.get_method()) + return HttpResponseMock(code, resp_body) class RapiMock(object): - def __init__(self): self._mapper = connector.Mapper() self._responses = [] self._last_handler = None - def AddResponse(self, response): - self._responses.insert(0, response) - - def PopResponse(self): - if len(self._responses) > 0: - return self._responses.pop() - else: - return None + def AddResponse(self, response, code=200): + self._responses.insert(0, (code, response)) def GetLastHandler(self): return self._last_handler def FetchResponse(self, path, method): - code = 200 - response = None - try: HandlerClass, items, args = self._mapper.getController(path) self._last_handler = HandlerClass(items, args, None) if not hasattr(self._last_handler, method.upper()): - code = 400 - response = "Bad request" + raise http.HttpNotImplemented(message="Method not implemented") + except http.HttpException, ex: code = ex.code response = ex.message + else: + if not self._responses: + raise Exception("No responses") - if not response: - response = self.PopResponse() + (code, response) = self._responses.pop() return code, response class RapiMockTest(unittest.TestCase): - def test(self): rapi = RapiMock() path = "/version" self.assertEqual((404, None), rapi.FetchResponse("/foo", "GET")) - self.assertEqual((400, "Bad request"), + self.assertEqual((501, "Method not implemented"), rapi.FetchResponse("/version", "POST")) rapi.AddResponse("2") code, response = rapi.FetchResponse("/version", "GET") @@ -139,14 +125,12 @@ class RapiMockTest(unittest.TestCase): self.failUnless(isinstance(rapi.GetLastHandler(), rlib2.R_version)) -class GanetiRapiClientTests(unittest.TestCase): - """Tests for remote API client. - - """ - +class GanetiRapiClientTests(testutils.GanetiTestCase): def setUp(self): + testutils.GanetiTestCase.setUp(self) + self.rapi = RapiMock() - self.http = HttpMock(self.rapi) + self.http = OpenerDirectorMock(self.rapi) self.client = client.GanetiRapiClient('master.foo.com') self.client._http = self.http # Hard-code the version for easier testing. @@ -167,6 +151,39 @@ class GanetiRapiClientTests(unittest.TestCase): def assertDryRun(self): self.assertTrue(self.rapi.GetLastHandler().dryRun()) + def testEncodeQuery(self): + query = [ + ("a", None), + ("b", 1), + ("c", 2), + ("d", "Foo"), + ("e", True), + ] + + expected = [ + ("a", ""), + ("b", 1), + ("c", 2), + ("d", "Foo"), + ("e", 1), + ] + + self.assertEqualValues(self.client._EncodeQuery(query), + expected) + + # invalid types + for i in [[1, 2, 3], {"moo": "boo"}, (1, 2, 3)]: + self.assertRaises(ValueError, self.client._EncodeQuery, [("x", i)]) + + def testHttpError(self): + self.rapi.AddResponse(None, code=404) + try: + self.client.GetJobStatus(15140) + except client.GanetiApiError, err: + self.assertEqual(err.code, 404) + else: + self.fail("Didn't raise exception") + def testGetVersion(self): self.client._version = None self.rapi.AddResponse("2") @@ -192,7 +209,9 @@ class GanetiRapiClientTests(unittest.TestCase): self.assertQuery("tag", ["awesome"]) def testDeleteClusterTags(self): - self.client.DeleteClusterTags(["awesome"], dry_run=True) + self.rapi.AddResponse("5107") + self.assertEqual(5107, self.client.DeleteClusterTags(["awesome"], + dry_run=True)) self.assertHandler(rlib2.R_2_tags) self.assertDryRun() self.assertQuery("tag", ["awesome"]) @@ -243,35 +262,45 @@ class GanetiRapiClientTests(unittest.TestCase): self.assertQuery("tag", ["awesome"]) def testDeleteInstanceTags(self): - self.client.DeleteInstanceTags("foo", ["awesome"], dry_run=True) + self.rapi.AddResponse("25826") + self.assertEqual(25826, self.client.DeleteInstanceTags("foo", ["awesome"], + dry_run=True)) self.assertHandler(rlib2.R_2_instances_name_tags) self.assertItems(["foo"]) self.assertDryRun() self.assertQuery("tag", ["awesome"]) def testRebootInstance(self): - self.client.RebootInstance("i-bar", reboot_type="hard", - ignore_secondaries=True, dry_run=True) + self.rapi.AddResponse("6146") + job_id = self.client.RebootInstance("i-bar", reboot_type="hard", + ignore_secondaries=True, dry_run=True) + self.assertEqual(6146, job_id) self.assertHandler(rlib2.R_2_instances_name_reboot) self.assertItems(["i-bar"]) self.assertDryRun() self.assertQuery("type", ["hard"]) - self.assertQuery("ignore_secondaries", ["True"]) + self.assertQuery("ignore_secondaries", ["1"]) def testShutdownInstance(self): - self.client.ShutdownInstance("foo-instance", dry_run=True) + self.rapi.AddResponse("1487") + self.assertEqual(1487, self.client.ShutdownInstance("foo-instance", + dry_run=True)) self.assertHandler(rlib2.R_2_instances_name_shutdown) self.assertItems(["foo-instance"]) self.assertDryRun() def testStartupInstance(self): - self.client.StartupInstance("bar-instance", dry_run=True) + self.rapi.AddResponse("27149") + self.assertEqual(27149, self.client.StartupInstance("bar-instance", + dry_run=True)) self.assertHandler(rlib2.R_2_instances_name_startup) self.assertItems(["bar-instance"]) self.assertDryRun() def testReinstallInstance(self): - self.client.ReinstallInstance("baz-instance", "DOS", no_startup=True) + self.rapi.AddResponse("19119") + self.assertEqual(19119, self.client.ReinstallInstance("baz-instance", "DOS", + no_startup=True)) self.assertHandler(rlib2.R_2_instances_name_reinstall) self.assertItems(["baz-instance"]) self.assertQuery("os", ["DOS"]) @@ -280,32 +309,30 @@ class GanetiRapiClientTests(unittest.TestCase): def testReplaceInstanceDisks(self): self.rapi.AddResponse("999") job_id = self.client.ReplaceInstanceDisks("instance-name", - ["hda", "hdc"], dry_run=True) + disks=[0, 1], dry_run=True, iallocator="hail") self.assertEqual(999, job_id) self.assertHandler(rlib2.R_2_instances_name_replace_disks) self.assertItems(["instance-name"]) - self.assertQuery("disks", ["hda,hdc"]) + self.assertQuery("disks", ["0,1"]) self.assertQuery("mode", ["replace_auto"]) self.assertQuery("iallocator", ["hail"]) self.assertDryRun() - self.assertRaises(client.InvalidReplacementMode, - self.client.ReplaceInstanceDisks, - "instance_a", ["hda"], mode="invalid_mode") - self.assertRaises(client.GanetiApiError, - self.client.ReplaceInstanceDisks, - "instance-foo", ["hda"], mode="replace_on_secondary") - self.rapi.AddResponse("1000") job_id = self.client.ReplaceInstanceDisks("instance-bar", - ["hda"], mode="replace_on_secondary", remote_node="foo-node", + disks=[1], mode="replace_on_secondary", remote_node="foo-node", dry_run=True) self.assertEqual(1000, job_id) self.assertItems(["instance-bar"]) - self.assertQuery("disks", ["hda"]) + self.assertQuery("disks", ["1"]) self.assertQuery("remote_node", ["foo-node"]) self.assertDryRun() + self.rapi.AddResponse("5175") + self.assertEqual(5175, self.client.ReplaceInstanceDisks("instance-moo")) + self.assertItems(["instance-moo"]) + self.assertQuery("disks", None) + def testGetJobs(self): self.rapi.AddResponse('[ { "id": "123", "uri": "\\/2\\/jobs\\/123" },' ' { "id": "124", "uri": "\\/2\\/jobs\\/124" } ]') @@ -318,8 +345,23 @@ class GanetiRapiClientTests(unittest.TestCase): self.assertHandler(rlib2.R_2_jobs_id) self.assertItems(["1234"]) - def testDeleteJob(self): - self.client.DeleteJob(999, dry_run=True) + def testWaitForJobChange(self): + fields = ["id", "summary"] + expected = { + "job_info": [123, "something"], + "log_entries": [], + } + + self.rapi.AddResponse(serializer.DumpJson(expected)) + result = self.client.WaitForJobChange(123, fields, [], -1) + self.assertEqualValues(expected, result) + self.assertHandler(rlib2.R_2_jobs_id_wait) + self.assertItems(["123"]) + + def testCancelJob(self): + self.rapi.AddResponse("[true, \"Job 123 will be canceled\"]") + self.assertEqual([True, "Job 123 will be canceled"], + self.client.CancelJob(999, dry_run=True)) self.assertHandler(rlib2.R_2_jobs_id) self.assertItems(["999"]) self.assertDryRun() @@ -383,11 +425,8 @@ class GanetiRapiClientTests(unittest.TestCase): self.client.SetNodeRole("node-foo", "master-candidate", force=True)) self.assertHandler(rlib2.R_2_nodes_name_role) self.assertItems(["node-foo"]) - self.assertQuery("force", ["True"]) - self.assertEqual("\"master-candidate\"", self.http.last_request_body) - - self.assertRaises(client.InvalidNodeRole, - self.client.SetNodeRole, "node-bar", "fake-role") + self.assertQuery("force", ["1"]) + self.assertEqual("\"master-candidate\"", self.http.last_request.data) def testGetNodeStorageUnits(self): self.rapi.AddResponse("42") @@ -398,10 +437,6 @@ class GanetiRapiClientTests(unittest.TestCase): self.assertQuery("storage_type", ["lvm-pv"]) self.assertQuery("output_fields", ["fields"]) - self.assertRaises(client.InvalidStorageType, - self.client.GetNodeStorageUnits, - "node-y", "floppy-disk", "fields") - def testModifyNodeStorageUnits(self): self.rapi.AddResponse("14") self.assertEqual(14, @@ -410,10 +445,27 @@ class GanetiRapiClientTests(unittest.TestCase): self.assertItems(["node-z"]) self.assertQuery("storage_type", ["lvm-pv"]) self.assertQuery("name", ["hda"]) - - self.assertRaises(client.InvalidStorageType, - self.client.ModifyNodeStorageUnits, - "node-n", "floppy-disk", "hdc") + self.assertQuery("allocatable", None) + + for allocatable, query_allocatable in [(True, "1"), (False, "0")]: + self.rapi.AddResponse("7205") + job_id = self.client.ModifyNodeStorageUnits("node-z", "lvm-pv", "hda", + allocatable=allocatable) + self.assertEqual(7205, job_id) + self.assertHandler(rlib2.R_2_nodes_name_storage_modify) + self.assertItems(["node-z"]) + self.assertQuery("storage_type", ["lvm-pv"]) + self.assertQuery("name", ["hda"]) + self.assertQuery("allocatable", [query_allocatable]) + + def testRepairNodeStorageUnits(self): + self.rapi.AddResponse("99") + self.assertEqual(99, self.client.RepairNodeStorageUnits("node-z", "lvm-pv", + "hda")) + self.assertHandler(rlib2.R_2_nodes_name_storage_repair) + self.assertItems(["node-z"]) + self.assertQuery("storage_type", ["lvm-pv"]) + self.assertQuery("name", ["hda"]) def testGetNodeTags(self): self.rapi.AddResponse("[\"fry\", \"bender\"]") @@ -431,7 +483,9 @@ class GanetiRapiClientTests(unittest.TestCase): self.assertQuery("tag", ["awesome"]) def testDeleteNodeTags(self): - self.client.DeleteNodeTags("node-w", ["awesome"], dry_run=True) + self.rapi.AddResponse("16861") + self.assertEqual(16861, self.client.DeleteNodeTags("node-w", ["awesome"], + dry_run=True)) self.assertHandler(rlib2.R_2_nodes_name_tags) self.assertItems(["node-w"]) self.assertDryRun() @@ -439,7 +493,4 @@ class GanetiRapiClientTests(unittest.TestCase): if __name__ == '__main__': - if httplib2 is None: - warnings.warn("These tests require the httplib2 library") - else: - testutils.GanetiTestProgram() + testutils.GanetiTestProgram() diff --git a/test/ganeti.utils_unittest.py b/test/ganeti.utils_unittest.py index a0fde9de6de662b51476935d0f8a9ba5a072a52b..71b47cbf53a76bc1a3cabd39c5901ef625be3a8a 100755 --- a/test/ganeti.utils_unittest.py +++ b/test/ganeti.utils_unittest.py @@ -39,6 +39,7 @@ import OpenSSL import warnings import distutils.version import glob +import md5 import ganeti import testutils @@ -662,6 +663,82 @@ class TestMatchNameComponent(unittest.TestCase): None) +class TestReadFile(testutils.GanetiTestCase): + def setUp(self): + testutils.GanetiTestCase.setUp(self) + + self.tmpdir = tempfile.mkdtemp() + self.fname = utils.PathJoin(self.tmpdir, "data1") + + def tearDown(self): + testutils.GanetiTestCase.tearDown(self) + + shutil.rmtree(self.tmpdir) + + def testReadAll(self): + data = utils.ReadFile(self._TestDataFilename("cert1.pem")) + self.assertEqual(len(data), 814) + + h = md5.new() + h.update(data) + self.assertEqual(h.hexdigest(), "a491efb3efe56a0535f924d5f8680fd4") + + def testReadSize(self): + data = utils.ReadFile(self._TestDataFilename("cert1.pem"), + size=100) + self.assertEqual(len(data), 100) + + h = md5.new() + h.update(data) + self.assertEqual(h.hexdigest(), "893772354e4e690b9efd073eed433ce7") + + def testReadOneline(self): + data = utils.ReadFile(self._TestDataFilename("cert1.pem"), + oneline=True) + self.assertEqual(len(data), 27) + self.assertEqual(data, "-----BEGIN CERTIFICATE-----") + + def testReadOnelineSize(self): + dummydata = (1024 * "Hello World! ") + self.assertFalse(set("\r\n") & set(dummydata)) + + utils.WriteFile(self.fname, data=dummydata) + + data = utils.ReadFile(self.fname, oneline=True, size=555) + self.assertEqual(len(data), 555) + self.assertEqual(data, dummydata[:555]) + self.assertFalse(set("\r\n") & set(data)) + + def testReadOnelineSize2(self): + for end in ["\n", "\r\n"]: + dummydata = (1024 * ("Hello World%s" % end)) + self.assert_(set("\r\n") & set(dummydata)) + + utils.WriteFile(self.fname, data=dummydata) + + data = utils.ReadFile(self.fname, oneline=True, size=555) + self.assertEqual(len(data), len("Hello World")) + self.assertEqual(data, dummydata[:11]) + self.assertFalse(set("\r\n") & set(data)) + + def testReadOnelineWhitespace(self): + for ws in [" ", "\t", "\t\t \t", "\t "]: + dummydata = (1024 * ("Foo bar baz %s\n" % ws)) + self.assert_(set("\r\n") & set(dummydata)) + + utils.WriteFile(self.fname, data=dummydata) + + data = utils.ReadFile(self.fname, oneline=True, size=555) + explen = len("Foo bar baz ") + len(ws) + self.assertEqual(len(data), explen) + self.assertEqual(data, dummydata[:explen]) + self.assertFalse(set("\r\n") & set(data)) + + def testError(self): + self.assertRaises(EnvironmentError, utils.ReadFile, + utils.PathJoin(self.tmpdir, "does-not-exist")) + + class TestTimestampForFilename(unittest.TestCase): def test(self): self.assert_("." not in utils.TimestampForFilename())