localfs_client.py 22.2 KB
Newer Older
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# Copyright (C) 2015 GRNET S.A.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

16
import os
17
import stat
18
19
20
import re
import datetime
import psutil
21
import time
22
import filecmp
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
23
import shutil
24

25
26
27
28
29
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import logging

from agkyra.syncer.file_client import FileClient
30
from agkyra.syncer import utils, common, messaging
31
32
33
34
35
36
37
38
39
40
41
42
43
44
from agkyra.syncer.database import transaction

logger = logging.getLogger(__name__)

LOCAL_FILE = 0
LOCAL_EMPTY_DIR = 1
LOCAL_NONEMPTY_DIR = 2
LOCAL_MISSING = 3
LOCAL_SOFTLINK = 4
LOCAL_OTHER = 5

OS_FILE_EXISTS = 17
OS_NOT_A_DIR = 20
OS_NO_FILE_OR_DIR = 2
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
45
OS_IS_DIR = 21
46

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
47
DEFAULT_MTIME_PRECISION = 1e-4
48

49
50
51
52
exclude_regexes = ["\.#", "\.~", "~\$", "~.*\.tmp$", "\..*\.swp$"]
exclude_pattern = re.compile('|'.join(exclude_regexes))


53
54
55
56
57
58
59
60
class DirMissing(BaseException):
    pass


def link_file(src, dest):
    try:
        os.link(src, dest)
    except OSError as e:
61
        if e.errno == OS_FILE_EXISTS:
62
            raise common.ConflictError("Cannot link, '%s' exists." % dest)
63
64
65
        if e.errno == OS_NOT_A_DIR:
            raise common.ConflictError(
                "Cannot link, missing path for '%s'." % dest)
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
        if e.errno == OS_NO_FILE_OR_DIR:
            raise DirMissing()


def make_dirs(path):
    try:
        os.makedirs(path)
    except OSError as e:
        if e.errno == OS_FILE_EXISTS and os.path.isdir(path):
            return
        if e.errno in [OS_FILE_EXISTS, OS_NOT_A_DIR]:
            raise common.ConflictError("Cannot make dir '%s'." % path)
        raise


psutil_open_files = \
    (lambda proc: proc.open_files()) if psutil.version_info[0] >= 2 else \
    (lambda proc: proc.get_open_files())


def file_is_open(path):
    for proc in psutil.process_iter():
        try:
            flist = psutil_open_files(proc)
            for nt in flist:
                if nt.path == path:
                    return True
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
93
        except psutil.Error:
94
95
96
97
98
99
100
101
102
103
            pass
    return False


def mk_stash_name(filename):
    tstamp = datetime.datetime.now().strftime("%s")
    return filename + '.' + tstamp + '.local'


def eq_float(f1, f2):
104
    return abs(f1 - f2) < DEFAULT_MTIME_PRECISION
105
106
107


def files_equal(f1, f2):
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
108
    logger.debug("Comparing files: '%s', '%s'" % (f1, f2))
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
109
110
111
112
113
114
    try:
        return filecmp.cmp(f1, f2, shallow=False)
    except OSError as e:
        if e.errno in [OS_NO_FILE_OR_DIR, OS_NOT_A_DIR]:
            return False
        raise
115
116
117
118
119
120


def info_is_unhandled(info):
    return info != {} and info[LOCALFS_TYPE] == common.T_UNHANDLED


Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
121
def local_path_changes(path, state, unhandled_equal=True):
122
123
    live_info = get_live_info(path)
    info = state.info
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
124
    if is_info_eq(live_info, info, unhandled_equal):
125
126
127
128
129
130
131
        return None
    return live_info


def get_live_info(path):
    if path is None:
        return {}
132
    stats, status = get_local_status(path)
133
134
135
136
137
138
    if status == LOCAL_MISSING:
        return {}
    if status in [LOCAL_SOFTLINK, LOCAL_OTHER]:
        return {LOCALFS_TYPE: common.T_UNHANDLED}
    if status in [LOCAL_EMPTY_DIR, LOCAL_NONEMPTY_DIR]:
        return {LOCALFS_TYPE: common.T_DIR}
139
    live_info = {LOCALFS_MTIME: stats.st_mtime,
140
                 LOCALFS_SIZE: stats[stat.ST_SIZE],
141
142
143
144
145
                 LOCALFS_TYPE: common.T_FILE,
                 }
    return live_info


146
def stat_file(path):
147
    try:
148
        return os.lstat(path)
149
    except OSError as e:
150
        if e.errno in [OS_NO_FILE_OR_DIR, OS_NOT_A_DIR]:
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
            return None
        raise


LOCALFS_TYPE = "localfs_type"
LOCALFS_MTIME = "localfs_mtime"
LOCALFS_SIZE = "localfs_size"


def status_of_info(info):
    if info == {}:
        return LOCAL_MISSING
    if info[LOCALFS_TYPE] == common.T_DIR:
        return LOCAL_EMPTY_DIR
    if info[LOCALFS_TYPE] == common.T_UNHANDLED:
        return LOCAL_OTHER  # shouldn't happen
    return LOCAL_FILE


170
171
def get_local_status(path, attempt=0):
    stats = stat_file(path)
172
    try:
173
        status = _get_local_status_from_stats(stats, path)
174
    except OSError as e:
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
        logger.warning("Got error '%s' while listing dir '%s'" % (e, path))
        if attempt > 2:
            raise
        return get_local_status(path, attempt + 1)
    return stats, status


def _get_local_status_from_stats(stats, path):
    if stats is None:
        return LOCAL_MISSING
    mode = stats[stat.ST_MODE]
    if stat.S_ISLNK(mode):
        return LOCAL_SOFTLINK
    if stat.S_ISREG(mode):
        return LOCAL_FILE
    if stat.S_ISDIR(mode):
        if os.listdir(path):
            return LOCAL_NONEMPTY_DIR
        return LOCAL_EMPTY_DIR
    return LOCAL_OTHER


def path_status(path):
    stats, status = get_local_status(path)
    return status
200
201


Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
202
203
204
205
def info_of_regular_file(info):
    return info and info[LOCALFS_TYPE] == common.T_FILE


Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
206
def is_info_eq(info1, info2, unhandled_equal=True):
207
208
209
210
211
    if {} in [info1, info2]:
        return info1 == info2
    if info1[LOCALFS_TYPE] != info2[LOCALFS_TYPE]:
        return False
    if info1[LOCALFS_TYPE] == common.T_UNHANDLED:
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
212
        return unhandled_equal
213
214
215
216
217
218
219
220
    if info1[LOCALFS_TYPE] == common.T_DIR:
        return True
    return eq_float(info1[LOCALFS_MTIME], info2[LOCALFS_MTIME]) \
        and info1[LOCALFS_SIZE] == info2[LOCALFS_SIZE]


class LocalfsTargetHandle(object):
    def __init__(self, settings, target_state):
221
        self.settings = settings
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
222
        self.SIGNATURE = "LocalfsTargetHandle"
223
224
225
226
227
        self.rootpath = settings.local_root_path
        self.cache_hide_name = settings.cache_hide_name
        self.cache_hide_path = settings.cache_hide_path
        self.cache_path = settings.cache_path
        self.get_db = settings.get_db
228
        self.mtime_lag = settings.mtime_lag
229
        self.target_state = target_state
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
230
        self.objname = target_state.objname
231
        self.fspath = utils.join_path(self.rootpath, self.objname)
232
233
234
        self.hidden_filename = None
        self.hidden_path = None

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
235
236
    def get_path_in_cache(self, name):
        return utils.join_path(self.cache_path, name)
237
238
239
240
241
242
243

    @transaction()
    def register_hidden_name(self, filename):
        db = self.get_db()
        f = utils.hash_string(filename)
        hide_filename = utils.join_path(self.cache_hide_name, f)
        self.hidden_filename = hide_filename
244
        self.hidden_path = self.get_path_in_cache(self.hidden_filename)
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
245
        if db.get_cachename(hide_filename):
246
            return False
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
247
        db.insert_cachename(hide_filename, self.SIGNATURE, filename)
248
249
250
251
252
        return True

    @transaction()
    def unregister_hidden_name(self, hidden_filename):
        db = self.get_db()
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
253
        db.delete_cachename(hidden_filename)
254
        self.hidden_filename = None
255
        self.hidden_path = None
256

257
    def move_file(self):
258
259
        fspath = self.fspath
        if file_is_open(fspath):
260
            raise common.BusyError("File '%s' is open. Aborting."
261
                                   % fspath)
262

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
263
        new_registered = self.register_hidden_name(self.objname)
264
        hidden_filename = self.hidden_filename
265
        hidden_path = self.hidden_path
266
267
268

        if not new_registered:
            logger.warning("Hiding already registered for file %s" %
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
269
                           (self.objname,))
270
271
            if os.path.lexists(hidden_path):
                logger.warning("File %s already hidden at %s" %
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
272
                               (self.objname, hidden_path))
273
274
                return
        try:
275
            os.rename(fspath, hidden_path)
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
276
277
            logger.debug("Hiding file '%s' to '%s'" %
                         (fspath, hidden_path))
278
        except OSError as e:
279
            if e.errno in [OS_NO_FILE_OR_DIR, OS_NOT_A_DIR]:
280
                self.unregister_hidden_name(hidden_filename)
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
281
                logger.debug("File '%s' does not exist" % fspath)
282
283
284
                return
            else:
                raise e
285
286
287
288
289
290
291
292
293
294
295
296
297

    def hide_file(self):
        self.move_file()
        if self.hidden_filename is not None:
            if file_is_open(self.hidden_path):
                os.rename(self.hidden_path, self.fspath)
                self.unregister_hidden_name(self.hidden_filename)
                raise common.BusyError("File '%s' is open. Undoing." %
                                       self.hidden_path)
            if path_status(self.hidden_path) == LOCAL_NONEMPTY_DIR:
                os.rename(self.hidden_path, self.fspath)
                self.unregister_hidden_name(self.hidden_filename)
                raise common.ConflictError("'%s' is non-empty" % self.fspath)
298

299
    def apply(self, fetched_path, fetched_live_info, sync_state):
300
        local_status = path_status(self.fspath)
301
302
303
304
305
306
307
        fetched_status = status_of_info(fetched_live_info)
        if local_status in [LOCAL_EMPTY_DIR, LOCAL_NONEMPTY_DIR] \
                and fetched_status == LOCAL_EMPTY_DIR:
            return
        if local_status == LOCAL_MISSING and fetched_status == LOCAL_MISSING:
            return
        if local_status == LOCAL_NONEMPTY_DIR:
308
            raise common.ConflictError("'%s' is non-empty" % self.fspath)
309

310
311
        self.prepare(fetched_path, sync_state)
        self.finalize(fetched_path, fetched_live_info)
312
313
314
        self.cleanup(self.hidden_path)
        self.unregister_hidden_name(self.hidden_filename)

315
    def prepare(self, fetched_path, sync_state):
316
        self.hide_file()
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
317
318
        info_changed = local_path_changes(
            self.hidden_path, sync_state, unhandled_equal=False)
319
        if info_changed is not None and info_changed != {}:
320
            if not files_equal(self.hidden_path, fetched_path):
321
322
323
                self.stash_file()

    def stash_file(self):
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
324
325
        stash_name = mk_stash_name(self.objname)
        stash_path = utils.join_path(self.rootpath, stash_name)
326
327
328
        msg = messaging.ConflictStashMessage(
            objname=self.objname, stash_name=stash_name, logger=logger)
        self.settings.messager.put(msg)
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
329
        os.rename(self.hidden_path, stash_path)
330

331
    def finalize(self, filepath, live_info):
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
332
        logger.debug("Finalizing file '%s'" % filepath)
333
334
        if live_info == {}:
            return
335
        if live_info[LOCALFS_TYPE] == common.T_FILE:
336
            time.sleep(self.mtime_lag)
337
            try:
338
                link_file(filepath, self.fspath)
339
            except DirMissing:
340
                make_dirs(os.path.dirname(self.fspath))
341
                link_file(filepath, self.fspath)
342
        elif live_info[LOCALFS_TYPE] == common.T_DIR:
343
            make_dirs(self.fspath)
344
345
        else:
            raise AssertionError("info for fetched file '%s' is %s" %
346
                                 (filepath, live_info))
347

348
349
    def cleanup(self, filepath):
        if filepath is None:
350
            return
351
        status = path_status(filepath)
352
353
        if status == LOCAL_FILE:
            try:
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
354
                logger.debug("Cleaning up file '%s'" % filepath)
355
                os.unlink(filepath)
356
357
358
            except:
                pass
        elif status in [LOCAL_EMPTY_DIR, LOCAL_NONEMPTY_DIR]:
359
            os.rmdir(filepath)
360
361

    def pull(self, source_handle, sync_state):
362
363
364
365
        fetched_path = source_handle.send_file(sync_state)
        fetched_live_info = get_live_info(fetched_path)
        self.apply(fetched_path, fetched_live_info, sync_state)
        self.cleanup(fetched_path)
366
367
368
369
370
371
372
373
374
375
        return self.target_state.set(info=fetched_live_info)


class LocalfsSourceHandle(object):
    @transaction()
    def register_stage_name(self, filename):
        db = self.get_db()
        f = utils.hash_string(filename)
        stage_filename = utils.join_path(self.cache_stage_name, f)
        self.stage_filename = stage_filename
376
377
        stage_path = self.get_path_in_cache(stage_filename)
        self.staged_path = stage_path
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
378
        if db.get_cachename(stage_filename):
379
            return False
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
380
        db.insert_cachename(stage_filename, self.SIGNATURE, filename)
381
382
383
384
385
        return True

    @transaction()
    def unregister_stage_name(self, stage_filename):
        db = self.get_db()
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
386
        db.delete_cachename(stage_filename)
387
        self.stage_filename = None
388
        self.staged_path = None
389

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
390
391
    def get_path_in_cache(self, name):
        return utils.join_path(self.cache_path, name)
392

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
393
    def copy_file(self):
394
        fspath = self.fspath
395
        if file_is_open(fspath):
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
396
397
            raise common.OpenBusyError("File '%s' is open. Aborting"
                                       % fspath)
398
        new_registered = self.register_stage_name(fspath)
399
        stage_filename = self.stage_filename
400
        stage_path = self.staged_path
401
402
403

        if not new_registered:
            logger.warning("Staging already registered for file %s" %
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
404
                           (self.objname,))
405
406
            if os.path.lexists(stage_path):
                logger.warning("File %s already staged at %s" %
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
407
                               (self.objname, stage_path))
408
                return
409

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
410
        logger.debug("Staging file '%s' to '%s'" % (self.objname, stage_path))
411
        try:
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
412
413
414
            shutil.copy2(fspath, stage_path)
        except IOError as e:
            if e.errno in [OS_NO_FILE_OR_DIR, OS_IS_DIR]:
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
415
                logger.debug("Source is not a regular file: '%s'" % fspath)
416
                self.unregister_stage_name(stage_filename)
417
                return
418
419
            else:
                raise e
420

421
    def stage_file(self):
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
        self.copy_file()
        live_info = get_live_info(self.fspath)
        self.check_staged(live_info)
        self.check_update_source_state(live_info)

    def check_staged(self, live_info):
        is_reg = info_of_regular_file(live_info)

        if self.staged_path is None:
            if is_reg:
                m = ("File '%s' is not in a stable state; unstaged"
                     % self.objname)
                logger.warning(m)
                raise common.NotStableBusyError(m)
            return

        if not is_reg:
            os.unlink(self.staged_path)
            self.unregister_stage_name(self.stage_filename)
            logger.warning("Path '%s' is not a regular file; unstaged")
            return

        if file_is_open(self.fspath):
            os.unlink(self.staged_path)
            self.unregister_stage_name(self.stage_filename)
            m = "File '%s' is open; unstaged" % self.objname
            logger.warning(m)
            raise common.OpenBusyError(m)

        if not files_equal(self.staged_path, self.fspath):
            os.unlink(self.staged_path)
            self.unregister_stage_name(self.stage_filename)
            m = "File '%s' contents have changed; unstaged" % self.objname
            logger.warning(m)
            raise common.ChangedBusyError(m)
457
458

    def __init__(self, settings, source_state):
459
        self.settings = settings
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
460
        self.SIGNATURE = "LocalfsSourceHandle"
461
462
463
464
465
466
        self.rootpath = settings.local_root_path
        self.cache_stage_name = settings.cache_stage_name
        self.cache_stage_path = settings.cache_stage_path
        self.cache_path = settings.cache_path
        self.get_db = settings.get_db
        self.source_state = source_state
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
467
        self.objname = source_state.objname
468
        self.fspath = utils.join_path(self.rootpath, self.objname)
469
470
471
        self.stage_filename = None
        self.staged_path = None
        self.heartbeat = settings.heartbeat
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
472
        if info_of_regular_file(self.source_state.info):
473
            self.stage_file()
474

475
476
477
478
479
    @transaction()
    def update_state(self, state):
        db = self.get_db()
        db.put_state(state)

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
480
481
    def check_update_source_state(self, live_info):
        if not is_info_eq(live_info, self.source_state.info):
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
482
483
484
485
            msg = messaging.LiveInfoUpdateMessage(
                archive=self.SIGNATURE, objname=self.objname,
                info=live_info, logger=logger)
            self.settings.messager.put(msg)
486
487
488
489
            new_state = self.source_state.set(info=live_info)
            self.update_state(new_state)
            self.source_state = new_state

490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
    def get_synced_state(self):
        return self.source_state

    def info_is_dir(self):
        try:
            return self.source_state.info[LOCALFS_TYPE] == common.T_DIR
        except KeyError:
            return False

    def info_is_deleted(self):
        return self.source_state.info == {}

    def info_is_deleted_or_unhandled(self):
        return self.source_state.info == {} \
            or self.source_state.info[LOCALFS_TYPE] == common.T_UNHANDLED

    def stash_staged_file(self):
507
        stash_filename = mk_stash_name(self.fspath)
508
        logger.warning("Stashing file '%s' to '%s'" %
509
                       (self.fspath, stash_filename))
510
511
512
513
514
515
        os.rename(self.staged_path, stash_filename)

    def unstage_file(self):
        if self.stage_filename is None:
            return
        staged_path = self.staged_path
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
516
        os.unlink(staged_path)
517
        self.unregister_stage_name(self.stage_filename)
518
519
520
521
522


class LocalfsFileClient(FileClient):
    def __init__(self, settings):
        self.settings = settings
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
523
        self.SIGNATURE = "LocalfsFileClient"
524
525
526
        self.ROOTPATH = settings.local_root_path
        self.CACHEPATH = settings.cache_path
        self.get_db = settings.get_db
527
        self.probe_candidates = utils.ThreadSafeDict()
528

529
530
531
532
533
534
535
536
537
538
    def remove_candidates(self, objnames, ident):
        with self.probe_candidates.lock() as d:
            for objname in objnames:
                try:
                    cached = d.pop(objname)
                    if cached["ident"] != ident:
                        d[objname] = cached
                except KeyError:
                    pass

539
    def list_candidate_files(self, forced=False):
540
541
542
543
544
        with self.probe_candidates.lock() as d:
            if forced:
                candidates = self.walk_filesystem()
                d.update(candidates)
            return d.keys()
545

546
547
548
    def none_info(self):
        return {"ident": None, "info": None}

549
    def walk_filesystem(self):
550
        candidates = {}
551
552
553
554
        for dirpath, dirnames, files in os.walk(self.ROOTPATH):
            rel_dirpath = os.path.relpath(dirpath, start=self.ROOTPATH)
            logger.debug("'%s' '%s'" % (dirpath, rel_dirpath))
            if rel_dirpath != '.':
555
556
                objname = utils.to_standard_sep(rel_dirpath)
                candidates[objname] = self.none_info()
557
            for filename in files:
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
558
559
560
561
562
                if rel_dirpath == '.':
                    prefix = ""
                else:
                    prefix = utils.to_standard_sep(rel_dirpath)
                objname = utils.join_objname(prefix, filename)
563
                candidates[objname] = self.none_info()
564

565
        db_cands = dict((name, self.none_info())
566
                        for name in self.list_files())
567
        candidates.update(db_cands)
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
568
        logger.debug("Candidates: %s" % candidates)
569
        return candidates
570

571
572
573
574
575
    @transaction()
    def list_files(self):
        db = self.get_db()
        return db.list_files(self.SIGNATURE)

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
576
577
    def _local_path_changes(self, name, state):
        local_path = utils.join_path(self.ROOTPATH, name)
578
579
        return local_path_changes(local_path, state)

580
581
582
583
584
585
586
587
    def exclude_file(self, objname):
        parts = objname.split(common.OBJECT_DIRSEP)
        init_part = parts[0]
        if init_part in [self.settings.cache_name]:
            return True
        final_part = parts[-1]
        return exclude_pattern.match(final_part)

588
    def probe_file(self, objname, old_state, ref_state, ident):
589
        with self.probe_candidates.lock() as d:
590
591
592
593
594
595
596
            try:
                cached = d[objname]
                cached_info = cached["info"]
                cached["ident"] = ident
            except KeyError:
                cached_info = None

597
        if self.exclude_file(objname):
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
598
599
600
            msg = messaging.IgnoreProbeMessage(
                archive=old_state.archive, objname=objname, logger=logger)
            self.settings.messager.put(msg)
601
602
            return

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
603
        live_info = (self._local_path_changes(objname, old_state)
604
                     if cached_info is None else cached_info)
605
606
607
        if live_info is None:
            return
        live_state = old_state.set(info=live_info)
608
        return live_state
609
610
611
612
613
614
615

    def stage_file(self, source_state):
        return LocalfsSourceHandle(self.settings, source_state)

    def prepare_target(self, target_state):
        return LocalfsTargetHandle(self.settings, target_state)

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
616
    def notifier(self):
617
618
        def handle_path(path):
            rel_path = os.path.relpath(path, start=self.ROOTPATH)
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
619
            objname = utils.to_standard_sep(rel_path)
620
            with self.probe_candidates.lock() as d:
621
                d[objname] = self.none_info()
622
623
624
625
626
627
628
629

        class EventHandler(FileSystemEventHandler):
            def on_created(this, event):
                # if not event.is_directory:
                #     return
                path = event.src_path
                if path.startswith(self.CACHEPATH):
                    return
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
630
                logger.debug("Handling %s" % event)
631
632
633
634
635
636
                handle_path(path)

            def on_deleted(this, event):
                path = event.src_path
                if path.startswith(self.CACHEPATH):
                    return
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
637
                logger.debug("Handling %s" % event)
638
639
640
641
642
643
644
645
                handle_path(path)

            def on_modified(this, event):
                if event.is_directory:
                    return
                path = event.src_path
                if path.startswith(self.CACHEPATH):
                    return
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
646
                logger.debug("Handling %s" % event)
647
648
649
650
651
652
653
654
                handle_path(path)

            def on_moved(this, event):
                src_path = event.src_path
                dest_path = event.dest_path
                if src_path.startswith(self.CACHEPATH) or \
                        dest_path.startswith(self.CACHEPATH):
                    return
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
655
                logger.debug("Handling %s" % event)
656
657
658
659
660
661
662
663
664
                handle_path(src_path)
                handle_path(dest_path)

        path = self.ROOTPATH
        event_handler = EventHandler()
        observer = Observer()
        observer.schedule(event_handler, path, recursive=True)
        observer.start()
        return observer