localfs_client.py 26 KB
Newer Older
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# Copyright (C) 2015 GRNET S.A.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

16
import os
17
import stat
18
19
20
import re
import datetime
import psutil
21
import time
22
import filecmp
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
23
import shutil
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
24
import errno
25

26
27
28
29
30
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import logging

from agkyra.syncer.file_client import FileClient
31
32
from agkyra.syncer import utils, common, messaging, database
from agkyra.syncer.database import TransactedConnection
33
34
35
36
37
38
39
40
41
42

logger = logging.getLogger(__name__)

# Status codes describing what currently lives at a local path; they are
# produced by _get_local_status_from_stats() and status_of_info().
LOCAL_FILE = 0  # regular file
LOCAL_EMPTY_DIR = 1  # directory whose listing is empty
LOCAL_NONEMPTY_DIR = 2  # directory with at least one entry
LOCAL_MISSING = 3  # nothing could be stat'ed at the path
LOCAL_SOFTLINK = 4  # symbolic link (the path is lstat'ed, not followed)
LOCAL_OTHER = 5  # anything else

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
43
# Tolerance used by eq_float(): two values whose absolute difference is
# below this are considered equal.
DEFAULT_MTIME_PRECISION = 1e-4
44

45
46
47
48
# Basename patterns that LocalfsFileClient.exclude_file() checks with
# ``exclude_pattern.match``, i.e. every alternative is anchored at the start
# of the basename. Raw strings keep the backslashes literal for the regex
# engine instead of relying on Python passing unknown escapes through.
exclude_regexes = [r"\.#", r"\.~", r"~\$", r"~.*\.tmp$", r"\..*\.swp$"]
exclude_pattern = re.compile('|'.join(exclude_regexes))


49
50
51
52
53
class DirMissing(BaseException):
    """Raised by link_file() when the OS reports ENOENT for the link."""
    # NOTE(review): derives from BaseException, so handlers written as
    # ``except Exception`` will not catch it -- confirm this is intended.
    pass


def link_file(src, dest):
    """Make ``src`` available at ``dest``.

    Uses ``os.rename`` when ``utils.iswin()`` is true and ``os.link``
    otherwise.

    Raises:
        common.ConflictError: the OS reported EEXIST for ``dest``, or
            ENOTDIR/EINVAL for its path.
        DirMissing: the OS reported ENOENT.
        OSError: any other failure of the underlying call.
    """
    cmd = os.rename if utils.iswin() else os.link
    try:
        cmd(src, dest)
    except OSError as e:
        if e.errno == errno.EEXIST:
            raise common.ConflictError("Cannot link, '%s' exists." % dest)
        if e.errno in [errno.ENOTDIR, errno.EINVAL]:
            raise common.ConflictError(
                "Cannot link, missing path for '%s'." % dest)
        if e.errno == errno.ENOENT:
            raise DirMissing()
        # Any other errno (e.g. EACCES, EXDEV) must not be swallowed:
        # returning normally would make a failed link look like success.
        raise


def make_dirs(path):
    """Create directory ``path`` together with any missing parents.

    An already existing directory is accepted silently. EEXIST (for a
    non-directory), ENOTDIR and ENOENT are reported as
    common.ConflictError; every other OSError propagates unchanged.
    """
    try:
        os.makedirs(path)
    except OSError as err:
        code = err.errno
        already_a_dir = code == errno.EEXIST and os.path.isdir(path)
        if already_a_dir:
            return
        if code in (errno.EEXIST, errno.ENOTDIR, errno.ENOENT):
            raise common.ConflictError("Cannot make dir '%s'." % path)
        raise


78
79
80
81
82
83
84
85
86
def list_dir(path):
    """Return the entries of directory ``path``.

    An empty list is returned when listing fails with ENOTDIR, ENOENT or
    EINVAL; any other OSError propagates.
    """
    try:
        entries = os.listdir(path)
    except OSError as err:
        if err.errno not in (errno.ENOTDIR, errno.ENOENT, errno.EINVAL):
            raise
        return []
    return entries


87
88
89
90
91
92
93
94
95
96
# Adapter returning the open files of a psutil process object: calls
# ``proc.open_files()`` when the installed psutil has major version >= 2
# and ``proc.get_open_files()`` otherwise.
psutil_open_files = \
    (lambda proc: proc.open_files()) if psutil.version_info[0] >= 2 else \
    (lambda proc: proc.get_open_files())


def file_is_open(path):
    """Tell whether any process reported by psutil has ``path`` open.

    Open-file entries whose path cannot be converted by
    ``utils.to_unicode`` are skipped, and a process for which psutil
    raises ``psutil.Error`` is ignored.
    """
    for process in psutil.process_iter():
        try:
            for entry in psutil_open_files(process):
                try:
                    entry_path = utils.to_unicode(entry.path)
                except UnicodeDecodeError:
                    continue
                if entry_path == path:
                    return True
        except psutil.Error:
            # Move on to the next process.
            continue
    return False


108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def is_iso_date(tstamp):
    """Return True iff ``tstamp`` parses as ``%Y-%m-%dT%H:%M:%S.%f``."""
    try:
        datetime.datetime.strptime(tstamp, "%Y-%m-%dT%H:%M:%S.%f")
    except ValueError:
        return False
    return True


def get_orig_name(filename):
    """Strip a trailing ``_<timestamp>_<utils.NODE>`` suffix, if present.

    ``filename`` is returned unchanged unless its last '_'-separated part
    equals ``utils.NODE``, the part before it satisfies is_iso_date() and
    a non-empty name remains in front of them.
    """
    pieces = filename.rsplit('_', 2)
    if len(pieces) == 3:
        stem, tstamp, node = pieces
        if stem and node == utils.NODE and is_iso_date(tstamp):
            return stem
    return filename


126
def mk_stash_name(filename):
    """Build ``<original name>_<timestamp>_<utils.NODE>`` for ``filename``.

    The original name is obtained with get_orig_name(), so an existing
    suffix of that form is replaced rather than appended to.
    """
    stamp = utils.str_time_stamp()
    base = get_orig_name(filename)
    return base + '_' + stamp + '_' + utils.NODE
130
131
132


def eq_float(f1, f2):
    """Return True when ``f1`` and ``f2`` differ by less than
    DEFAULT_MTIME_PRECISION."""
    difference = abs(f1 - f2)
    return difference < DEFAULT_MTIME_PRECISION
134
135
136


def files_equal(f1, f2):
    """Compare the contents of two files (deep ``filecmp.cmp``).

    Returns False when the comparison fails with ENOENT or ENOTDIR; any
    other OSError propagates.
    """
    logger.debug("Comparing files: '%s', '%s'" % (f1, f2))
    try:
        same = filecmp.cmp(f1, f2, shallow=False)
    except OSError as err:
        if err.errno not in (errno.ENOENT, errno.ENOTDIR):
            raise
        return False
    return same
144
145
146
147
148
149


def info_is_unhandled(info):
    """Return True when ``info`` is non-empty and typed T_UNHANDLED."""
    if info == {}:
        return False
    return info[LOCALFS_TYPE] == common.T_UNHANDLED


150
151
def local_path_changes(settings, path, state, unhandled_equal=True):
    """Compare the live info of ``path`` with the info stored in ``state``.

    Returns None when is_info_eq() considers them equal, otherwise the
    freshly computed live info.
    """
    current = get_live_info(settings, path)
    unchanged = is_info_eq(current, state.info, unhandled_equal)
    return None if unchanged else current


158
159
160
161
162
163
def is_actual_path(path):
    """Check that every component of ``path`` is listed in its parent.

    Walks from the last component upwards and returns False as soon as a
    component does not appear verbatim in list_dir() of its parent;
    returns True once no component is left.
    """
    remaining = path.rstrip(os.path.sep)
    remaining, component = os.path.split(remaining)
    while component:
        if component not in list_dir(remaining):
            return False
        remaining, component = os.path.split(remaining)
    return True

def get_live_info(settings, path):
    """Describe what currently exists at ``path`` as an info dict.

    Returns ``{}`` when ``path`` is None, when the path is missing, or
    when ``settings.case_insensitive`` is set and is_actual_path() rejects
    the path. Soft links and other unsupported entries are typed
    T_UNHANDLED, directories T_DIR, and regular files T_FILE together with
    their mtime and size.
    """
    if path is None:
        return {}
    if settings.case_insensitive and not is_actual_path(path):
        return {}

    stats, status = get_local_status(path)
    if status == LOCAL_MISSING:
        return {}
    if status in (LOCAL_SOFTLINK, LOCAL_OTHER):
        return {LOCALFS_TYPE: common.T_UNHANDLED}
    if status in (LOCAL_EMPTY_DIR, LOCAL_NONEMPTY_DIR):
        return {LOCALFS_TYPE: common.T_DIR}

    file_info = {}
    file_info[LOCALFS_MTIME] = stats.st_mtime
    file_info[LOCALFS_SIZE] = stats[stat.ST_SIZE]
    file_info[LOCALFS_TYPE] = common.T_FILE
    return file_info


187
def stat_file(path):
    """Return ``os.lstat(path)``, or None on ENOENT/ENOTDIR.

    Any other OSError propagates.
    """
    try:
        result = os.lstat(path)
    except OSError as err:
        if err.errno not in (errno.ENOENT, errno.ENOTDIR):
            raise
        return None
    return result


# Keys of the info dicts built by get_live_info().
LOCALFS_TYPE = "localfs_type"  # one of common.T_FILE / T_DIR / T_UNHANDLED
LOCALFS_MTIME = "localfs_mtime"  # st_mtime; only set for regular files
LOCALFS_SIZE = "localfs_size"  # ST_SIZE; only set for regular files


def status_of_info(info):
    """Map an info dict to a LOCAL_* status code.

    Empty info means LOCAL_MISSING; directories are always reported as
    LOCAL_EMPTY_DIR; T_UNHANDLED maps to LOCAL_OTHER; everything else is
    LOCAL_FILE.
    """
    if info == {}:
        return LOCAL_MISSING
    kind = info[LOCALFS_TYPE]
    if kind == common.T_DIR:
        return LOCAL_EMPTY_DIR
    if kind == common.T_UNHANDLED:
        # shouldn't happen
        return LOCAL_OTHER
    return LOCAL_FILE


211
212
def get_local_status(path, attempt=0):
    """Return ``(stats, status)`` for ``path``.

    ``stats`` is the result of stat_file() and ``status`` the matching
    LOCAL_* code. If classifying the path raises OSError, a warning is
    logged and the whole operation (including the stat) is retried; the
    error is re-raised once ``attempt`` has exceeded 2.
    """
    while True:
        stats = stat_file(path)
        try:
            status = _get_local_status_from_stats(stats, path)
        except OSError as err:
            logger.warning(
                "Got error '%s' while listing dir '%s'" % (err, path))
            if attempt > 2:
                raise
            attempt += 1
        else:
            return stats, status


def _get_local_status_from_stats(stats, path):
    """Classify ``path`` into a LOCAL_* code from its lstat result.

    ``stats`` is None for a missing path. For directories, ``path`` is
    listed to tell empty from non-empty ones.
    """
    if stats is None:
        return LOCAL_MISSING

    mode = stats[stat.ST_MODE]
    if stat.S_ISLNK(mode):
        return LOCAL_SOFTLINK
    if stat.S_ISREG(mode):
        return LOCAL_FILE
    if not stat.S_ISDIR(mode):
        return LOCAL_OTHER
    return LOCAL_NONEMPTY_DIR if list_dir(path) else LOCAL_EMPTY_DIR


def path_status(path):
    """Return only the LOCAL_* status code of ``path``."""
    _stats, status = get_local_status(path)
    return status
241
242


Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
243
244
245
246
def info_of_regular_file(info):
    """Tell whether ``info`` describes a regular file.

    A falsy ``info`` is returned as is (so the result is falsy but not
    necessarily ``False``); otherwise the result is a boolean.
    """
    if not info:
        return info
    return info[LOCALFS_TYPE] == common.T_FILE


Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
247
def is_info_eq(info1, info2, unhandled_equal=True):
    """Decide whether two info dicts describe the same local state.

    If either is empty they are equal only when both are. Different types
    are never equal. Two T_UNHANDLED infos compare as ``unhandled_equal``;
    two directories are always equal; files are equal when their mtimes
    match under eq_float() and their sizes are identical.
    """
    if {} in (info1, info2):
        return info1 == info2

    kind = info1[LOCALFS_TYPE]
    if kind != info2[LOCALFS_TYPE]:
        return False
    if kind == common.T_UNHANDLED:
        return unhandled_equal
    if kind == common.T_DIR:
        return True

    same_mtime = eq_float(info1[LOCALFS_MTIME], info2[LOCALFS_MTIME])
    return same_mtime and info1[LOCALFS_SIZE] == info2[LOCALFS_SIZE]


class LocalfsTargetHandle(object):
    """Applies a fetched file or directory onto a path under the local root.

    Before the fetched content is put in place, the existing local entry
    is renamed ("hidden") to a path inside the cache whose name is
    registered in the client database. The hidden entry is then stashed,
    restored or removed depending on how the operation proceeds.
    """

    def __init__(self, client, target_state):
        self.client = client
        settings = client.settings
        self.settings = settings
        self.SIGNATURE = "LocalfsTargetHandle"
        self.rootpath = settings.local_root_path
        self.cache_hide_name = settings.cache_hide_name
        self.cache_hide_path = settings.cache_hide_path
        self.cache_path = settings.cache_path
        self.syncer_dbtuple = settings.syncer_dbtuple
        self.client_dbtuple = client.client_dbtuple
        self.mtime_lag = settings.mtime_lag
        self.target_state = target_state
        self.objname = target_state.objname
        self.fspath = utils.join_path(self.rootpath, self.objname)
        # Set by _register_hidden_name(), reset by unregister_hidden_name().
        self.hidden_filename = None
        self.hidden_path = None

    def get_path_in_cache(self, name):
        """Return the path of ``name`` under the cache directory."""
        return utils.join_path(self.cache_path, name)

    def register_hidden_name(self, filename):
        """Register the hide name of ``filename`` in a client-db transaction.

        Returns True if the name was newly registered, False if it was
        already present.
        """
        with TransactedConnection(self.client_dbtuple) as db:
            return self._register_hidden_name(db, filename)

    def _register_hidden_name(self, db, filename):
        """Compute and record the hide name; see register_hidden_name().

        ``self.hidden_filename`` and ``self.hidden_path`` are set even when
        the name turns out to be registered already.
        """
        f = utils.hash_string(filename)
        hide_filename = utils.join_path(self.cache_hide_name, f)
        self.hidden_filename = hide_filename
        self.hidden_path = self.get_path_in_cache(self.hidden_filename)
        if db.get_cachename(hide_filename):
            return False
        db.insert_cachename(hide_filename, self.SIGNATURE, filename)
        return True

    def unregister_hidden_name(self, hidden_filename):
        """Delete ``hidden_filename`` from the client db and forget it."""
        with TransactedConnection(self.client_dbtuple) as db:
            db.delete_cachename(hidden_filename)
        self.hidden_filename = None
        self.hidden_path = None

    def move_file(self):
        """Rename the local entry to its hidden path inside the cache.

        Returns without moving when the entry is already hidden, or when
        the local path does not exist (the registration is undone then).

        Raises:
            common.BusyError: the file is open, or the rename failed with
                EACCES (the registration is undone first).
            OSError: any other rename failure.
        """
        fspath = self.fspath
        if file_is_open(fspath):
            raise common.BusyError("File '%s' is open. Aborting."
                                   % fspath)

        new_registered = self.register_hidden_name(self.objname)
        hidden_filename = self.hidden_filename
        hidden_path = self.hidden_path

        if not new_registered:
            logger.warning("Hiding already registered for file %s" %
                           (self.objname,))
            if os.path.lexists(hidden_path):
                logger.warning("File %s already hidden at %s" %
                               (self.objname, hidden_path))
                return
        try:
            os.rename(fspath, hidden_path)
            logger.debug("Hiding file '%s' to '%s'" %
                         (fspath, hidden_path))
        except OSError as e:
            if e.errno in [errno.ENOENT, errno.ENOTDIR]:
                self.unregister_hidden_name(hidden_filename)
                logger.debug("File '%s' does not exist" % fspath)
                return
            elif e.errno == errno.EACCES:
                self.unregister_hidden_name(hidden_filename)
                # The placeholder was "'%'", which made the formatting
                # itself fail with ValueError instead of raising BusyError.
                raise common.BusyError("File '%s' is open. Undoing." %
                                       hidden_path)
            else:
                # Bare raise keeps the original traceback.
                raise

    def hide_file(self):
        """Hide the local entry and verify that it is safe to replace.

        If, after the move, the hidden entry is open in some process or is
        a non-empty directory, it is renamed back to its original path,
        its registration is dropped and BusyError / ConflictError is
        raised respectively.
        """
        self.move_file()
        if self.hidden_filename is not None:
            if file_is_open(self.hidden_path):
                os.rename(self.hidden_path, self.fspath)
                self.unregister_hidden_name(self.hidden_filename)
                raise common.BusyError("File '%s' is open. Undoing." %
                                       self.hidden_path)
            if path_status(self.hidden_path) == LOCAL_NONEMPTY_DIR:
                os.rename(self.hidden_path, self.fspath)
                self.unregister_hidden_name(self.hidden_filename)
                raise common.ConflictError("'%s' is non-empty" % self.fspath)

    def apply(self, fetched_path, fetched_live_info, sync_state):
        """Replace the local entry with the fetched one.

        Nothing is done when a directory is fetched over an existing
        directory, or when both sides are missing. A non-empty local
        directory that would be replaced raises common.ConflictError.
        """
        local_status = path_status(self.fspath)
        fetched_status = status_of_info(fetched_live_info)
        if local_status in [LOCAL_EMPTY_DIR, LOCAL_NONEMPTY_DIR] \
                and fetched_status == LOCAL_EMPTY_DIR:
            return
        if local_status == LOCAL_MISSING and fetched_status == LOCAL_MISSING:
            return
        if local_status == LOCAL_NONEMPTY_DIR:
            raise common.ConflictError("'%s' is non-empty" % self.fspath)

        self.prepare(fetched_path, sync_state)
        self.finalize(fetched_path, fetched_live_info)
        self.cleanup(self.hidden_path)
        if self.hidden_filename is not None:
            self.unregister_hidden_name(self.hidden_filename)

    def prepare(self, fetched_path, sync_state):
        """Hide the local entry; stash it if it holds unsynced changes.

        The hidden entry is stashed when its live info differs from
        ``sync_state`` and its contents differ from the fetched file.
        """
        self.hide_file()
        info_changed = local_path_changes(self.settings,
            self.hidden_path, sync_state, unhandled_equal=False)
        if info_changed is not None and info_changed != {}:
            if not files_equal(self.hidden_path, fetched_path):
                self.stash_file()

    def stash_file(self):
        """Move the hidden entry to a stash name under the local root.

        A ConflictStashMessage is queued before the rename.
        """
        stash_name = mk_stash_name(self.objname)
        stash_path = utils.join_path(self.rootpath, stash_name)
        msg = messaging.ConflictStashMessage(
            objname=self.objname, stash_name=stash_name, logger=logger)
        self.settings.messager.put(msg)
        os.rename(self.hidden_path, stash_path)

    def finalize(self, filepath, live_info):
        """Put the fetched entry at the target path.

        Files are linked from ``filepath`` (after sleeping ``mtime_lag``),
        creating the parent directory on DirMissing and retrying once;
        directories are created; empty info is a no-op. Any other type
        raises AssertionError.
        """
        logger.debug("Finalizing file '%s'" % filepath)
        if live_info == {}:
            return
        if live_info[LOCALFS_TYPE] == common.T_FILE:
            time.sleep(self.mtime_lag)
            try:
                link_file(filepath, self.fspath)
            except DirMissing:
                make_dirs(os.path.dirname(self.fspath))
                link_file(filepath, self.fspath)
        elif live_info[LOCALFS_TYPE] == common.T_DIR:
            make_dirs(self.fspath)
        else:
            raise AssertionError("info for fetched file '%s' is %s" %
                                 (filepath, live_info))

    def cleanup(self, filepath):
        """Remove ``filepath`` (file or directory); None is ignored.

        Removing a regular file is best effort: an OSError is logged and
        otherwise ignored. Directories are removed with ``os.rmdir``.
        """
        if filepath is None:
            return
        status = path_status(filepath)
        if status == LOCAL_FILE:
            try:
                logger.debug("Cleaning up file '%s'" % filepath)
                os.unlink(filepath)
            except OSError as e:
                # Deliberately non-fatal, but no longer hides unrelated
                # exceptions such as KeyboardInterrupt.
                logger.debug("Could not clean up file '%s': %s" %
                             (filepath, e))
        elif status in [LOCAL_EMPTY_DIR, LOCAL_NONEMPTY_DIR]:
            os.rmdir(filepath)

    def pull(self, source_handle, sync_state):
        """Fetch from ``source_handle`` and apply the result locally.

        Returns the target state updated with the fetched live info.
        """
        fetched_path = source_handle.send_file(sync_state)
        fetched_live_info = get_live_info(self.settings, fetched_path)
        self.apply(fetched_path, fetched_live_info, sync_state)
        self.cleanup(fetched_path)
        return self.target_state.set(info=fetched_live_info)


class LocalfsSourceHandle(object):
    """Stages a local file in the cache so that it can be used as a source.

    On construction, if the source state describes a regular file, the
    file is copied to a stage path inside the cache (registered in the
    client database), verified, and the stored source state is refreshed
    when the live info differs.
    """

    def __init__(self, client, source_state):
        self.client = client
        settings = client.settings
        self.settings = settings
        self.SIGNATURE = "LocalfsSourceHandle"
        self.rootpath = settings.local_root_path
        self.cache_stage_name = settings.cache_stage_name
        self.cache_stage_path = settings.cache_stage_path
        self.cache_path = settings.cache_path
        self.syncer_dbtuple = settings.syncer_dbtuple
        self.client_dbtuple = client.client_dbtuple
        self.source_state = source_state
        self.objname = source_state.objname
        self.fspath = utils.join_path(self.rootpath, self.objname)
        # Set by _register_stage_name(), reset by unregister_stage_name().
        self.stage_filename = None
        self.staged_path = None
        self.heartbeat = settings.heartbeat
        if info_of_regular_file(self.source_state.info):
            self.stage_file()

    def register_stage_name(self, filename):
        """Register the stage name of ``filename`` in a client-db transaction.

        Returns True if the name was newly registered, False if it was
        already present.
        """
        with TransactedConnection(self.client_dbtuple) as db:
            return self._register_stage_name(db, filename)

    def _register_stage_name(self, db, filename):
        """Compute and record the stage name; see register_stage_name().

        ``self.stage_filename`` and ``self.staged_path`` are set even when
        the name turns out to be registered already.
        """
        f = utils.hash_string(filename)
        stage_filename = utils.join_path(self.cache_stage_name, f)
        self.stage_filename = stage_filename
        stage_path = self.get_path_in_cache(stage_filename)
        self.staged_path = stage_path
        if db.get_cachename(stage_filename):
            return False
        db.insert_cachename(stage_filename, self.SIGNATURE, filename)
        return True

    def unregister_stage_name(self, stage_filename):
        """Delete ``stage_filename`` from the client db and forget it."""
        with TransactedConnection(self.client_dbtuple) as db:
            db.delete_cachename(stage_filename)
        self.stage_filename = None
        self.staged_path = None

    def get_path_in_cache(self, name):
        """Return the path of ``name`` under the cache directory."""
        return utils.join_path(self.cache_path, name)

    def copy_file(self):
        """Copy the local file to its stage path inside the cache.

        Returns without copying when the file is already staged. When the
        copy fails with ENOENT or EISDIR the registration is undone and
        the method returns normally.

        Raises:
            common.OpenBusyError: the file is open in some process.
            IOError: any other copy failure.
        """
        fspath = self.fspath
        if file_is_open(fspath):
            raise common.OpenBusyError("File '%s' is open. Aborting"
                                       % fspath)
        new_registered = self.register_stage_name(fspath)
        stage_filename = self.stage_filename
        stage_path = self.staged_path

        if not new_registered:
            logger.warning("Staging already registered for file %s" %
                           (self.objname,))
            if os.path.lexists(stage_path):
                logger.warning("File %s already staged at %s" %
                               (self.objname, stage_path))
                return

        logger.debug("Staging file '%s' to '%s'" % (self.objname, stage_path))
        try:
            shutil.copy2(fspath, stage_path)
        except IOError as e:
            if e.errno in [errno.ENOENT, errno.EISDIR]:
                logger.debug("Source is not a regular file: '%s'" % fspath)
                self.unregister_stage_name(stage_filename)
                return
            else:
                # Bare raise keeps the original traceback.
                raise

    def stage_file(self):
        """Stage the file, validate the copy and refresh the source state."""
        self.copy_file()
        live_info = get_live_info(self.settings, self.fspath)
        self.check_staged(live_info)
        self.check_update_source_state(live_info)

    def check_staged(self, live_info):
        """Validate the staged copy against the live state of the source.

        The staged copy is removed and unregistered when the source is no
        longer a regular file (returns normally), is open, or differs in
        content from the copy (both raise).

        Raises:
            common.NotStableBusyError: nothing was staged although the
                source is a regular file.
            common.OpenBusyError: the source file is open.
            common.ChangedBusyError: source and staged copy differ.
        """
        is_reg = info_of_regular_file(live_info)

        if self.staged_path is None:
            if is_reg:
                m = ("File '%s' is not in a stable state; unstaged"
                     % self.objname)
                logger.warning(m)
                raise common.NotStableBusyError(m)
            return

        if not is_reg:
            os.unlink(self.staged_path)
            self.unregister_stage_name(self.stage_filename)
            # The message used to be logged without its argument, leaving
            # a literal '%s' in the log.
            logger.warning("Path '%s' is not a regular file; unstaged"
                           % self.fspath)
            return

        if file_is_open(self.fspath):
            os.unlink(self.staged_path)
            self.unregister_stage_name(self.stage_filename)
            m = "File '%s' is open; unstaged" % self.objname
            logger.warning(m)
            raise common.OpenBusyError(m)

        if not files_equal(self.staged_path, self.fspath):
            os.unlink(self.staged_path)
            self.unregister_stage_name(self.stage_filename)
            m = "File '%s' contents have changed; unstaged" % self.objname
            logger.warning(m)
            raise common.ChangedBusyError(m)

    def update_state(self, state):
        """Store ``state`` in the syncer database."""
        with TransactedConnection(self.syncer_dbtuple) as db:
            db.put_state(state)

    def check_update_source_state(self, live_info):
        """Persist ``live_info`` if it differs from the stored source info.

        A LiveInfoUpdateMessage is queued before the state is stored.
        """
        if not is_info_eq(live_info, self.source_state.info):
            msg = messaging.LiveInfoUpdateMessage(
                archive=self.SIGNATURE, objname=self.objname,
                info=live_info, logger=logger)
            self.settings.messager.put(msg)
            new_state = self.source_state.set(info=live_info)
            self.update_state(new_state)
            self.source_state = new_state

    def get_synced_state(self):
        """Return the current source state."""
        return self.source_state

    def info_is_dir(self):
        """Return True if the source info is typed T_DIR.

        Info without a type key (e.g. empty info) yields False.
        """
        try:
            return self.source_state.info[LOCALFS_TYPE] == common.T_DIR
        except KeyError:
            return False

    def info_is_deleted(self):
        """Return True if the source info is empty."""
        return self.source_state.info == {}

    def info_is_deleted_or_unhandled(self):
        """Return True if the source info is empty or typed T_UNHANDLED."""
        return self.source_state.info == {} \
            or self.source_state.info[LOCALFS_TYPE] == common.T_UNHANDLED

    def stash_staged_file(self):
        """Rename the staged copy to a stash name derived from the source
        path."""
        stash_filename = mk_stash_name(self.fspath)
        logger.warning("Stashing file '%s' to '%s'" %
                       (self.fspath, stash_filename))
        os.rename(self.staged_path, stash_filename)

    def unstage_file(self):
        """Delete the staged copy and its registration, if anything is
        staged."""
        if self.stage_filename is None:
            return
        staged_path = self.staged_path
        os.unlink(staged_path)
        self.unregister_stage_name(self.stage_filename)
571
572
573
574
575


class LocalfsFileClient(FileClient):
    """File client for the local filesystem archive.

    Keeps a thread-safe dict of probe candidates (object names that may
    have changed), fed either by a full walk of the local root or by the
    filesystem observer returned from notifier().
    """

    def __init__(self, settings):
        self.settings = settings
        self.SIGNATURE = "LocalfsFileClient"
        self.ROOTPATH = settings.local_root_path
        self.CACHEPATH = settings.cache_path
        self.syncer_dbtuple = settings.syncer_dbtuple
        # The client keeps its own database, named after its signature.
        client_dbname = self.SIGNATURE + '.db'
        self.client_dbtuple = common.DBTuple(
            dbtype=database.ClientDB,
            dbname=utils.join_path(settings.instance_path, client_dbname))
        database.initialize(self.client_dbtuple)
        self.probe_candidates = utils.ThreadSafeDict()
        self.check_enabled()

    def check_enabled(self):
        """Queue a message telling whether local syncing is enabled."""
        if self.settings.localfs_is_enabled():
            msg = messaging.LocalfsSyncEnabled(logger=logger)
        else:
            msg = messaging.LocalfsSyncDisabled(logger=logger)
        self.settings.messager.put(msg)

    def remove_candidates(self, objnames, ident):
        """Drop the candidates in ``objnames`` that carry ``ident``.

        Entries tagged with a different ident are put back; names that are
        not candidates are ignored.
        """
        with self.probe_candidates.lock() as cands:
            for objname in objnames:
                try:
                    entry = cands.pop(objname)
                    if entry["ident"] != ident:
                        cands[objname] = entry
                except KeyError:
                    continue

    def list_candidate_files(self, forced=False):
        """Return the names of the current probe candidates.

        With ``forced`` the candidates are first extended by a full walk.
        Returns ``{}`` when local syncing is disabled; a missing root
        directory disables local syncing and also yields ``{}``.
        """
        if not self.settings.localfs_is_enabled():
            return {}
        if not os.path.isdir(self.ROOTPATH):
            self.settings.set_localfs_enabled(False)
            msg = messaging.LocalfsSyncDisabled(logger=logger)
            self.settings.messager.put(msg)
            return {}

        with self.probe_candidates.lock() as cands:
            if forced:
                walked = self.walk_filesystem()
                cands.update(walked)
            return cands.keys()

    def none_info(self):
        """Return a fresh candidate entry with no ident and no info."""
        return {"ident": None, "info": None}

    def walk_filesystem(self):
        """Collect candidates from the local root and the syncer database.

        Every directory (except the root itself) and every file found by
        ``os.walk`` becomes a candidate, as does every name returned by
        list_files(). Paths that cannot be converted by
        ``utils.to_unicode`` are skipped.
        """
        found = {}
        walk_root = utils.from_unicode(self.ROOTPATH)
        for dirpath, _dirnames, filenames in os.walk(walk_root):
            try:
                dirpath = utils.to_unicode(dirpath)
            except UnicodeDecodeError:
                continue
            rel_dirpath = os.path.relpath(dirpath, start=self.ROOTPATH)
            logger.debug("'%s' '%s'" % (dirpath, rel_dirpath))
            at_root = rel_dirpath == '.'
            if not at_root:
                dir_objname = utils.to_standard_sep(rel_dirpath)
                found[dir_objname] = self.none_info()
            for filename in filenames:
                try:
                    filename = utils.to_unicode(filename)
                except UnicodeDecodeError:
                    continue
                prefix = "" if at_root \
                    else utils.to_standard_sep(rel_dirpath)
                file_objname = utils.join_objname(prefix, filename)
                found[file_objname] = self.none_info()

        known = dict((name, self.none_info())
                     for name in self.list_files())
        found.update(known)
        logger.debug("Candidates: %s" % found)
        return found

    def list_files(self):
        """Return what the syncer database lists for this client."""
        with TransactedConnection(self.syncer_dbtuple) as db:
            return db.list_files(self.SIGNATURE)

    def _local_path_changes(self, name, state):
        """Run local_path_changes() for object ``name`` under the root."""
        target = utils.join_path(self.ROOTPATH, name)
        return local_path_changes(self.settings, target, state)

    def exclude_file(self, objname):
        """Tell whether ``objname`` must be ignored.

        Returns True for anything under the cache directory; otherwise the
        result of matching the basename against ``exclude_pattern`` (a
        match object or None).
        """
        parts = objname.split(common.OBJECT_DIRSEP)
        if parts[0] in (self.settings.cache_name,):
            return True
        return exclude_pattern.match(parts[-1])

    def probe_file(self, objname, old_state, ref_state, ident):
        """Probe ``objname`` and return its live state, or None.

        The candidate entry, if any, is tagged with ``ident``. Excluded
        names queue an IgnoreProbeMessage and yield None. Cached info from
        the candidate entry is used when available; otherwise the path is
        inspected and None is returned if nothing changed.
        """
        with self.probe_candidates.lock() as cands:
            try:
                entry = cands[objname]
                cached_info = entry["info"]
                entry["ident"] = ident
            except KeyError:
                cached_info = None

        if self.exclude_file(objname):
            msg = messaging.IgnoreProbeMessage(
                archive=old_state.archive, objname=objname, logger=logger)
            self.settings.messager.put(msg)
            return

        if cached_info is None:
            live_info = self._local_path_changes(objname, old_state)
        else:
            live_info = cached_info
        if live_info is None:
            return
        return old_state.set(info=live_info)

    def stage_file(self, source_state):
        """Return a source handle for ``source_state``."""
        return LocalfsSourceHandle(self, source_state)

    def prepare_target(self, target_state):
        """Return a target handle for ``target_state``."""
        return LocalfsTargetHandle(self, target_state)

    def get_dir_contents(self, objname):
        """Return what the syncer database lists under ``objname``."""
        logger.debug("Getting contents for object '%s'" % objname)
        with TransactedConnection(self.syncer_dbtuple) as db:
            return db.get_dir_contents(self.SIGNATURE, objname)

    def notifier(self):
        """Start and return a watchdog observer on the local root.

        Every reported path (outside the cache) is recorded as a probe
        candidate. Deleting the root itself disables local syncing.
        """
        def handle_path(path, rec=False):
            # Record ``path`` -- and, with ``rec``, everything the
            # database lists under it -- as probe candidates.
            try:
                path = utils.to_unicode(path)
            except UnicodeDecodeError:
                return
            if path.startswith(self.CACHEPATH):
                return
            rel_path = os.path.relpath(path, start=self.ROOTPATH)
            if rel_path == '.':
                return
            objname = utils.to_standard_sep(rel_path)
            # Fetched before taking the candidates lock.
            leaves = self.get_dir_contents(objname) if rec else ()
            with self.probe_candidates.lock() as cands:
                cands[objname] = self.none_info()
                for leaf in leaves:
                    cands[leaf] = self.none_info()

        root_path = utils.from_unicode(self.ROOTPATH)

        # ``handler`` is the event handler instance; ``self`` inside the
        # methods still refers to the enclosing client.
        class EventHandler(FileSystemEventHandler):
            def on_created(handler, event):
                # if not event.is_directory:
                #     return
                logger.debug("Handling %s" % event)
                handle_path(event.src_path)

            def on_deleted(handler, event):
                deleted = event.src_path
                logger.debug("Handling %s" % event)
                if utils.normalize_local_suffix(deleted) == root_path:
                    self.settings.set_localfs_enabled(False)
                    msg = messaging.LocalfsSyncDisabled(logger=logger)
                    self.settings.messager.put(msg)
                    return
                handle_path(deleted, rec=utils.iswin())

            def on_modified(handler, event):
                if event.is_directory:
                    return
                logger.debug("Handling %s" % event)
                handle_path(event.src_path)

            def on_moved(handler, event):
                moved_from = event.src_path
                moved_to = event.dest_path
                logger.debug("Handling %s" % event)
                handle_path(moved_from)
                handle_path(moved_to)

        observer = Observer()
        observer.schedule(EventHandler(), root_path, recursive=True)
        observer.start()
        return observer