Commit e02728f9 authored by Stavros Sachtouris's avatar Stavros Sachtouris
Browse files

Use standard threading instead of gevent/greenlets

gevent depedency is not needed after that
parent cf49aa14
......@@ -34,10 +34,6 @@
from __future__ import print_function
import gevent.monkey
#Monkey-patch everything for gevent early on
gevent.monkey.patch_all()
import logging
from inspect import getargspec
......
......@@ -36,7 +36,6 @@ from urlparse import urlparse
from synnefo.lib.pool.http import get_http_connection
from kamaki.clients.connection import HTTPConnection, HTTPResponse,\
HTTPConnectionError
from gevent.dns import DNSError
from socket import gaierror
from json import loads
......@@ -141,7 +140,7 @@ class KamakiHTTPConnection(HTTPConnection):
body=data)
except Exception as err:
conn.close()
if isinstance(err, DNSError) or isinstance(err, gaierror):
if isinstance(err, gaierror):
raise HTTPConnectionError('Cannot connect to %s' % self.url,
status=701,
details='%s: %s' % (type(err), unicode(err)))
......
......@@ -31,12 +31,6 @@
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
import gevent
import gevent.monkey
# Monkey-patch everything for gevent early on
gevent.monkey.patch_all()
import gevent.pool
import unittest
import sys
from StringIO import StringIO
......@@ -68,6 +62,7 @@ class testKamakiCon(unittest.TestCase):
def tearDown(self):
pass
"""
def _get_async_content(self, con, **kwargs):
class SilentGreenlet(gevent.Greenlet):
def _report_error(self, exc_info):
......@@ -84,6 +79,7 @@ class testKamakiCon(unittest.TestCase):
g = SilentGreenlet(self._get_content_len, con, **kwargs)
self.async_pool.start(g)
return g
"""
def _get_content_len(self, con, **kwargs):
r = con.perform_request('GET', **kwargs)
......@@ -113,8 +109,6 @@ class testKamakiCon(unittest.TestCase):
self.assertNotEqual(r2, r4)
#print('1:%s 2:%s 3:%s 4:%s 5:%s'%(r1, r2, r3, r4, r5))
gevent.joinall([h1, h2, h3, h4, h5])
if __name__ == '__main__':
suiteFew = unittest.TestSuite()
suiteFew.addTest(unittest.makeSuite(testKamakiCon))
......
......@@ -31,16 +31,11 @@
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
import gevent
#import gevent.monkey
# Monkey-patch everything for gevent early on
#gevent.monkey.patch_all()
import gevent.pool
from threading import Thread
from os import fstat
from hashlib import new as newhashlib
from time import time, sleep
import sys
from time import time
from binascii import hexlify
......@@ -69,9 +64,37 @@ def _range_up(start, end, a_range):
return (start, end)
class SilentEvent(Thread):
""" Thread-run method(*args, **kwargs)
put exception in exception_bucket
"""
def __init__(self, method, *args, **kwargs):
super(self.__class__, self).__init__()
self.method = method
self.args = args
self.kwargs = kwargs
@property
def exception(self):
return getattr(self, '_exception', False)
@property
def value(self):
return getattr(self, '_value', None)
def run(self):
try:
self._value = self.method(*(self.args), **(self.kwargs))
except Exception as e:
print('______\n%s\n_______' % e)
self._exception = e
class PithosClient(PithosRestAPI):
"""GRNet Pithos API client"""
_thread_exceptions = []
def __init__(self, base_url, token, account=None, container=None):
super(PithosClient, self).__init__(base_url, token, account, container)
self.async_pool = None
......@@ -117,20 +140,9 @@ class PithosClient(PithosRestAPI):
# upload_* auxiliary methods
def put_block_async(self, data, hash):
class SilentGreenlet(gevent.Greenlet):
def _report_error(self, exc_info):
try:
sys.stderr = StringIO()
gevent.Greenlet._report_error(self, exc_info)
finally:
if hasattr(sys, '_stderr'):
sys.stderr = sys._stderr
POOL_SIZE = self.POOL_SIZE if hasattr(self, 'POOL_SIZE') else 5
if self.async_pool is None:
self.async_pool = gevent.pool.Pool(size=POOL_SIZE)
g = SilentGreenlet(self.put_block, data, hash)
self.async_pool.start(g)
return g
event = SilentEvent(method=self.put_block, data=data, hash=hash)
event.start()
return event
def put_block(self, data, hash):
r = self.container_post(update=True,
......@@ -233,23 +245,26 @@ class PithosClient(PithosRestAPI):
data = fileobj.read(bytes)
r = self.put_block_async(data, hash)
flying.append(r)
for r in flying:
if r.ready():
if r.exception:
raise r.exception
unfinished = []
for thread in flying:
if thread.isAlive() or thread.exception:
unfinished.append(thread)
else:
if upload_cb:
upload_gen.next()
flying = [r for r in flying if not r.ready()]
flying = unfinished
while upload_cb:
try:
upload_gen.next()
except StopIteration:
break
gevent.joinall(flying)
for thread in flying:
thread.join()
failures = [r for r in flying if r.exception]
if len(failures):
details = ', '.join(['(%s).%s' % (i, r.exception)\
details = ', '.join([' (%s).%s' % (i, r.exception)\
for i, r in enumerate(failures)])
raise ClientError(message="Block uploading failed",
status=505,
......@@ -340,23 +355,12 @@ class PithosClient(PithosRestAPI):
dst.flush()
def _get_block_async(self, obj, **restargs):
class SilentGreenlet(gevent.Greenlet):
def _report_error(self, exc_info):
try:
sys.stderr = StringIO()
gevent.Greenlet._report_error(self, exc_info)
finally:
if hasattr(sys, '_stderr'):
sys.stderr = sys._stderr
if not hasattr(self, 'POOL_SIZE'):
self.POOL_SIZE = 5
if self.async_pool is None:
self.async_pool = gevent.pool.Pool(size=self.POOL_SIZE)
g = SilentGreenlet(self.object_get, obj,
event = SilentEvent(self.object_get,
obj,
success=(200, 206),
**restargs)
self.async_pool.start(g)
return g
event.start()
return event
def _hash_from_file(self, fp, start, size, blockhash):
fp.seek(start)
......@@ -365,8 +369,8 @@ class PithosClient(PithosRestAPI):
h.update(block.strip('\x00'))
return hexlify(h.digest())
def _greenlet2file(self,
flying_greenlets,
def _thread2file(self,
flying,
local_file,
offset=0,
**restargs):
......@@ -375,15 +379,15 @@ class PithosClient(PithosRestAPI):
- e.g. if the range is 10-100, all
blocks will be written to normal_position - 10"""
finished = []
for start, g in flying_greenlets.items():
if g.ready():
for start, g in flying.items():
if not g.isAlive():
if g.exception:
raise g.exception
block = g.value.content
local_file.seek(start - offset)
local_file.write(block)
self._cb_next()
finished.append(flying_greenlets.pop(start))
finished.append(flying.pop(start))
local_file.flush()
return finished
......@@ -399,8 +403,8 @@ class PithosClient(PithosRestAPI):
**restargs):
file_size = fstat(local_file.fileno()).st_size if resume else 0
flying_greenlets = {}
finished_greenlets = []
flying = {}
finished = []
offset = 0
if filerange is not None:
rstart = int(filerange.split('-')[0])
......@@ -414,8 +418,8 @@ class PithosClient(PithosRestAPI):
blockhash):
self._cb_next()
continue
if len(flying_greenlets) >= self.POOL_SIZE:
finished_greenlets += self._greenlet2file(flying_greenlets,
if len(flying) >= self.POOL_SIZE:
finished += self._thread2file(flying,
local_file,
offset,
**restargs)
......@@ -426,17 +430,11 @@ class PithosClient(PithosRestAPI):
self._cb_next()
continue
restargs['async_headers'] = {'Range': 'bytes=%s-%s' % (start, end)}
flying_greenlets[start] = self._get_block_async(obj, **restargs)
#check the greenlets
while len(flying_greenlets) > 0:
sleep(0.001)
finished_greenlets += self._greenlet2file(flying_greenlets,
local_file,
offset,
**restargs)
flying[start] = self._get_block_async(obj, **restargs)
gevent.joinall(finished_greenlets)
for thread in flying.values():
thread.join()
finished += self._thread2file(flying, local_file, offset, **restargs)
def download_object(self,
obj,
......
......@@ -31,10 +31,6 @@
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
import gevent.monkey
# Monkey-patch everything for gevent early on
gevent.monkey.patch_all()
from argparse import ArgumentParser
import unittest
import time
......
......@@ -34,26 +34,22 @@
# or implied, of GRNET S.A.
from setuptools import setup
#from sys import version_info
from sys import version_info
import kamaki
#Suggested packages can be installed manually later, but it is not nessecary
suggested = ['ansicolors==1.0.2', 'progress==1.0.1']
required = ['gevent>=0.13.6', 'snf-common>=0.10', 'argparse']
optional = ['ansicolors', 'progress']
required = ['snf-common>=0.10', 'argparse']
setup(
name='kamaki',
version=kamaki.__version__,
description='A command-line tool for poking clouds',
description='A command-line tool for managing clouds',
long_description=open('README.rst').read(),
url='http://code.grnet.gr/projects/kamaki',
license='BSD',
packages=['kamaki',
'kamaki.cli',
'kamaki.clients',
'kamaki.clients.connection',
'kamaki.cli.commands'],
packages=['kamaki', 'kamaki.clients', 'kamaki.clients.connection', 'kamaki.cli', 'kamaki.cli.commands'],
include_package_data=True,
entry_points={
'console_scripts': ['kamaki = kamaki.cli:main']
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment