Commit 81c60832 authored by Stavros Sachtouris's avatar Stavros Sachtouris
Browse files

Move cluster handling in server create/delete

Refs: #4429
parent b8352ce4
......@@ -375,22 +375,41 @@ class server_create(_init_cyclades, _optional_json, _server_wait):
arguments = dict(
personality=PersonalityArgument(
(80 * ' ').join(howto_personality), ('-p', '--personality')),
wait=FlagArgument('Wait server to build', ('-w', '--wait'))
wait=FlagArgument('Wait server to build', ('-w', '--wait')),
cluster_size=IntArgument(
'Create a cluster of servers of this size. In this case, the name'
'parameter is the prefix of each server in the cluster (e.g.,'
'srv1, srv2, etc.',
'--cluster-size')
)
@errors.cyclades.cluster_size
def _create_cluster(self, prefix, flavor_id, image_id, size):
servers = [dict(
name='%s%s' % (prefix, i),
flavor_id=flavor_id,
image_id=image_id,
personality=self['personality']) for i in range(size)]
if size == 1:
return [self.client.create_server(**servers[0])]
return self.client.async_run(self.client.create_server, servers)
@errors.generic.all
@errors.cyclades.connection
@errors.plankton.id
@errors.cyclades.flavor_id
def _run(self, name, flavor_id, image_id):
r = self.client.create_server(
name, int(flavor_id), image_id, personality=self['personality'])
usernames = self._uuids2usernames([r['user_id'], r['tenant_id']])
r['user_id'] += ' (%s)' % usernames[r['user_id']]
r['tenant_id'] += ' (%s)' % usernames[r['tenant_id']]
self._print(r, self.print_dict)
if self['wait']:
self._wait(r['id'], r['status'])
for r in self._create_cluster(
name, flavor_id, image_id, size=self['cluster_size'] or 1):
print 'HEY I GOT A', r
print 'MKEY?????????????????'
usernames = self._uuids2usernames([r['user_id'], r['tenant_id']])
r['user_id'] += ' (%s)' % usernames[r['user_id']]
r['tenant_id'] += ' (%s)' % usernames[r['tenant_id']]
self._print(r, self.print_dict)
if self['wait']:
self._wait(r['id'], r['status'])
self.error('')
def main(self, name, flavor_id, image_id):
super(self.__class__, self)._run()
......@@ -421,27 +440,41 @@ class server_delete(_init_cyclades, _optional_output_cmd, _server_wait):
"""Delete a virtual server"""
arguments = dict(
wait=FlagArgument('Wait server to be destroyed', ('-w', '--wait'))
wait=FlagArgument('Wait server to be destroyed', ('-w', '--wait')),
cluster=FlagArgument(
'(DANGEROUS) Delete all virtual servers prefixed with the cluster '
'prefix. In that case, the prefix replaces the server id',
'--cluster')
)
def _server_ids(self, server_var):
if self['cluster']:
return [s['id'] for s in self.client.list_servers() if (
s['name'].startswith(server_var))]
@errors.cyclades.server_id
def _check_server_id(self, server_id):
return server_id
return [_check_server_id(self, server_id=server_var), ]
@errors.generic.all
@errors.cyclades.connection
@errors.cyclades.server_id
def _run(self, server_id):
status = 'DELETED'
def _run(self, server_var):
for server_id in self._server_ids(server_var):
if self['wait']:
details = self.client.get_server_details(server_id)
status = details['status']
r = self.client.delete_server(int(server_id))
r = self.client.delete_server(server_id)
self._optional_output(r)
if self['wait']:
self._wait(server_id, status)
def main(self, server_id):
def main(self, server_id_or_cluster_prefix):
super(self.__class__, self)._run()
self._run(server_id=server_id)
self._run(server_id_or_cluster_prefix)
@command(server_cmds)
......@@ -785,52 +818,6 @@ class server_wait(_init_cyclades, _server_wait):
self._run(server_id=server_id, current_status=current_status)
@command(server_cmds)
class server_cluster_create(_init_cyclades):
"""Create a cluster of virtual servers
All new servers will be named as <prefix><increment> e.g.,
mycluster1, mycluster2, etc.
All servers in the cluster will run the same image on the same hardware
flavor.
"""
@errors.generic.all
@errors.cyclades.connection
@errors.plankton.id
@errors.cyclades.flavor_id
@errors.cyclades.cluster_size
def _run(self, prefix, image_id, flavor_id, size):
servers = [dict(
name='%s%s' % (prefix, i),
flavor_id=flavor_id,
image_id=image_id) for i in range(int(size))]
self.client.create_cluster(servers)
def main(self, prefix, image_id, flavor_id, size):
super(self.__class__, self)._run()
self._run(prefix, image_id=image_id, flavor_id=flavor_id, size=size)
@command(server_cmds)
class server_cluster_delete(_init_cyclades):
"""Remove all servers that belong to a virtual cluster
A virtual cluster consists of the virtual servers with the same name prefix
ATTENTION: make sure you want to delete all servers of that prefix
To get a list of your servers: /server list
"""
@errors.generic.all
@errors.cyclades.connection
def _run(self, prefix):
servers = [s['id'] for s in self.client.list_servers() if (
s['name'].startswith(prefix))]
self.client.delete_cluster(servers)
def main(self, prefix):
super(self.__class__, self)._run()
self._run(prefix)
@command(flavor_cmds)
class flavor_list(_init_cyclades, _optional_json, _name_filter, _id_filter):
"""List available hardware flavors"""
......
......@@ -218,7 +218,7 @@ class cyclades(object):
return foo(self, *args, **kwargs)
except ValueError as ve:
msg = 'Invalid network id %s ' % network_id
details = ['network id must be a positive integer']
details = 'network id must be a positive integer'
raiseCLIError(ve, msg, details=details, importance=1)
except ClientError as ce:
if network_id and ce.status == 404 and (
......@@ -274,7 +274,7 @@ class cyclades(object):
return foo(self, *args, **kwargs)
except ValueError as ve:
msg = 'Invalid flavor id %s ' % flavor_id,
details = 'Flavor id must be a positive integer',
details = 'Flavor id must be a positive integer'
raiseCLIError(ve, msg, details=details, importance=1)
except ClientError as ce:
if flavor_id and ce.status == 404 and (
......@@ -294,7 +294,7 @@ class cyclades(object):
return foo(self, *args, **kwargs)
except ValueError as ve:
msg = 'Invalid virtual server id %s' % server_id,
details = ['id must be a positive integer'],
details = 'Server id must be a positive integer'
raiseCLIError(ve, msg, details=details, importance=1)
except ClientError as ce:
err_msg = ('%s' % ce).lower()
......
......@@ -369,6 +369,45 @@ class Client(Logged):
return []
return threadlist
def async_run(self, method, kwarg_list):
"""Fire threads of operations
:param method: the method to run in each thread
:param kwarg_list: (list of dicts) the arguments to pass in each method
call
:returns: (list) the results of each method call w.r. to the order of
kwarg_list
"""
flying, results = {}, {}
self._init_thread_limit()
for index, kwargs in enumerate(kwarg_list):
self._watch_thread_limit(flying.values())
flying[index] = SilentEvent(method=method, **kwargs)
flying[index].start()
unfinished = {}
for key, thread in flying.items():
if thread.isAlive():
unfinished[key] = thread
elif thread.exception:
print 'HERE IS AN EXCEPTION MK?'
raise thread.exception
else:
results[key] = thread.value
print 'NO EXCEPTION', thread.value
flying = unfinished
sendlog.info('- - - wait for threads to finish')
for key, thread in flying.items():
if thread.isAlive():
thread.join()
elif thread.exception:
print 'HERE IS AN EXCEPTION MK-2?'
raise thread.exception
results[key] = thread.value
print 'NO EXCEPTION-2', thread.value
return results.values()
def _raise_for_status(self, r):
log.debug('raise err from [%s] of type[%s]' % (r, type(r)))
status_msg = getattr(r, 'status', None) or ''
......
......@@ -33,7 +33,6 @@
from kamaki.clients import ClientError
from kamaki.clients.compute.rest_api import ComputeRestClient
from kamaki.clients.utils import path4url
class ComputeClient(ComputeRestClient):
......
......@@ -72,49 +72,6 @@ class CycladesClient(CycladesRestClient):
name, flavor_id, image_id,
metadata=metadata, personality=personality)
def _async_run(self, method, kwarg_list):
"""Fire threads of operations
:param method: the method to run in each thread
:param kwarg_list: (list of dicts) the arguments to pass in each method
call
"""
flying = []
self._init_thread_limit()
for kwargs in kwarg_list:
self._watch_thread_limit(flying)
flying.append(SilentEvent(method=method, **kwargs))
flying[-1].start()
unfinished = []
for thread in flying:
if thread.isAlive():
unfinished.append(thread)
elif thread.exception:
raise thread.exception
flying = unfinished
sendlog.info('- - - wait for threads to finish')
for thread in flying:
if thread.isAlive():
thread.join()
elif thread.exception:
raise thread.exception
def create_cluster(self, servers):
"""Create multiple servers asynchronously
:param servers: (list of dicts) [
{name, flavor_id, image_id, metadata, personality}, ...]
"""
# Perform async server creations
return self._async_run(self.create_server, servers)
def delete_cluster(self, server_ids):
"""
:param server_ids: (list) ids of servers to delete
"""
self._async_run(
self.delete_server, [dict(server_id=sid) for sid in server_ids])
def start_server(self, server_id):
"""Submit a startup request
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment