localfs_client.py 24 KB
Newer Older
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# Copyright (C) 2015 GRNET S.A.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

16
import os
17
import stat
18
19
20
import re
import datetime
import psutil
21
import time
22
import filecmp
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
23
import shutil
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
24
import errno
25

26
27
28
29
30
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import logging

from agkyra.syncer.file_client import FileClient
31
from agkyra.syncer import utils, common, messaging
32
33
34
35
36
37
38
39
40
41
42
from agkyra.syncer.database import transaction

logger = logging.getLogger(__name__)

# Status codes describing what is found at a local path; they are produced
# by _get_local_status_from_stats() and returned by get_local_status() and
# path_status().
LOCAL_FILE = 0  # regular file
LOCAL_EMPTY_DIR = 1  # directory with no entries
LOCAL_NONEMPTY_DIR = 2  # directory with at least one entry
LOCAL_MISSING = 3  # nothing exists at the path
LOCAL_SOFTLINK = 4  # symbolic link (the path is lstat'ed, not followed)
LOCAL_OTHER = 5  # any other kind of filesystem entry

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
43
# Absolute tolerance used by eq_float(); is_info_eq() relies on it to decide
# whether two recorded modification times count as equal.
DEFAULT_MTIME_PRECISION = 1e-4
44

45
46
47
48
# Name patterns that are excluded from syncing.  The compiled pattern is
# applied with `match` (i.e. anchored at the start) to the last component
# of an object name in LocalfsFileClient.exclude_file().
# Raw strings are used so that the regex backslashes are not interpreted
# as (invalid) string escape sequences; the resulting values are unchanged.
exclude_regexes = [r"\.#", r"\.~", r"~\$", r"~.*\.tmp$", r"\..*\.swp$"]
exclude_pattern = re.compile('|'.join(exclude_regexes))


49
50
51
52
53
class DirMissing(Exception):
    """Raised by link_file() when linking fails with ENOENT.

    Derives from Exception (not BaseException) so that it is treated like
    any other application error by generic ``except Exception`` handlers
    instead of bypassing them like KeyboardInterrupt/SystemExit do.
    """
    pass


def link_file(src, dest):
    """Make `src` available at `dest`.

    The file is hard-linked with os.link, except when utils.iswin() is
    true, in which case it is renamed with os.rename.

    Raises:
        common.ConflictError: `dest` already exists (EEXIST) or a component
            of its path is unusable (ENOTDIR, EINVAL).
        DirMissing: the operation failed with ENOENT.
        OSError: any other failure is propagated unchanged.
    """
    cmd = os.rename if utils.iswin() else os.link
    try:
        cmd(src, dest)
    except OSError as e:
        if e.errno == errno.EEXIST:
            raise common.ConflictError("Cannot link, '%s' exists." % dest)
        if e.errno in [errno.ENOTDIR, errno.EINVAL]:
            raise common.ConflictError(
                "Cannot link, missing path for '%s'." % dest)
        if e.errno == errno.ENOENT:
            raise DirMissing()
        # Do not silently swallow unexpected errors (e.g. EACCES, EXDEV):
        # the caller would otherwise believe the file has been linked.
        raise


def make_dirs(path):
    """Create directory `path` together with any missing parents.

    An already existing directory is accepted silently.  Failures with
    EEXIST (something that is not a directory is in the way), ENOTDIR or
    ENOENT are reported as common.ConflictError; every other OSError is
    re-raised as is.
    """
    try:
        os.makedirs(path)
    except OSError as err:
        code = err.errno
        already_a_dir = code == errno.EEXIST and os.path.isdir(path)
        if not already_a_dir:
            if code in (errno.EEXIST, errno.ENOTDIR, errno.ENOENT):
                raise common.ConflictError("Cannot make dir '%s'." % path)
            raise


# Pick, once at import time, the psutil call that lists the open files of a
# process: `open_files()` for psutil major version >= 2, `get_open_files()`
# for older releases.
if psutil.version_info[0] >= 2:
    def psutil_open_files(proc):
        return proc.open_files()
else:
    def psutil_open_files(proc):
        return proc.get_open_files()


def file_is_open(path):
    """Return True if any process reported by psutil has `path` open.

    Open-file paths that cannot be converted with utils.to_unicode are
    skipped; a process whose inspection raises psutil.Error is skipped
    as a whole.
    """
    for process in psutil.process_iter():
        try:
            for entry in psutil_open_files(process):
                try:
                    entry_path = utils.to_unicode(entry.path)
                except UnicodeDecodeError:
                    continue
                if entry_path == path:
                    return True
        except psutil.Error:
            continue
    return False


99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def is_iso_date(tstamp):
    """Tell whether `tstamp` parses as ``YYYY-MM-DDTHH:MM:SS.ffffff``."""
    try:
        datetime.datetime.strptime(tstamp, "%Y-%m-%dT%H:%M:%S.%f")
    except ValueError:
        return False
    return True


def get_orig_name(filename):
    """Strip a trailing ``_<iso timestamp>_<utils.NODE>`` suffix, if any.

    The name is returned untouched when it does not end with such a
    suffix, or when nothing would be left after removing it.
    """
    pieces = filename.split('_')
    if len(pieces) >= 3:
        node = pieces[-1]
        tstamp = pieces[-2]
        if node == utils.NODE and is_iso_date(tstamp):
            stem = "_".join(pieces[:-2])
            if stem:
                return stem
    return filename


117
def mk_stash_name(filename):
    """Build a stash name ``<original>_<timestamp>_<utils.NODE>``.

    A stash suffix already present on `filename` is removed first (see
    get_orig_name), so suffixes do not pile up.
    """
    tstamp = utils.str_time_stamp()
    base = get_orig_name(filename)
    return '_'.join([base, tstamp, utils.NODE])
121
122
123


def eq_float(f1, f2):
    """Return True if `f1` and `f2` differ by less than
    DEFAULT_MTIME_PRECISION."""
    gap = abs(f1 - f2)
    return gap < DEFAULT_MTIME_PRECISION
125
126
127


def files_equal(f1, f2):
    """Compare the contents of two files.

    Returns the result of a deep (content) filecmp comparison; a path
    that is missing (ENOENT/ENOTDIR) makes the files count as different.
    Other OSErrors are propagated.
    """
    logger.debug("Comparing files: '%s', '%s'" % (f1, f2))
    try:
        same = filecmp.cmp(f1, f2, shallow=False)
    except OSError as err:
        if err.errno not in (errno.ENOENT, errno.ENOTDIR):
            raise
        same = False
    return same
135
136
137
138
139
140


def info_is_unhandled(info):
    """Return True if `info` is non-empty and of type common.T_UNHANDLED."""
    if info == {}:
        return False
    return info[LOCALFS_TYPE] == common.T_UNHANDLED


Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
141
def local_path_changes(path, state, unhandled_equal=True):
    """Compare the live info of `path` with the info stored in `state`.

    Returns None when they are considered equal by is_info_eq (to which
    `unhandled_equal` is passed on), otherwise the live info.
    """
    current = get_live_info(path)
    unchanged = is_info_eq(current, state.info, unhandled_equal)
    return None if unchanged else current


def get_live_info(path):
    """Describe what currently exists at `path` as an info dictionary.

    Returns {} when `path` is None or nothing exists there; a dictionary
    holding only the type for directories (common.T_DIR) and for symlinks
    or other special entries (common.T_UNHANDLED); type, mtime and size
    for regular files.
    """
    if path is None:
        return {}
    stats, status = get_local_status(path)
    if status == LOCAL_MISSING:
        return {}
    if status in (LOCAL_SOFTLINK, LOCAL_OTHER):
        kind = common.T_UNHANDLED
    elif status in (LOCAL_EMPTY_DIR, LOCAL_NONEMPTY_DIR):
        kind = common.T_DIR
    else:
        return {
            LOCALFS_MTIME: stats.st_mtime,
            LOCALFS_SIZE: stats[stat.ST_SIZE],
            LOCALFS_TYPE: common.T_FILE,
        }
    return {LOCALFS_TYPE: kind}


166
def stat_file(path):
    """lstat `path` (symlinks are not followed).

    Returns the stat result, or None if the path does not exist
    (ENOENT/ENOTDIR).  Any other OSError is propagated.
    """
    try:
        result = os.lstat(path)
    except OSError as err:
        if err.errno not in (errno.ENOENT, errno.ENOTDIR):
            raise
        result = None
    return result


# Keys of the "info" dictionaries that describe a local path (see
# get_live_info()).
LOCALFS_TYPE = "localfs_type"  # one of the common.T_* type markers
LOCALFS_MTIME = "localfs_mtime"  # st_mtime of a regular file
LOCALFS_SIZE = "localfs_size"  # size of a regular file


def status_of_info(info):
    """Map an info dictionary to one of the LOCAL_* status codes.

    Directories are always reported as LOCAL_EMPTY_DIR, since the info
    does not record whether a directory has entries.
    """
    if info == {}:
        return LOCAL_MISSING
    kind = info[LOCALFS_TYPE]
    if kind == common.T_DIR:
        return LOCAL_EMPTY_DIR
    if kind == common.T_UNHANDLED:
        # shouldn't happen
        return LOCAL_OTHER
    return LOCAL_FILE


190
191
def get_local_status(path, attempt=0):
    """Return ``(stats, status)`` for `path`.

    `stats` is the result of stat_file() and `status` one of the LOCAL_*
    codes.  When computing the status raises OSError (it may list the
    directory), the path is stat'ed and examined again; once `attempt`
    has grown beyond 2 the error is propagated instead.
    """
    while True:
        stats = stat_file(path)
        try:
            status = _get_local_status_from_stats(stats, path)
        except OSError as e:
            logger.warning("Got error '%s' while listing dir '%s'" % (e, path))
            if attempt > 2:
                raise
            attempt += 1
            continue
        return stats, status


def _get_local_status_from_stats(stats, path):
    """Classify `path` into a LOCAL_* status code given its lstat result.

    `stats` is None for a missing path.  For directories, `path` is
    listed in order to tell empty from non-empty ones.
    """
    if stats is None:
        return LOCAL_MISSING
    mode = stats[stat.ST_MODE]
    if stat.S_ISLNK(mode):
        result = LOCAL_SOFTLINK
    elif stat.S_ISREG(mode):
        result = LOCAL_FILE
    elif stat.S_ISDIR(mode):
        has_entries = bool(os.listdir(path))
        result = LOCAL_NONEMPTY_DIR if has_entries else LOCAL_EMPTY_DIR
    else:
        result = LOCAL_OTHER
    return result


def path_status(path):
    """Return only the LOCAL_* status code of `path`."""
    return get_local_status(path)[1]
220
221


Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
222
223
224
225
def info_of_regular_file(info):
    """Tell whether `info` describes a regular file.

    A falsy `info` (e.g. {} or None) is returned as is, so the result is
    only guaranteed to be truthy/falsy, not a strict bool.
    """
    if not info:
        return info
    return info[LOCALFS_TYPE] == common.T_FILE


Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
226
def is_info_eq(info1, info2, unhandled_equal=True):
    """Decide whether two info dictionaries describe the same content.

    - If either one is empty, they are equal only if both are empty.
    - Different types are never equal.
    - Two unhandled entries compare as `unhandled_equal`.
    - Two directories are always equal.
    - Two files are equal when their mtimes match within the eq_float
      tolerance and their sizes are identical.
    """
    if {} in [info1, info2]:
        return info1 == info2
    kind = info1[LOCALFS_TYPE]
    if kind != info2[LOCALFS_TYPE]:
        return False
    if kind == common.T_UNHANDLED:
        return unhandled_equal
    if kind == common.T_DIR:
        return True
    same_mtime = eq_float(info1[LOCALFS_MTIME], info2[LOCALFS_MTIME])
    return same_mtime and info1[LOCALFS_SIZE] == info2[LOCALFS_SIZE]


class LocalfsTargetHandle(object):
    """Applies a fetched object onto its path in the local filesystem.

    The entry currently at the target path (if any) is first moved
    ("hidden") into the cache under a name registered in the database;
    then the fetched file is put in place and the hidden copy is removed
    (or stashed next to the target if it carries unsynced changes).
    """

    def __init__(self, settings, target_state):
        """Store the settings and derive the target's filesystem path.

        :param settings: syncer settings; the local root path, the cache
            names/paths, the db getter and `mtime_lag` are read from it
        :param target_state: state of the target object; its `objname`
            is joined to the root path to form `self.fspath`
        """
        self.settings = settings
        self.SIGNATURE = "LocalfsTargetHandle"
        self.rootpath = settings.local_root_path
        self.cache_hide_name = settings.cache_hide_name
        self.cache_hide_path = settings.cache_hide_path
        self.cache_path = settings.cache_path
        self.get_db = settings.get_db
        self.mtime_lag = settings.mtime_lag
        self.target_state = target_state
        self.objname = target_state.objname
        self.fspath = utils.join_path(self.rootpath, self.objname)
        # Set by register_hidden_name(), reset by unregister_hidden_name().
        self.hidden_filename = None
        self.hidden_path = None

    def get_path_in_cache(self, name):
        """Return the path of `name` inside the cache directory."""
        return utils.join_path(self.cache_path, name)

    @transaction()
    def register_hidden_name(self, filename):
        """Compute and register the hidden (cache) name for `filename`.

        Sets `self.hidden_filename` and `self.hidden_path` in any case.
        Returns False if the name was already registered in the db, True
        if it has just been inserted.
        """
        db = self.get_db()
        f = utils.hash_string(filename)
        hide_filename = utils.join_path(self.cache_hide_name, f)
        self.hidden_filename = hide_filename
        self.hidden_path = self.get_path_in_cache(self.hidden_filename)
        if db.get_cachename(hide_filename):
            return False
        db.insert_cachename(hide_filename, self.SIGNATURE, filename)
        return True

    @transaction()
    def unregister_hidden_name(self, hidden_filename):
        """Delete `hidden_filename` from the db and forget the hidden
        name/path kept on this handle."""
        db = self.get_db()
        db.delete_cachename(hidden_filename)
        self.hidden_filename = None
        self.hidden_path = None

    def move_file(self):
        """Move the target file into the cache under its hidden name.

        If the hidden name was already registered and a file exists at
        the hidden path, nothing is moved.  If the target does not exist
        (ENOENT/ENOTDIR), the hidden name is unregistered and the method
        returns normally.

        Raises:
            common.BusyError: the file is reported open by file_is_open,
                or the rename fails with EACCES.
            OSError: any other rename failure.
        """
        fspath = self.fspath
        if file_is_open(fspath):
            raise common.BusyError("File '%s' is open. Aborting."
                                   % fspath)

        new_registered = self.register_hidden_name(self.objname)
        hidden_filename = self.hidden_filename
        hidden_path = self.hidden_path

        if not new_registered:
            logger.warning("Hiding already registered for file %s" %
                           (self.objname,))
            if os.path.lexists(hidden_path):
                logger.warning("File %s already hidden at %s" %
                               (self.objname, hidden_path))
                return
        try:
            os.rename(fspath, hidden_path)
            logger.debug("Hiding file '%s' to '%s'" %
                         (fspath, hidden_path))
        except OSError as e:
            if e.errno in [errno.ENOENT, errno.ENOTDIR]:
                self.unregister_hidden_name(hidden_filename)
                logger.debug("File '%s' does not exist" % fspath)
                return
            elif e.errno == errno.EACCES:
                self.unregister_hidden_name(hidden_filename)
                # The format string used to be "'%'", which made the
                # formatting itself fail with ValueError instead of
                # raising the intended BusyError.
                raise common.BusyError("File '%s' is open. Undoing." %
                                       hidden_path)
            else:
                raise

    def hide_file(self):
        """Hide the target file and verify that hiding it was safe.

        After moving the file, it is moved back and the hidden name is
        unregistered if the hidden file turns out to be open
        (common.BusyError) or to be a non-empty directory
        (common.ConflictError).
        """
        self.move_file()
        if self.hidden_filename is not None:
            # Keep a copy: unregister_hidden_name() resets self.hidden_path
            # to None, which would otherwise end up in the error message.
            hidden_path = self.hidden_path
            if file_is_open(hidden_path):
                os.rename(hidden_path, self.fspath)
                self.unregister_hidden_name(self.hidden_filename)
                raise common.BusyError("File '%s' is open. Undoing." %
                                       hidden_path)
            if path_status(hidden_path) == LOCAL_NONEMPTY_DIR:
                os.rename(hidden_path, self.fspath)
                self.unregister_hidden_name(self.hidden_filename)
                raise common.ConflictError("'%s' is non-empty" % self.fspath)

    def apply(self, fetched_path, fetched_live_info, sync_state):
        """Make the target path reflect the fetched object.

        Nothing is done when a directory is fetched and a directory is
        already in place, or when both sides are missing.

        Raises:
            common.ConflictError: the target is a non-empty directory
                that would have to be replaced.
        """
        local_status = path_status(self.fspath)
        fetched_status = status_of_info(fetched_live_info)
        if local_status in [LOCAL_EMPTY_DIR, LOCAL_NONEMPTY_DIR] \
                and fetched_status == LOCAL_EMPTY_DIR:
            return
        if local_status == LOCAL_MISSING and fetched_status == LOCAL_MISSING:
            return
        if local_status == LOCAL_NONEMPTY_DIR:
            raise common.ConflictError("'%s' is non-empty" % self.fspath)

        self.prepare(fetched_path, sync_state)
        self.finalize(fetched_path, fetched_live_info)
        self.cleanup(self.hidden_path)
        self.unregister_hidden_name(self.hidden_filename)

    def prepare(self, fetched_path, sync_state):
        """Hide the current target and stash it if it has local changes.

        The hidden file is stashed when its live info differs from the
        info in `sync_state` (and it is not missing) and its contents
        differ from those of the fetched file.
        """
        self.hide_file()
        info_changed = local_path_changes(
            self.hidden_path, sync_state, unhandled_equal=False)
        if info_changed is not None and info_changed != {}:
            if not files_equal(self.hidden_path, fetched_path):
                self.stash_file()

    def stash_file(self):
        """Move the hidden file back under the root with a stash name and
        post a ConflictStashMessage about it."""
        stash_name = mk_stash_name(self.objname)
        stash_path = utils.join_path(self.rootpath, stash_name)
        msg = messaging.ConflictStashMessage(
            objname=self.objname, stash_name=stash_name, logger=logger)
        self.settings.messager.put(msg)
        os.rename(self.hidden_path, stash_path)

    def finalize(self, filepath, live_info):
        """Put the fetched entry in place at the target path.

        A regular file is linked from `filepath` (creating the parent
        directories if they are missing); a directory is created; an
        empty `live_info` means there is nothing to put in place.

        Raises:
            AssertionError: `live_info` is neither a file nor a directory.
        """
        logger.debug("Finalizing file '%s'" % filepath)
        if live_info == {}:
            return
        if live_info[LOCALFS_TYPE] == common.T_FILE:
            # NOTE(review): the reason for waiting `mtime_lag` seconds
            # before linking is not visible here -- confirm.
            time.sleep(self.mtime_lag)
            try:
                link_file(filepath, self.fspath)
            except DirMissing:
                make_dirs(os.path.dirname(self.fspath))
                link_file(filepath, self.fspath)
        elif live_info[LOCALFS_TYPE] == common.T_DIR:
            make_dirs(self.fspath)
        else:
            raise AssertionError("info for fetched file '%s' is %s" %
                                 (filepath, live_info))

    def cleanup(self, filepath):
        """Remove the file or directory at `filepath`, if any.

        Removing a regular file is best-effort: a failure is logged and
        otherwise ignored.  Directories are removed with os.rmdir, whose
        errors are propagated.
        """
        if filepath is None:
            return
        status = path_status(filepath)
        if status == LOCAL_FILE:
            try:
                logger.debug("Cleaning up file '%s'" % filepath)
                os.unlink(filepath)
            except Exception as e:
                # Still best-effort, but no longer swallows
                # KeyboardInterrupt/SystemExit and leaves a trace.
                logger.warning("Failed to clean up file '%s': %s",
                               filepath, e)
        elif status in [LOCAL_EMPTY_DIR, LOCAL_NONEMPTY_DIR]:
            os.rmdir(filepath)

    def pull(self, source_handle, sync_state):
        """Fetch the object from `source_handle` and apply it locally.

        Returns the target state updated with the live info of the
        fetched file.
        """
        fetched_path = source_handle.send_file(sync_state)
        fetched_live_info = get_live_info(fetched_path)
        self.apply(fetched_path, fetched_live_info, sync_state)
        self.cleanup(fetched_path)
        return self.target_state.set(info=fetched_live_info)


class LocalfsSourceHandle(object):
    """Stages a local file so that it can be used as a sync source.

    On construction, if the source state describes a regular file, the
    file is copied into the cache ("staged") under a name registered in
    the database and checked for stability.
    """

    def __init__(self, settings, source_state):
        """Store the settings and stage the file when appropriate.

        :param settings: syncer settings; the local root path, the cache
            names/paths, the db getter and `heartbeat` are read from it
        :param source_state: state of the source object; its `objname`
            is joined to the root path to form `self.fspath`

        May raise the errors documented on stage_file().
        """
        self.settings = settings
        self.SIGNATURE = "LocalfsSourceHandle"
        self.rootpath = settings.local_root_path
        self.cache_stage_name = settings.cache_stage_name
        self.cache_stage_path = settings.cache_stage_path
        self.cache_path = settings.cache_path
        self.get_db = settings.get_db
        self.source_state = source_state
        self.objname = source_state.objname
        self.fspath = utils.join_path(self.rootpath, self.objname)
        # Set by register_stage_name(), reset by unregister_stage_name().
        self.stage_filename = None
        self.staged_path = None
        self.heartbeat = settings.heartbeat
        if info_of_regular_file(self.source_state.info):
            self.stage_file()

    @transaction()
    def register_stage_name(self, filename):
        """Compute and register the stage (cache) name for `filename`.

        Sets `self.stage_filename` and `self.staged_path` in any case.
        Returns False if the name was already registered in the db, True
        if it has just been inserted.
        """
        db = self.get_db()
        f = utils.hash_string(filename)
        stage_filename = utils.join_path(self.cache_stage_name, f)
        self.stage_filename = stage_filename
        stage_path = self.get_path_in_cache(stage_filename)
        self.staged_path = stage_path
        if db.get_cachename(stage_filename):
            return False
        db.insert_cachename(stage_filename, self.SIGNATURE, filename)
        return True

    @transaction()
    def unregister_stage_name(self, stage_filename):
        """Delete `stage_filename` from the db and forget the stage
        name/path kept on this handle."""
        db = self.get_db()
        db.delete_cachename(stage_filename)
        self.stage_filename = None
        self.staged_path = None

    def get_path_in_cache(self, name):
        """Return the path of `name` inside the cache directory."""
        return utils.join_path(self.cache_path, name)

    def copy_file(self):
        """Copy the source file to its stage path in the cache.

        If the stage name was already registered and a file exists at
        the stage path, nothing is copied.  If the source is missing or
        is a directory (ENOENT/EISDIR), the stage name is unregistered
        and the method returns normally.

        Raises:
            common.OpenBusyError: the file is reported open.
            IOError: any other copy failure.
        """
        fspath = self.fspath
        if file_is_open(fspath):
            raise common.OpenBusyError("File '%s' is open. Aborting"
                                       % fspath)
        new_registered = self.register_stage_name(fspath)
        stage_filename = self.stage_filename
        stage_path = self.staged_path

        if not new_registered:
            logger.warning("Staging already registered for file %s" %
                           (self.objname,))
            if os.path.lexists(stage_path):
                logger.warning("File %s already staged at %s" %
                               (self.objname, stage_path))
                return

        logger.debug("Staging file '%s' to '%s'" % (self.objname, stage_path))
        try:
            shutil.copy2(fspath, stage_path)
        except IOError as e:
            if e.errno in [errno.ENOENT, errno.EISDIR]:
                logger.debug("Source is not a regular file: '%s'" % fspath)
                self.unregister_stage_name(stage_filename)
                return
            else:
                raise

    def stage_file(self):
        """Stage the source file, validate the staged copy and record the
        live info of the source if it differs from the stored one.

        May raise the busy errors documented on copy_file() and
        check_staged().
        """
        self.copy_file()
        live_info = get_live_info(self.fspath)
        self.check_staged(live_info)
        self.check_update_source_state(live_info)

    def check_staged(self, live_info):
        """Verify that the staged copy is consistent with the source.

        The staged copy is removed and unregistered whenever a check
        fails.

        Raises:
            common.NotStableBusyError: the source is a regular file but
                nothing has been staged.
            common.OpenBusyError: the source is open.
            common.ChangedBusyError: the source contents differ from the
                staged copy.
        """
        is_reg = info_of_regular_file(live_info)

        if self.staged_path is None:
            if is_reg:
                m = ("File '%s' is not in a stable state; unstaged"
                     % self.objname)
                logger.warning(m)
                raise common.NotStableBusyError(m)
            return

        if not is_reg:
            os.unlink(self.staged_path)
            self.unregister_stage_name(self.stage_filename)
            # The path used to be missing from the message, which logged
            # a literal '%s'.
            logger.warning("Path '%s' is not a regular file; unstaged"
                           % self.fspath)
            return

        if file_is_open(self.fspath):
            os.unlink(self.staged_path)
            self.unregister_stage_name(self.stage_filename)
            m = "File '%s' is open; unstaged" % self.objname
            logger.warning(m)
            raise common.OpenBusyError(m)

        if not files_equal(self.staged_path, self.fspath):
            os.unlink(self.staged_path)
            self.unregister_stage_name(self.stage_filename)
            m = "File '%s' contents have changed; unstaged" % self.objname
            logger.warning(m)
            raise common.ChangedBusyError(m)

    @transaction()
    def update_state(self, state):
        """Store `state` in the db."""
        db = self.get_db()
        db.put_state(state)

    def check_update_source_state(self, live_info):
        """If `live_info` differs from the info of the source state, post
        a LiveInfoUpdateMessage, store the updated state in the db and
        keep it as `self.source_state`."""
        if not is_info_eq(live_info, self.source_state.info):
            msg = messaging.LiveInfoUpdateMessage(
                archive=self.SIGNATURE, objname=self.objname,
                info=live_info, logger=logger)
            self.settings.messager.put(msg)
            new_state = self.source_state.set(info=live_info)
            self.update_state(new_state)
            self.source_state = new_state

    def get_synced_state(self):
        """Return the (possibly updated) source state."""
        return self.source_state

    def info_is_dir(self):
        """Return True if the source info is of type common.T_DIR; an
        info without a type entry counts as not a directory."""
        try:
            return self.source_state.info[LOCALFS_TYPE] == common.T_DIR
        except KeyError:
            return False

    def info_is_deleted(self):
        """Return True if the source info is empty."""
        return self.source_state.info == {}

    def info_is_deleted_or_unhandled(self):
        """Return True if the source info is empty or of type
        common.T_UNHANDLED."""
        return self.source_state.info == {} \
            or self.source_state.info[LOCALFS_TYPE] == common.T_UNHANDLED

    def stash_staged_file(self):
        """Move the staged copy to a stash name derived from the source
        path."""
        stash_filename = mk_stash_name(self.fspath)
        logger.warning("Stashing file '%s' to '%s'" %
                       (self.fspath, stash_filename))
        os.rename(self.staged_path, stash_filename)

    def unstage_file(self):
        """Remove the staged copy and unregister its name; a no-op when
        nothing is staged."""
        if self.stage_filename is None:
            return
        staged_path = self.staged_path
        os.unlink(staged_path)
        self.unregister_stage_name(self.stage_filename)
542
543
544
545
546


class LocalfsFileClient(FileClient):
    """File client for the local filesystem archive.

    Keeps a thread-safe dictionary of probe candidates (object names that
    may have changed), filled by walking the filesystem and by the
    watchdog observer created in notifier().
    """

    def __init__(self, settings):
        """Store the settings and announce whether local sync is enabled."""
        self.settings = settings
        self.SIGNATURE = "LocalfsFileClient"
        self.ROOTPATH = settings.local_root_path
        self.CACHEPATH = settings.cache_path
        self.get_db = settings.get_db
        self.probe_candidates = utils.ThreadSafeDict()
        self.check_enabled()

    def check_enabled(self):
        """Post a LocalfsSyncEnabled/LocalfsSyncDisabled message according
        to the current settings."""
        if not self.settings.localfs_is_enabled():
            msg = messaging.LocalfsSyncDisabled(logger=logger)
        else:
            msg = messaging.LocalfsSyncEnabled(logger=logger)
        self.settings.messager.put(msg)

    def remove_candidates(self, objnames, ident):
        """Drop the given candidates, but only those whose cached "ident"
        equals `ident`; entries with another ident are kept."""
        with self.probe_candidates.lock() as d:
            for objname in objnames:
                try:
                    cached = d.pop(objname)
                    if cached["ident"] != ident:
                        d[objname] = cached
                except KeyError:
                    pass

    def list_candidate_files(self, forced=False):
        """Return the names of the current probe candidates.

        With `forced`, the filesystem is walked first and the findings
        are merged into the candidates.  If local sync is disabled, {} is
        returned; if the root path is not a directory, local sync gets
        disabled, a LocalfsSyncDisabled message is posted and {} is
        returned.
        """
        if not self.settings.localfs_is_enabled():
            return {}
        if not os.path.isdir(self.ROOTPATH):
            self.settings.set_localfs_enabled(False)
            msg = messaging.LocalfsSyncDisabled(logger=logger)
            self.settings.messager.put(msg)
            return {}

        with self.probe_candidates.lock() as d:
            if forced:
                candidates = self.walk_filesystem()
                d.update(candidates)
            # Hand out a snapshot: on Python 3 `keys()` is a live view
            # that would otherwise be used outside the lock.
            return list(d.keys())

    def none_info(self):
        """Return a fresh candidate entry with no ident and no info."""
        return {"ident": None, "info": None}

    def walk_filesystem(self):
        """Collect candidates from the filesystem and from the db.

        Every directory (except the root itself) and every file found
        under the root path, plus every name returned by list_files(),
        is mapped to a fresh none_info() entry.  Names that cannot be
        converted with utils.to_unicode are skipped.
        """
        candidates = {}
        rootpath = utils.from_unicode(self.ROOTPATH)
        for dirpath, dirnames, files in os.walk(rootpath):
            try:
                dirpath = utils.to_unicode(dirpath)
            except UnicodeDecodeError:
                continue
            rel_dirpath = os.path.relpath(dirpath, start=self.ROOTPATH)
            logger.debug("'%s' '%s'" % (dirpath, rel_dirpath))
            if rel_dirpath != '.':
                objname = utils.to_standard_sep(rel_dirpath)
                candidates[objname] = self.none_info()
            for filename in files:
                try:
                    filename = utils.to_unicode(filename)
                except UnicodeDecodeError:
                    continue
                if rel_dirpath == '.':
                    prefix = ""
                else:
                    prefix = utils.to_standard_sep(rel_dirpath)
                objname = utils.join_objname(prefix, filename)
                candidates[objname] = self.none_info()

        db_cands = {name: self.none_info() for name in self.list_files()}
        candidates.update(db_cands)
        logger.debug("Candidates: %s" % candidates)
        return candidates

    @transaction()
    def list_files(self):
        """Return the files the db lists for this client's signature."""
        db = self.get_db()
        return db.list_files(self.SIGNATURE)

    def _local_path_changes(self, name, state):
        """Return local_path_changes() for object `name` under the root."""
        local_path = utils.join_path(self.ROOTPATH, name)
        return local_path_changes(local_path, state)

    def exclude_file(self, objname):
        """Tell whether `objname` must be ignored.

        Returns True if the first component of the name is the cache
        directory name; otherwise the result of matching the last
        component against `exclude_pattern` (a match object or None, so
        the result should only be tested for truth).
        """
        parts = objname.split(common.OBJECT_DIRSEP)
        init_part = parts[0]
        if init_part in [self.settings.cache_name]:
            return True
        final_part = parts[-1]
        return exclude_pattern.match(final_part)

    def probe_file(self, objname, old_state, ref_state, ident):
        """Probe `objname` for changes with respect to `old_state`.

        The candidate entry of `objname`, if present, is tagged with
        `ident`, and its cached info (when not None) is used instead of
        inspecting the filesystem.  Returns `old_state` updated with the
        live info, or None when the name is excluded (an
        IgnoreProbeMessage is posted) or nothing has changed.
        """
        with self.probe_candidates.lock() as d:
            try:
                cached = d[objname]
                cached_info = cached["info"]
                cached["ident"] = ident
            except KeyError:
                cached_info = None

        if self.exclude_file(objname):
            msg = messaging.IgnoreProbeMessage(
                archive=old_state.archive, objname=objname, logger=logger)
            self.settings.messager.put(msg)
            return

        live_info = (self._local_path_changes(objname, old_state)
                     if cached_info is None else cached_info)
        if live_info is None:
            return
        live_state = old_state.set(info=live_info)
        return live_state

    def stage_file(self, source_state):
        """Return a LocalfsSourceHandle for `source_state`."""
        return LocalfsSourceHandle(self.settings, source_state)

    def prepare_target(self, target_state):
        """Return a LocalfsTargetHandle for `target_state`."""
        return LocalfsTargetHandle(self.settings, target_state)

    def notifier(self):
        """Start and return a watchdog Observer on the root path.

        Every created, deleted, modified (files only) or moved path is
        recorded as a probe candidate, unless it lies under the cache
        path.  Deletion of the root path itself disables local sync.
        """
        def handle_path(path):
            # Record `path` as a fresh probe candidate.
            try:
                path = utils.to_unicode(path)
            except UnicodeDecodeError:
                return
            if path.startswith(self.CACHEPATH):
                return
            rel_path = os.path.relpath(path, start=self.ROOTPATH)
            objname = utils.to_standard_sep(rel_path)
            with self.probe_candidates.lock() as d:
                d[objname] = self.none_info()

        root_path = utils.from_unicode(self.ROOTPATH)

        # The handler methods name their own instance `this` so that
        # `self` keeps referring to the enclosing LocalfsFileClient.
        class EventHandler(FileSystemEventHandler):
            def on_created(this, event):
                # if not event.is_directory:
                #     return
                path = event.src_path
                logger.debug("Handling %s" % event)
                handle_path(path)

            def on_deleted(this, event):
                path = event.src_path
                logger.debug("Handling %s" % event)
                if path == root_path:
                    self.settings.set_localfs_enabled(False)
                    msg = messaging.LocalfsSyncDisabled(logger=logger)
                    self.settings.messager.put(msg)
                handle_path(path)

            def on_modified(this, event):
                if event.is_directory:
                    return
                path = event.src_path
                logger.debug("Handling %s" % event)
                handle_path(path)

            def on_moved(this, event):
                src_path = event.src_path
                dest_path = event.dest_path
                logger.debug("Handling %s" % event)
                handle_path(src_path)
                handle_path(dest_path)

        event_handler = EventHandler()
        observer = Observer()
        observer.schedule(event_handler, root_path, recursive=True)
        observer.start()
        return observer