Commit 80f20e72 authored by Stavros Sachtouris's avatar Stavros Sachtouris Committed by Giorgos Korfiatis

Improve the blocking mechanism in kamaki clients

The "Waiter" class is now deprecated. A "wait" method is used
instead. The new method is located at "kamaki.clients" and has
this signature:
wait(
    poll method, poll method params,
    stop method,
    delay=1, timeout=100,
    caller callback method=None)

The poll method is provided by the caller and is typically the
"get_details" method of each API client.

The "poll method params" is a list of parameters for the polling
method, typically just the item id (i.e., server_id, port_id,
volume_id).

The stop method returns a boolean, is called after each polling
and decides whether the blocking should stop.

The delay is the time between each call of the poll method

The timeout is the total time after which the blocker stops

The caller callback method is not a generator anymore, instead it
is just a method that takes the results of the poll method as
input and it is called at each iteration. It remains optional.
Typically, the caller may use it to advance a progress bar.

The CycladesComputeClient, CycladesNetworkClient and
CycladesBlockStorageClient have been updated so that they make use
of the new method, but they retain the ability to call the Waiter
methods for two more versions, due to backwards compatibility
policies.

The CLI has been adjusted to use the new mechanism.
parent 7a352c9a
......@@ -389,19 +389,32 @@ class Wait(object):
def wait(
self, service, service_id, status_method, status,
countdown=True, timeout=60, msg='still'):
(progress_bar, wait_cb) = self._safe_progress_bar(
countdown=True, timeout=60, msg='still', update_cb=None):
(progress_bar, wait_gen) = self._safe_progress_bar(
'%s %s status %s %s' % (service, service_id, msg, status),
countdown=countdown, timeout=timeout)
wait_step = None
if wait_gen:
wait_step = wait_gen(timeout)
wait_step.next()
def wait_cb(item_details):
if wait_step:
if update_cb:
progress_bar.bar.goto(update_cb(item_details))
else:
wait_step.next()
try:
new_mode = status_method(
item_details = status_method(
service_id, status, max_wait=timeout, wait_cb=wait_cb)
if new_mode:
self.error(
'%s %s status: %s' % (service, service_id, new_mode))
if item_details:
self.error('')
self.error('%s %s status: %s' % (
service, service_id, item_details['status']))
else:
exit("Operation timed out")
except KeyboardInterrupt:
self.error('\n- canceled')
self.error(' - canceled')
finally:
self._safe_progress_bar_finish(progress_bar)
......@@ -71,10 +71,17 @@ server_states = ('BUILD', 'ACTIVE', 'STOPPED', 'REBOOT', 'ERROR')
class _ServerWait(Wait):
def wait_while(self, server_id, current_status, timeout=60):
if current_status in ('BUILD', ):
def update_cb(item_details):
return item_details.get('progress', None)
else:
update_cb = None
super(_ServerWait, self).wait(
'Server', server_id, self.client.wait_server_while, current_status,
countdown=(current_status not in ('BUILD', )),
timeout=timeout if current_status not in ('BUILD', ) else 100)
timeout=timeout, update_cb=update_cb)
def wait_until(self, server_id, target_status, timeout=60):
super(_ServerWait, self).wait(
......
......@@ -596,12 +596,38 @@ class Client(Logged):
return self.request('move', path, **kwargs)
def wait(poll, poll_params, stop, delay=1, timeout=100, wait_cb=None):
"""Wait as long as the stop method returns False, polling each round
:param poll: (method) the polling method is called with poll_params. By
convention, it returns a dict of information about the item
:param poll_params: (iterable) each round, call poll with these parameters
:param stop: (method) gets the results of poll method as input and decides
if the wait method should stop
:param delay: (int) how long to wait (in seconds) between polls
:param timeout: (int) if this number of polls is reached, stop
:param wait_cb: (method) a call back method that takes item_details as
input
:returns: (dict) the last details dict of the item
"""
results = None
for polls in range(timeout // delay):
results = poll(*poll_params)
if wait_cb:
wait_cb(results)
if stop(results):
break
sleep(delay)
return results
class Waiter(object):
"""Use this class to provide blocking API methods - DEPRECATED FROM 0.16"""
def _wait(
self, item_id, wait_status, get_status,
delay=1, max_wait=100, wait_cb=None, wait_until_status=False):
"""Wait while the item is still in wait_status or to reach it
"""DEPRECATED, to be removed in 0.16
Wait while the item is still in wait_status or to reach it
:param server_id: integer (str or int)
......
......@@ -32,7 +32,7 @@
# or implied, of GRNET S.A.
from kamaki.clients.blockstorage.rest_api import BlockStorageRestClient
from kamaki.clients import ClientError, Waiter
from kamaki.clients import ClientError, Waiter, wait
class BlockStorageClient(BlockStorageRestClient, Waiter):
......@@ -142,19 +142,40 @@ class BlockStorageClient(BlockStorageRestClient, Waiter):
# Wait methods
def get_volume_status(self, volume_id):
"""Deprecated, will be removed in 0.15"""
r = self.get_volume_details(volume_id)
return r['status'], None
def wait_volume(
self, volume_id, stop=None, delay=1, timeout=100, wait_cb=None):
"""Wait (block) while the stop method returns True, poll for status
each time
:param volume_id: (str)
:param stop: (method) takes the volume details dict as input, returns
true if the blocking must stop. Default: wait while 'creating'
:param delay: (int) seconds between polls
:param timeout: (int) in seconds
:param wait_cb: (method) optional call back method called after each
poll, provided by the caller. Typically used to monitor progress
Takes volume details dict as parameter
"""
return wait(
self.get_volume_details, (volume_id, ),
stop or (lambda i: i['status'] != 'creating'),
delay, timeout, wait_cb)
def wait_volume_while(
self, volume_id,
current_status='creating', delay=1, max_wait=100, wait_cb=None):
return self.wait_while(
volume_id, current_status, BlockStorageClient.get_volume_status,
return wait(
self.get_volume_details, (volume_id, ),
lambda i: i['status'] != current_status,
delay, max_wait, wait_cb)
def wait_volume_until(
self, volume_id,
target_status='in_use', delay=1, max_wait=100, wait_cb=None):
return self.wait_until(
volume_id, target_status, BlockStorageClient.get_volume_status,
return wait(
self.get_volume_details, (volume_id, ),
lambda i: i['status'] == target_status,
delay, max_wait, wait_cb)
......@@ -35,7 +35,7 @@ from kamaki.clients.cyclades.rest_api import (
CycladesComputeRestClient, CycladesBlockStorageRestClient)
from kamaki.clients.network import NetworkClient
from kamaki.clients.utils import path4url
from kamaki.clients import ClientError, Waiter
from kamaki.clients import ClientError, Waiter, wait
class CycladesComputeClient(CycladesComputeRestClient, Waiter):
......@@ -173,7 +173,8 @@ class CycladesComputeClient(CycladesComputeRestClient, Waiter):
return r.json
def get_server_status(self, server_id):
""":returns: (current status, progress percentile if available)"""
"""Deprecated - will be removed in version 0.15
:returns: (current status, progress percentile if available)"""
r = self.get_server_details(server_id)
return r['status'], (r.get('progress', None) if (
r['status'] in ('BUILD', )) else None)
......@@ -189,8 +190,9 @@ class CycladesComputeClient(CycladesComputeRestClient, Waiter):
:param wait_cb: if set a progressbar is used to show progress
:returns: (str) the new mode if succesfull, (bool) False if timed out
"""
return self.wait_while(
server_id, current_status, CycladesComputeClient.get_server_status,
return wait(
self.get_server_details, (server_id, ),
lambda i: i['status'] != current_status,
delay, max_wait, wait_cb)
def wait_server_until(
......@@ -204,15 +206,16 @@ class CycladesComputeClient(CycladesComputeRestClient, Waiter):
:param wait_cb: if set a progressbar is used to show progress
:returns: (str) the new mode if succesfull, (bool) False if timed out
"""
return self.wait_until(
server_id, target_status, CycladesComputeClient.get_server_status,
return wait(
self.get_server_details, (server_id, ),
lambda i: i['status'] == target_status,
delay, max_wait, wait_cb)
# Backwards compatibility
# Backwards compatibility - deprecated, will be replaced in 0.15
wait_server = wait_server_while
# Backwards compatibility
# Backwards compatibility - will be removed in 0.15
CycladesClient = CycladesComputeClient
......
......@@ -31,7 +31,7 @@
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
from kamaki.clients import ClientError, Waiter
from kamaki.clients import ClientError, wait, Waiter
from kamaki.clients.network.rest_api import NetworkRestClient
......@@ -363,25 +363,30 @@ class NetworkClient(NetworkRestClient, Waiter):
r = self.floatingips_delete(floatingip_id, success=204)
return r.headers
# Wait methods
def get_port_status(self, port_id):
"""Deprecated, will be removed in version 0.15"""
r = self.get_port_details(port_id)
return r['status'], None
# Wait methods
def wait_port_while(
self, port_id,
current_status='BUILD', delay=1, max_wait=100, wait_cb=None):
return self.wait_while(
port_id, current_status, NetworkClient.get_port_status,
"""Wait for port while in current_status"""
return wait(
self.get_port_details, (port_id, ),
lambda i: i['status'] != current_status,
delay, max_wait, wait_cb)
def wait_port_until(
self, port_id,
target_status='ACTIVE', delay=1, max_wait=100, wait_cb=None):
return self.wait_until(
port_id, target_status, NetworkClient.get_port_status,
target_status='BUILD', delay=1, max_wait=100, wait_cb=None):
"""Wait for port while in current_status"""
return wait(
self.get_port_details, (port_id, ),
lambda i: i['status'] == target_status,
delay, max_wait, wait_cb)
# Backwards compatibility
# Backwards compatibility - deprecated, will be replaced in version 0.15
wait_port = wait_port_while
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment