pithos_client.py 14.7 KB
Newer Older
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
# Copyright (C) 2015 GRNET S.A.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

16 17 18 19
from functools import wraps
import time
import os
import logging
20
import re
21

22
from agkyra.syncer import utils, common, messaging, database
23 24
from agkyra.syncer.file_client import FileClient
from agkyra.syncer.setup import ClientError
25
from agkyra.syncer.database import TransactedConnection
26 27 28 29

# Module-level logger, named after this module; it is also handed to the
# messaging objects created below through their ``logger=`` argument.
logger = logging.getLogger(__name__)


Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
30 31 32 33 34 35 36 37 38 39 40 41 42
def handle_client_errors(f):
    """Decorate *f* so that ``ClientError`` is translated to a syncer error.

    The wrapped callable is invoked with the arguments it was given and its
    result is returned unchanged.  A ``ClientError`` with status 412 is
    re-raised as ``common.CollisionError``; a ``ClientError`` with any other
    status is re-raised as ``common.SyncError``.  Both wrap the original
    exception object.  Other exception types propagate untouched.
    """
    @wraps(f)
    def inner(*args, **kwargs):
        try:
            result = f(*args, **kwargs)
        except ClientError as e:
            # 412 means "Precondition failed".
            # TODO handle other cases, too
            error_class = (common.CollisionError if e.status == 412
                           else common.SyncError)
            raise error_class(e)
        return result
    return inner


43
class PithosSourceHandle(object):
    """Source-side handle for fetching one Pithos object into the local cache.

    The handle is built from a file client and the source state of a single
    object.  ``send_file`` downloads the object to a freshly registered cache
    path and, if what was actually found upstream differs from the recorded
    source state, stores and announces the updated state.
    """

    def __init__(self, client, source_state):
        """Copy the settings and state this handle needs.

        :param client: the owning file client; its ``settings`` and
            ``client_dbtuple`` attributes are read here.
        :param source_state: state of the object to fetch; its ``objname``
            and (later) ``info`` attributes are read.
        """
        self.SIGNATURE = "PithosSourceHandle"
        self.client = client
        settings = client.settings
        self.settings = settings
        self.endpoint = settings.endpoint
        self.cache_fetch_name = settings.cache_fetch_name
        self.cache_fetch_path = settings.cache_fetch_path
        self.cache_path = settings.cache_path
        self.syncer_dbtuple = settings.syncer_dbtuple
        self.client_dbtuple = client.client_dbtuple
        self.source_state = source_state
        self.objname = source_state.objname
        self.heartbeat = settings.heartbeat

    def register_fetch_name(self, filename):
        """Register a cache name for *filename* in a client-DB transaction.

        :returns: the filesystem path under ``cache_path`` to fetch into.
        """
        with TransactedConnection(self.client_dbtuple) as db:
            return self._register_fetch_name(db, filename)

    def _register_fetch_name(self, db, filename):
        """Build a cache name for *filename*, record it in *db*, return its path.

        The name is the hash of *filename* joined with a time stamp by "_",
        placed under ``cache_fetch_name``.  It is also remembered on the
        handle as ``self.fetch_name``.
        """
        f = utils.hash_string(filename) + "_" + \
            utils.str_time_stamp()
        fetch_name = utils.join_path(self.cache_fetch_name, f)
        self.fetch_name = fetch_name
        db.insert_cachename(fetch_name, self.SIGNATURE, filename)
        return utils.join_path(self.cache_path, fetch_name)

    @handle_client_errors
    def send_file(self, sync_state):
        """Download the object into the cache and return the cache path.

        The outcome of the download decides what is left at the returned
        path:

        * object downloaded and it is a file: the downloaded content;
        * object downloaded and it is a directory: an empty directory;
        * object not found (status 404): nothing, the cache file is removed.

        In every case the recorded source state is reconciled with what was
        actually found (see ``check_update_source_state``).

        :param sync_state: not used by this implementation.
        :returns: the filesystem path that was registered for the fetch.
        :raises: any ``ClientError`` other than a 404 during download is
            re-raised and then translated by ``handle_client_errors``.
        """
        fetched_fspath = self.register_fetch_name(self.objname)
        # Filled in by download_object(); read below for the object's
        # hash and content type.
        headers = dict()
        with open(fetched_fspath, mode='wb+') as fil:
            try:
                logger.debug("Downloading object: '%s', to: '%s'" %
                             (self.objname, fetched_fspath))
                self.endpoint.download_object(
                    self.objname,
                    fil,
                    headers=headers)
            except ClientError as e:
                if e.status != 404:
                    # Bare raise keeps the original traceback and matches
                    # the re-raise style used elsewhere in this module.
                    raise
                # An empty info dict stands for "object is gone".
                actual_info = {}
            else:
                actual_etag = headers["x-object-hash"]
                actual_type = (common.T_DIR if object_isdir(headers)
                               else common.T_FILE)
                actual_info = {"pithos_etag": actual_etag,
                               "pithos_type": actual_type}
            self.check_update_source_state(actual_info)
        if actual_info == {}:
            logger.debug("Downloading object: '%s', object is gone."
                         % self.objname)
            os.unlink(fetched_fspath)
        elif actual_info["pithos_type"] == common.T_DIR:
            logger.debug("Downloading object: '%s', object is dir."
                         % self.objname)
            # Replace the downloaded file with a directory of the same name.
            os.unlink(fetched_fspath)
            os.mkdir(fetched_fspath)
        return fetched_fspath

    def update_state(self, state):
        """Store *state* in the syncer database within a transaction."""
        with TransactedConnection(self.syncer_dbtuple) as db:
            db.put_state(state)

    def check_update_source_state(self, actual_info):
        """Adopt *actual_info* if it differs from the recorded source info.

        On a difference, a ``LiveInfoUpdateMessage`` is put on the settings'
        messager, the new state is written to the syncer database and
        ``self.source_state`` is replaced.  Nothing happens when the info
        is unchanged.
        """
        if actual_info != self.source_state.info:
            msg = messaging.LiveInfoUpdateMessage(
                archive=self.SIGNATURE, objname=self.objname,
                info=actual_info, logger=logger)
            self.settings.messager.put(msg)
            new_state = self.source_state.set(info=actual_info)
            self.update_state(new_state)
            self.source_state = new_state

    def get_synced_state(self):
        """Return the current (possibly updated) source state."""
        return self.source_state

    def unstage_file(self):
        """Do nothing; this handle has nothing to unstage."""
        pass
125

126 127 128 129 130
# Suffix of the temporary name an object is moved to before it is deleted
# (see PithosTargetHandle.mk_del_name).
STAGED_FOR_DELETION_SUFFIX = ".pithos_staged_for_deletion"
# The suffix is escaped so that its leading "." is matched literally; left
# unescaped it would match any character and exclude unrelated names such as
# "fooXpithos_staged_for_deletion".
exclude_staged_regex = ".*" + re.escape(STAGED_FOR_DELETION_SUFFIX) + "$"
# Compiled once at import time; matched against object names in probe_file.
exclude_pattern = re.compile(exclude_staged_regex)


131
class PithosTargetHandle(object):
    """Target-side handle that applies a source's change to one Pithos object.

    ``pull`` is the entry point: depending on what the source handle
    reports, the upstream object is deleted, created as a directory, or
    uploaded from the source's staged file.
    """

    def __init__(self, client, target_state):
        """Keep the client, its settings and the state of the target object."""
        self.client = client
        conf = client.settings
        self.settings = conf
        self.endpoint = conf.endpoint
        self.target_state = target_state
        # The object name is kept under two attribute names.
        self.target_objname = target_state.objname
        self.objname = target_state.objname
        self.heartbeat = conf.heartbeat

    def mk_del_name(self, name, etag):
        """Return the temporary name used for *name* while it is deleted."""
        pieces = (name, etag, STAGED_FOR_DELETION_SUFFIX)
        return "%s.%s%s" % pieces

    def safe_object_del(self, objname, etag):
        """Delete *objname* by first moving it aside, then removing the copy.

        The move is conditional on *etag* (see ``_move_object``).
        """
        staged_name = self.mk_del_name(objname, etag)
        logger.debug("Moving upstream temporarily to '%s'" % staged_name)
        self._move_object(objname, etag, staged_name)
        self._del_object(staged_name)

    def _move_object(self, objname, etag, del_name):
        """Move *objname* to *del_name* in the same container if etag matches.

        A 404 from the server is logged as a warning and ignored; any other
        ``ClientError`` propagates.
        """
        destination = common.OBJECT_DIRSEP + utils.join_objname(
            self.endpoint.container, del_name)
        try:
            self.endpoint.object_move(
                objname,
                destination=destination,
                if_etag_match=etag)
        except ClientError as err:
            if err.status != 404:
                raise
            logger.warning("Upstream '%s' not found; already moved?"
                           % objname)

    def _del_object(self, del_name):
        """Delete *del_name* upstream.

        A 404 from the server is logged as a warning and ignored; any other
        ``ClientError`` propagates.
        """
        try:
            self.endpoint.del_object(del_name)
            logger.debug("Deleted upstream tmp '%s'" % del_name)
        except ClientError as err:
            if err.status != 404:
                raise
            logger.warning("Upstream '%s' not found; already deleted?"
                           % del_name)

    def directory_put(self, objname, etag):
        """Put *objname* as an empty ``application/directory`` object.

        With a falsy *etag* the put is sent with ``if_etag_not_match='*'``;
        otherwise that argument is ``None``.  *etag* itself is always passed
        as ``if_etag_match``.  Returns the endpoint's response.
        """
        if etag:
            not_match = None
        else:
            not_match = '*'
        return self.endpoint.object_put(
            objname,
            content_type='application/directory',
            content_length=0,
            if_etag_not_match=not_match,
            if_etag_match=etag)

    @handle_client_errors
    def pull(self, source_handle, sync_state):
        """Apply the source's content to the target object upstream.

        :param source_handle: queried through
            ``info_is_deleted_or_unhandled()`` and ``info_is_dir()``; its
            ``staged_path`` is opened for upload otherwise.
        :param sync_state: its ``info`` supplies the ``"pithos_etag"`` used
            as the precondition for the upstream operation.
        :returns: the target state with its info set to the new live info
            (an empty dict after a deletion).
        :raises common.CollisionError: on a 412 response, after a
            ``CollisionMessage`` has been put on the messager.  Other
            ``ClientError`` instances are re-raised and translated by
            ``handle_client_errors``.
        """
        # assert isinstance(source_handle, LocalfsSourceHandle)
        etag = sync_state.info.get("pithos_etag")
        objname = self.target_objname
        try:
            if source_handle.info_is_deleted_or_unhandled():
                if etag is not None:
                    logger.debug("Deleting object '%s'" % objname)
                    self.safe_object_del(objname, etag)
                live_info = {}
            elif source_handle.info_is_dir():
                logger.debug("Creating dir '%s'" % objname)
                response = self.directory_put(objname, etag)
                live_info = {"pithos_etag": response.headers["etag"],
                             "pithos_type": common.T_DIR}
            else:
                with open(source_handle.staged_path, mode="rb") as staged:
                    response = self.endpoint.upload_object(
                        objname,
                        staged,
                        if_not_exist=not etag,
                        if_etag_match=etag)
                    uploaded_etag = response["etag"]
                live_info = {"pithos_etag": uploaded_etag,
                             "pithos_type": common.T_FILE}
            return self.target_state.set(info=live_info)
        except ClientError as err:
            if err.status != 412:
                raise
            # 412: Precondition failed
            collision = messaging.CollisionMessage(
                objname=self.target_objname, etag=etag, logger=logger)
            self.settings.messager.put(collision)
            raise common.CollisionError(err)
223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240


def object_isdir(obj):
    """Tell whether *obj*'s content type marks it as a directory.

    The content type is read from the ``"content_type"`` key, falling back
    to ``"content-type"`` when the former raises ``KeyError`` (a missing
    fallback key raises ``KeyError`` to the caller).  Returns True when the
    value contains ``application/directory`` or ``application/folder``,
    False otherwise.
    """
    try:
        mime = obj["content_type"]
    except KeyError:
        mime = obj["content-type"]
    for marker in ('application/directory', 'application/folder'):
        if marker in mime:
            return True
    return False


# Keys of the live-info dict built by PithosFileClient.get_object_live_info:
# the object's hash and its type (file or directory).
PITHOS_ETAG = "pithos_etag"
PITHOS_TYPE = "pithos_type"


class PithosFileClient(FileClient):
    """File client for the Pithos side of the syncer.

    It keeps a lock-protected dict of probe candidates (object name ->
    ``{"ident": ..., "info": ...}``), refreshed by listing the upstream
    container, and hands out source/target handles for individual objects.
    """

    def __init__(self, settings):
        """Read the settings, initialize the client database and report
        whether Pithos syncing is enabled."""
        self.settings = settings
        self.SIGNATURE = "PithosFileClient"
        self.auth_url = settings.auth_url
        self.auth_token = settings.auth_token
        self.container = settings.container
        self.syncer_dbtuple = settings.syncer_dbtuple
        client_dbname = self.SIGNATURE+'.db'
        self.client_dbtuple = common.DBTuple(
            dbtype=database.ClientDB,
            dbname=utils.join_path(settings.instance_path, client_dbname))
        database.initialize(self.client_dbtuple)
        self.endpoint = settings.endpoint
        # Largest "last_modified" value seen so far in a listing; the
        # initial value compares lower than any later one as a string.
        self.last_modification = "0000-00-00"
        self.probe_candidates = utils.ThreadSafeDict()
        self.check_enabled()

    def check_enabled(self):
        """Put a sync-enabled or sync-disabled message on the messager."""
        if not self.settings.pithos_is_enabled():
            msg = messaging.PithosSyncDisabled(logger=logger)
        else:
            msg = messaging.PithosSyncEnabled(logger=logger)
        self.settings.messager.put(msg)

    def remove_candidates(self, objnames, ident):
        """Drop the candidates in *objnames* that are tagged with *ident*.

        Entries carrying a different ident are left in place; names that
        are not candidates are ignored.
        """
        with self.probe_candidates.lock() as d:
            for objname in objnames:
                try:
                    cached = d.pop(objname)
                    if cached["ident"] != ident:
                        # Tagged by someone else: put it back.
                        d[objname] = cached
                except KeyError:
                    pass

    def list_candidate_files(self, forced=False):
        """Return the names of the current probe candidates as a list.

        :param forced: when true, the upstream container is listed first
            and the result merged into the candidates.
        """
        with self.probe_candidates.lock() as d:
            if forced:
                candidates = self.get_pithos_candidates()
                d.update(candidates)
            # Return a snapshot: on Python 3 ``d.keys()`` is a live view
            # that would be read by the caller after the lock is released.
            return list(d.keys())

    def get_pithos_candidates(self, last_modified=None):
        """List the container and return the objects worth probing.

        :param last_modified: when given, only objects whose
            ``"last_modified"`` is greater are included; otherwise all
            listed objects are.  Objects known to the syncer database as
            non-deleted but missing from the listing are always added, with
            empty info.
        :returns: dict mapping object name to
            ``{"ident": None, "info": <live info>}``.  An empty dict is
            returned when Pithos syncing is disabled or the listing fails;
            in the latter case a message describing the failure is put on
            the messager, and a 404 additionally disables Pithos syncing.
        """
        if not self.settings.pithos_is_enabled():
            return {}
        try:
            objects = self.endpoint.list_objects()
        except ClientError as e:
            if e.status == 404:
                self.settings.set_pithos_enabled(False)
                msg = messaging.PithosSyncDisabled(logger=logger)
            elif e.status == 401:
                msg = messaging.PithosAuthTokenError(logger=logger, exc=e)
            else:
                msg = messaging.PithosGenericError(logger=logger, exc=e)
            self.settings.messager.put(msg)
            return {}
        self.objects = objects
        upstream_all = {}
        for obj in objects:
            name = obj["name"]
            upstream_all[name] = {
                "ident": None,
                "info": self.get_object_live_info(obj)
            }
            # Track the most recent modification seen, for later polls.
            obj_last_modified = obj["last_modified"]
            if obj_last_modified > self.last_modification:
                self.last_modification = obj_last_modified
        upstream_all_names = set(upstream_all.keys())
        if last_modified is not None:
            upstream_modified = {}
            for obj in objects:
                name = obj["name"]
                if obj["last_modified"] > last_modified:
                    upstream_modified[name] = upstream_all[name]
            candidates = upstream_modified
        else:
            candidates = upstream_all

        newly_deleted = self.get_newly_deleted(upstream_all_names)
        candidates.update(newly_deleted)
        logger.debug("Candidates since %s: %s" %
                     (last_modified, candidates))
        return candidates

    def get_newly_deleted(self, upstream_all_names):
        """Return candidates for objects that have disappeared upstream.

        These are the names the syncer database lists as non-deleted for
        this client that are not in *upstream_all_names*; each maps to
        ``{"ident": None, "info": {}}``.  A ``common.DatabaseError`` while
        reading the database yields an empty dict.
        """
        try:
            with TransactedConnection(self.syncer_dbtuple) as db:
                non_deleted_in_db = set(
                    db.list_non_deleted_files(self.SIGNATURE))
        except common.DatabaseError:
            return {}
        newly_deleted_names = non_deleted_in_db.difference(upstream_all_names)
        logger.debug("newly_deleted %s" % newly_deleted_names)
        return dict((name, {"ident": None, "info": {}})
                    for name in newly_deleted_names)

    def notifier(self):
        """Start, via ``utils.start_daemon``, a thread that polls Pithos.

        Each run of the thread body fetches the candidates modified since
        ``self.last_modification``, merges them into the probe candidates
        and then sleeps for ``settings.pithos_list_interval``.
        """
        interval = self.settings.pithos_list_interval

        class PollPithosThread(utils.StoppableThread):
            # ``this`` is the thread; ``self`` is the enclosing client.
            def run_body(this):
                candidates = self.get_pithos_candidates(
                    last_modified=self.last_modification)
                with self.probe_candidates.lock() as d:
                    d.update(candidates)
                time.sleep(interval)
        return utils.start_daemon(PollPithosThread)

    def get_object(self, objname):
        """Return the endpoint's info for *objname*, or None on a 404.

        Any other ``ClientError`` is re-raised.
        """
        try:
            return self.endpoint.get_object_info(objname)
        except ClientError as e:
            if e.status == 404:
                return None
            # Bare raise keeps the original traceback.
            raise

    def get_object_live_info(self, obj):
        """Build the live-info dict (etag and type) for *obj*.

        Returns an empty dict when *obj* is None.  The hash is read from
        ``"x-object-hash"``, falling back to ``"x_object_hash"``.
        """
        if obj is None:
            return {}
        p_type = common.T_DIR if object_isdir(obj) else common.T_FILE
        obj_hash = obj.get("x-object-hash")
        if obj_hash is None:
            obj_hash = obj.get("x_object_hash")
        return {PITHOS_ETAG: obj_hash,
                PITHOS_TYPE: p_type,
                }

    def probe_file(self, objname, old_state, ref_state, ident):
        """Compare the recorded info of *objname* with its live info.

        The live info comes from the probe candidates when the object is
        cached there (the cached entry is then tagged with *ident*), and
        from a request to the endpoint otherwise.

        :param ref_state: not used by this implementation.
        :returns: *old_state* with its info replaced by the live info when
            the two differ; None when they are equal or when *objname* is
            a staged-for-deletion name, which is ignored with a warning.
        """
        info = old_state.info
        with self.probe_candidates.lock() as d:
            try:
                cached = d[objname]
                cached_info = cached["info"]
                cached["ident"] = ident
            except KeyError:
                cached_info = None
        if exclude_pattern.match(objname):
            logger.warning("Ignoring probe archive: %s, object: '%s'" %
                           (old_state.archive, objname))
            return
        if cached_info is None:
            obj = self.get_object(objname)
            live_info = self.get_object_live_info(obj)
        else:
            live_info = cached_info
        if info != live_info:
            live_state = old_state.set(info=live_info)
            return live_state

    def stage_file(self, source_state):
        """Return a ``PithosSourceHandle`` for *source_state*."""
        return PithosSourceHandle(self, source_state)

    def prepare_target(self, target_state):
        """Return a ``PithosTargetHandle`` for *target_state*."""
        return PithosTargetHandle(self, target_state)