localfs_client.py 21 KB
Newer Older
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# Copyright (C) 2015 GRNET S.A.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

16
import os
17
import stat
18
19
20
21
22
23
24
25
import re
import datetime
import psutil
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import logging

from agkyra.syncer.file_client import FileClient
26
from agkyra.syncer import utils, common, messaging
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
from agkyra.syncer.database import transaction

logger = logging.getLogger(__name__)

# Classification codes for what is found at a local path; produced by
# _get_local_status_from_stats() and status_of_info().
LOCAL_FILE = 0
LOCAL_EMPTY_DIR = 1
LOCAL_NONEMPTY_DIR = 2
LOCAL_MISSING = 3
LOCAL_SOFTLINK = 4
LOCAL_OTHER = 5

# Numeric OSError.errno values this module compares against.
# NOTE(review): presumably EEXIST, ENOTDIR and ENOENT as numbered on
# POSIX systems -- confirm for every supported platform.
OS_FILE_EXISTS = 17
OS_NOT_A_DIR = 20
OS_NO_FILE_OR_DIR = 2


43
44
45
46
# Regular expressions matched (anchored at the start, via ``re.match``)
# against the last component of an object name by
# LocalfsFileClient.exclude_file().  Raw strings are used so that the
# backslashes reach the regex engine untouched instead of relying on
# Python passing unknown string escapes through.
exclude_regexes = [r"\.#", r"\.~", r"~\$", r"~.*\.tmp$", r"\..*\.swp$"]
exclude_pattern = re.compile('|'.join(exclude_regexes))


47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
class DirMissing(BaseException):
    """Raised by link_file() when os.link fails with OS_NO_FILE_OR_DIR.

    NOTE(review): derives from BaseException, so ``except Exception``
    handlers do not catch it -- confirm this is intended.
    """


def link_file(src, dest):
    """Create a hard link named ``dest`` that points to ``src``.

    Raises:
        common.ConflictError: os.link failed with OS_FILE_EXISTS or
            OS_NOT_A_DIR.
        DirMissing: os.link failed with OS_NO_FILE_OR_DIR.
        OSError: os.link failed for any other reason.
    """
    try:
        os.link(src, dest)
    except OSError as e:
        if e.errno in [OS_FILE_EXISTS, OS_NOT_A_DIR]:
            raise common.ConflictError("Cannot link, '%s' exists." % dest)
        if e.errno == OS_NO_FILE_OR_DIR:
            raise DirMissing()
        # Any other failure (permissions, cross-device link, ...) must
        # reach the caller: returning normally would make it believe that
        # ``dest`` now exists.
        raise


def make_dirs(path):
    """Create directory ``path`` together with any missing parents.

    An already existing directory at ``path`` is accepted silently.

    Raises:
        common.ConflictError: creation failed with OS_FILE_EXISTS (and
            ``path`` is not a directory) or with OS_NOT_A_DIR.
        OSError: creation failed for any other reason.
    """
    try:
        os.makedirs(path)
    except OSError as err:
        code = err.errno
        already_there = code == OS_FILE_EXISTS and os.path.isdir(path)
        if already_there:
            return
        if code == OS_FILE_EXISTS or code == OS_NOT_A_DIR:
            raise common.ConflictError("Cannot make dir '%s'." % path)
        raise


# Compatibility shim for listing the files a process has open: calls
# ``proc.open_files()`` when the major version of psutil is 2 or higher,
# and ``proc.get_open_files()`` otherwise.
psutil_open_files = \
    (lambda proc: proc.open_files()) if psutil.version_info[0] >= 2 else \
    (lambda proc: proc.get_open_files())


def file_is_open(path):
    """Report whether any process visible to psutil has ``path`` open.

    The comparison is a plain string equality against the ``path``
    attribute of each open-file entry.  Processes for which psutil raises
    an error are skipped.
    """
    for process in psutil.process_iter():
        try:
            entries = psutil_open_files(process)
            if any(entry.path == path for entry in entries):
                return True
        except psutil.Error:
            continue
    return False


def mk_stash_name(filename):
    """Build the name under which a conflicting copy is stashed.

    Returns ``filename`` followed by ``.<seconds since the epoch>.local``.
    """
    # Local import: the module header does not import ``time``.
    import time

    # time.time() is used instead of strftime("%s"): the "%s" directive
    # is a platform-specific extension of the C library and is not
    # available everywhere.
    tstamp = "%d" % time.time()
    return filename + '.' + tstamp + '.local'


def eq_float(f1, f2):
    """Tell whether ``f1`` and ``f2`` differ by less than 0.01."""
    tolerance = 0.01
    return -tolerance < f1 - f2 < tolerance


def files_equal(f1, f2):
    """Compare what lives at paths ``f1`` and ``f2``.

    Paths with a different local status are unequal.  Paths sharing a
    status other than LOCAL_FILE are considered equal.  Two regular files
    are equal when their sizes match and utils.hash_file() yields the
    same value for both.
    """
    logger.info("Comparing files: '%s', '%s'" % (f1, f2))
    first_stats, first_status = get_local_status(f1)
    second_stats, second_status = get_local_status(f2)
    if first_status != second_status:
        return False
    if first_status != LOCAL_FILE:
        return True
    same_size = first_stats[stat.ST_SIZE] == second_stats[stat.ST_SIZE]
    # Hashing is skipped altogether when the sizes already differ.
    return same_size and utils.hash_file(f1) == utils.hash_file(f2)


def info_is_unhandled(info):
    """Tell whether ``info`` is non-empty and typed common.T_UNHANDLED."""
    if info == {}:
        return False
    return info[LOCALFS_TYPE] == common.T_UNHANDLED


Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
117
def local_path_changes(path, state, unhandled_equal=True):
    """Detect whether ``path`` differs from the info recorded in ``state``.

    Returns None when the live info of ``path`` equals ``state.info``
    according to is_info_eq(); otherwise returns the live info.
    ``unhandled_equal`` is forwarded to is_info_eq().
    """
    current = get_live_info(path)
    unchanged = is_info_eq(current, state.info, unhandled_equal)
    return None if unchanged else current


def get_live_info(path):
    """Describe what currently exists at ``path`` as an info dictionary.

    Returns an empty dict when ``path`` is None or nothing exists there;
    a dict holding only LOCALFS_TYPE for directories (common.T_DIR) and
    for soft links or other objects (common.T_UNHANDLED); and a dict with
    modification time, size and type common.T_FILE for regular files.
    """
    if path is None:
        return {}
    stats, status = get_local_status(path)
    if status == LOCAL_MISSING:
        return {}
    if status == LOCAL_SOFTLINK or status == LOCAL_OTHER:
        return {LOCALFS_TYPE: common.T_UNHANDLED}
    if status == LOCAL_EMPTY_DIR or status == LOCAL_NONEMPTY_DIR:
        return {LOCALFS_TYPE: common.T_DIR}
    # Only a regular file is left at this point.
    return {
        LOCALFS_MTIME: stats[stat.ST_MTIME],
        LOCALFS_SIZE: stats[stat.ST_SIZE],
        LOCALFS_TYPE: common.T_FILE,
    }


142
def stat_file(path):
    """Return ``os.lstat(path)``, or None when lstat fails with
    OS_NO_FILE_OR_DIR.

    Symbolic links are not followed.  Any other OSError propagates.
    """
    try:
        result = os.lstat(path)
    except OSError as err:
        if err.errno != OS_NO_FILE_OR_DIR:
            raise
        result = None
    return result


# Keys of the info dictionaries built by get_live_info().
LOCALFS_TYPE = "localfs_type"
LOCALFS_MTIME = "localfs_mtime"
LOCALFS_SIZE = "localfs_size"


def status_of_info(info):
    """Map an info dictionary to one of the LOCAL_* status codes.

    An empty dict maps to LOCAL_MISSING, a directory to LOCAL_EMPTY_DIR,
    an unhandled object to LOCAL_OTHER and anything else to LOCAL_FILE.
    """
    if info == {}:
        return LOCAL_MISSING
    kind = info[LOCALFS_TYPE]
    if kind == common.T_DIR:
        return LOCAL_EMPTY_DIR
    if kind == common.T_UNHANDLED:
        return LOCAL_OTHER  # shouldn't happen
    return LOCAL_FILE


166
167
def get_local_status(path, attempt=0):
    """Stat ``path`` and classify it.

    Returns a ``(stats, status)`` pair, where ``stats`` comes from
    stat_file() and ``status`` is one of the LOCAL_* codes.  When the
    classification raises OSError, a warning is logged and the whole
    operation (including the stat) is repeated; once ``attempt`` exceeds
    2 the error is re-raised instead.
    """
    while True:
        stats = stat_file(path)
        try:
            return stats, _get_local_status_from_stats(stats, path)
        except OSError as e:
            logger.warning(
                "Got error '%s' while listing dir '%s'" % (e, path))
            if attempt > 2:
                raise
            attempt += 1


def _get_local_status_from_stats(stats, path):
    """Turn an lstat result for ``path`` into a LOCAL_* status code.

    ``stats`` being None means that nothing exists at ``path``.  For a
    directory, ``path`` is listed to tell empty from non-empty.
    """
    if stats is None:
        return LOCAL_MISSING
    mode = stats[stat.ST_MODE]
    if stat.S_ISLNK(mode):
        return LOCAL_SOFTLINK
    if stat.S_ISREG(mode):
        return LOCAL_FILE
    if not stat.S_ISDIR(mode):
        return LOCAL_OTHER
    return LOCAL_NONEMPTY_DIR if os.listdir(path) else LOCAL_EMPTY_DIR


def path_status(path):
    """Return only the LOCAL_* status code of ``path``."""
    return get_local_status(path)[1]
196
197


Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
198
def is_info_eq(info1, info2, unhandled_equal=True):
    """Decide whether two info dictionaries describe the same object.

    If either dict is empty, both must be empty.  Different types are
    never equal.  Two unhandled objects compare as ``unhandled_equal``;
    two directories are always equal; two files are equal when their
    modification times match within eq_float() tolerance and their sizes
    are identical.
    """
    if {} in (info1, info2):
        return info1 == info2
    kind = info1[LOCALFS_TYPE]
    if kind != info2[LOCALFS_TYPE]:
        return False
    if kind == common.T_UNHANDLED:
        return unhandled_equal
    if kind == common.T_DIR:
        return True
    if not eq_float(info1[LOCALFS_MTIME], info2[LOCALFS_MTIME]):
        return False
    return info1[LOCALFS_SIZE] == info2[LOCALFS_SIZE]


class LocalfsTargetHandle(object):
    """Applies a fetched object onto the local path of a target state.

    The object currently at the local path is first moved ("hidden") into
    the cache directory under a name recorded in the database; the
    fetched file is then hard-linked into place and the hidden copy is
    removed or, when it carries unsynced changes, stashed next to the
    original under a ``.local`` name.
    """

    def __init__(self, settings, target_state):
        self.settings = settings
        self.SIGNATURE = "LocalfsTargetHandle"
        self.rootpath = settings.local_root_path
        self.cache_hide_name = settings.cache_hide_name
        self.cache_hide_path = settings.cache_hide_path
        self.cache_path = settings.cache_path
        self.get_db = settings.get_db
        self.target_state = target_state
        self.objname = target_state.objname
        self.fspath = utils.join_path(self.rootpath, self.objname)
        # Both are filled in by hide_file().
        self.hidden_filename = None
        self.hidden_path = None

    def get_path_in_cache(self, name):
        """Return ``name`` joined onto the cache directory path."""
        return utils.join_path(self.cache_path, name)

    @transaction()
    def register_hidden_name(self, filename):
        """Record in the database the cache name used to hide ``filename``.

        The cache name is derived from a hash of ``filename`` and stored
        in ``self.hidden_filename`` in every case.

        Returns:
            False if that cache name was already registered, True if it
            has just been inserted.
        """
        db = self.get_db()
        f = utils.hash_string(filename)
        hide_filename = utils.join_path(self.cache_hide_name, f)
        self.hidden_filename = hide_filename
        if db.get_cachename(hide_filename):
            return False
        db.insert_cachename(hide_filename, self.SIGNATURE, filename)
        return True

    @transaction()
    def unregister_hidden_name(self, hidden_filename):
        """Delete ``hidden_filename`` from the database and forget it."""
        db = self.get_db()
        db.delete_cachename(hidden_filename)
        self.hidden_filename = None

    def _undo_hide(self, hidden_path, hidden_filename):
        """Move the hidden object back in place and drop its registration."""
        os.rename(hidden_path, self.fspath)
        self.unregister_hidden_name(hidden_filename)

    def hide_file(self):
        """Move the object at ``self.fspath`` into the cache directory.

        Returns without moving anything when the object is already hidden
        from an earlier registration, or when nothing exists at
        ``self.fspath`` (the registration is dropped in that case).

        Raises:
            common.BusyError: the file is open, either before or right
                after the move; in the latter case the move is undone.
            common.ConflictError: the moved object turned out to be a
                non-empty directory; the move is undone.
            OSError: the rename failed for a reason other than a missing
                source.
        """
        fspath = self.fspath
        if file_is_open(fspath):
            raise common.BusyError("File '%s' is open. Aborting."
                                   % fspath)

        new_registered = self.register_hidden_name(self.objname)
        hidden_filename = self.hidden_filename
        hidden_path = self.get_path_in_cache(hidden_filename)
        self.hidden_path = hidden_path

        if not new_registered:
            logger.warning("Hiding already registered for file %s" %
                           (self.objname,))
            if os.path.lexists(hidden_path):
                logger.warning("File %s already hidden at %s" %
                               (self.objname, hidden_path))
                return
        try:
            os.rename(fspath, hidden_path)
            logger.info("Hiding file '%s' to '%s'" %
                        (fspath, hidden_path))
        except OSError as e:
            if e.errno == OS_NO_FILE_OR_DIR:
                self.unregister_hidden_name(hidden_filename)
                logger.info("File '%s' does not exist" % fspath)
                return
            # Bare raise keeps the original traceback.
            raise
        # The file may have been opened between the first check and the
        # rename, so check again on the hidden copy.
        if file_is_open(hidden_path):
            self._undo_hide(hidden_path, hidden_filename)
            raise common.BusyError("File '%s' is open. Undoing." % hidden_path)
        if path_status(hidden_path) == LOCAL_NONEMPTY_DIR:
            self._undo_hide(hidden_path, hidden_filename)
            raise common.ConflictError("'%s' is non-empty" % fspath)

    def apply(self, fetched_file, fetched_live_info, sync_state):
        """Replace the local object with the fetched one.

        Nothing is done when both sides are directories or both are
        missing.

        Raises:
            common.ConflictError: the local object is a non-empty
                directory and the fetched one is not a directory.
        """
        local_status = path_status(self.fspath)
        fetched_status = status_of_info(fetched_live_info)
        if local_status in [LOCAL_EMPTY_DIR, LOCAL_NONEMPTY_DIR] \
                and fetched_status == LOCAL_EMPTY_DIR:
            return
        if local_status == LOCAL_MISSING and fetched_status == LOCAL_MISSING:
            return
        if local_status == LOCAL_NONEMPTY_DIR:
            raise common.ConflictError("'%s' is non-empty" % self.fspath)

        self.prepare(fetched_file, sync_state)
        self.finalize(fetched_file, fetched_live_info)
        self.cleanup(self.hidden_path)
        self.unregister_hidden_name(self.hidden_filename)

    def prepare(self, fetched_file, sync_state):
        """Hide the local object and stash it if it has unsynced changes.

        The hidden copy is stashed when its live info differs from
        ``sync_state``, it still exists, and its content differs from
        ``fetched_file``.
        """
        self.hide_file()
        info_changed = local_path_changes(
            self.hidden_path, sync_state, unhandled_equal=False)
        if info_changed is not None and info_changed != {}:
            if not files_equal(self.hidden_path, fetched_file):
                self.stash_file()

    def stash_file(self):
        """Move the hidden copy back under the root with a stash name.

        A ConflictStashMessage is put on the messager before the rename.
        """
        stash_name = mk_stash_name(self.objname)
        stash_path = utils.join_path(self.rootpath, stash_name)
        msg = messaging.ConflictStashMessage(
            objname=self.objname, stash_name=stash_name, logger=logger)
        self.settings.messager.put(msg)
        os.rename(self.hidden_path, stash_path)

    def finalize(self, filename, live_info):
        """Put the fetched object in place at ``self.fspath``.

        Does nothing for an empty ``live_info``.  A directory is created
        with make_dirs(); anything else is hard-linked from ``filename``,
        creating the parent directory first if the link reports it
        missing.
        """
        logger.info("Finalizing file '%s'" % filename)
        if live_info == {}:
            return
        if live_info[LOCALFS_TYPE] != common.T_DIR:
            try:
                link_file(filename, self.fspath)
            except DirMissing:
                make_dirs(os.path.dirname(self.fspath))
                link_file(filename, self.fspath)
        else:
            # assuming empty dir
            make_dirs(self.fspath)

    def cleanup(self, filename):
        """Remove the file or directory at ``filename``, if there is one.

        Removing a regular file is best effort: a failure is logged and
        otherwise ignored.  Removing a directory uses os.rmdir and lets
        its errors propagate.
        """
        status = path_status(filename)
        if status == LOCAL_FILE:
            logger.info("Cleaning up file '%s'" % filename)
            try:
                os.unlink(filename)
            except OSError as e:
                # Still best effort, but no longer hides unrelated
                # exceptions such as KeyboardInterrupt.
                logger.warning("Could not clean up file '%s': %s" %
                               (filename, e))
        elif status in [LOCAL_EMPTY_DIR, LOCAL_NONEMPTY_DIR]:
            os.rmdir(filename)

    def pull(self, source_handle, sync_state):
        """Fetch the object from ``source_handle`` and apply it locally.

        Returns the target state updated with the live info of the
        fetched file.
        """
        fetched_file = source_handle.send_file(sync_state)
        fetched_live_info = get_live_info(fetched_file)
        self.apply(fetched_file, fetched_live_info, sync_state)
        self.cleanup(fetched_file)
        return self.target_state.set(info=fetched_live_info)


class LocalfsSourceHandle(object):
    """Stages a local file in the cache directory while it is the source
    of a sync.

    On construction, a source whose recorded info says it is a regular
    file is moved ("staged") into the cache directory under a name
    recorded in the database; unstage_file() links it back in place.
    """

    def __init__(self, settings, source_state):
        self.settings = settings
        self.SIGNATURE = "LocalfsSourceHandle"
        self.rootpath = settings.local_root_path
        self.cache_stage_name = settings.cache_stage_name
        self.cache_stage_path = settings.cache_stage_path
        self.cache_path = settings.cache_path
        self.get_db = settings.get_db
        self.source_state = source_state
        self.objname = source_state.objname
        self.fspath = utils.join_path(self.rootpath, self.objname)
        # Both are filled in by register_stage_name().
        self.stage_filename = None
        self.staged_path = None
        self.heartbeat = settings.heartbeat
        if self.needs_staging():
            self.lock_file(self.fspath)

    @transaction()
    def register_stage_name(self, filename):
        """Record in the database the cache name used to stage ``filename``.

        The cache name is derived from a hash of ``filename``; it and the
        corresponding cache path are stored in ``self.stage_filename``
        and ``self.staged_path`` in every case.

        Returns:
            False if that cache name was already registered, True if it
            has just been inserted.
        """
        db = self.get_db()
        f = utils.hash_string(filename)
        stage_filename = utils.join_path(self.cache_stage_name, f)
        self.stage_filename = stage_filename
        stage_path = self.get_path_in_cache(stage_filename)
        self.staged_path = stage_path
        if db.get_cachename(stage_filename):
            return False
        db.insert_cachename(stage_filename, self.SIGNATURE, filename)
        return True

    @transaction()
    def unregister_stage_name(self, stage_filename):
        """Delete ``stage_filename`` from the database and forget the
        staging name and path kept on this handle."""
        db = self.get_db()
        db.delete_cachename(stage_filename)
        self.stage_filename = None
        self.staged_path = None

    def get_path_in_cache(self, name):
        """Return ``name`` joined onto the cache directory path."""
        return utils.join_path(self.cache_path, name)

    def _undo_stage(self, stage_path, stage_filename, fspath):
        """Move the staged object back in place and drop its registration."""
        os.rename(stage_path, fspath)
        self.unregister_stage_name(stage_filename)

    def lock_file(self, fspath):
        """Move the file at ``fspath`` into the cache directory.

        Returns without moving anything when the file is already staged
        from an earlier registration.  When the source does not exist the
        registration is dropped and the source state is refreshed.  An
        object that turns out not to be a regular file is moved back.

        Raises:
            common.BusyError: the file is open, either before or right
                after the move; in the latter case the move is undone.
            OSError: the rename failed for a reason other than a missing
                source.
        """
        if file_is_open(fspath):
            raise common.BusyError("File '%s' is open. Aborting"
                                   % fspath)
        new_registered = self.register_stage_name(fspath)
        stage_filename = self.stage_filename
        stage_path = self.staged_path

        if not new_registered:
            logger.warning("Staging already registered for file %s" %
                           (self.objname,))
            if os.path.lexists(stage_path):
                logger.warning("File %s already staged at %s" %
                               (self.objname, stage_path))
                return

        logger.info("Staging file '%s' to '%s'" % (self.objname, stage_path))
        try:
            os.rename(fspath, stage_path)
        except OSError as e:
            if e.errno == OS_NO_FILE_OR_DIR:
                logger.info("Source does not exist: '%s'" % fspath)
                self.unregister_stage_name(stage_filename)
                self.check_update_source_state()
                return
            # Bare raise keeps the original traceback.
            raise
        # The file may have been opened between the first check and the
        # rename, so check again on the staged copy.
        if file_is_open(stage_path):
            self._undo_stage(stage_path, stage_filename, fspath)
            logger.warning("File '%s' is open; unstaged" % self.objname)
            raise common.BusyError("File '%s' is open. Undoing" % stage_path)

        self.check_update_source_state()
        if path_status(stage_path) != LOCAL_FILE:
            self._undo_stage(stage_path, stage_filename, fspath)
            logger.warning("Object '%s' is not a regular file; unstaged" %
                           self.objname)

    @transaction()
    def update_state(self, state):
        """Store ``state`` in the database."""
        db = self.get_db()
        db.put_state(state)

    def check_update_source_state(self):
        """Refresh the source state if the staged path no longer matches it.

        When the live info of ``self.staged_path`` differs from the
        recorded info, a new state carrying the live info is stored in
        the database and kept on this handle.
        """
        live_info = local_path_changes(
            self.staged_path, self.source_state)
        if live_info is not None:
            logger.warning("Actual info differs in %s for object: '%s'; "
                           "updating..." % (self.SIGNATURE, self.objname))
            new_state = self.source_state.set(info=live_info)
            self.update_state(new_state)
            self.source_state = new_state

    def get_synced_state(self):
        """Return the source state currently held by this handle."""
        return self.source_state

    def needs_staging(self):
        """Tell whether the recorded info describes a regular file.

        Always returns a bool; an empty or missing info yields False.
        """
        info = self.source_state.info
        return bool(info) and info[LOCALFS_TYPE] == common.T_FILE

    def info_is_dir(self):
        """Tell whether the recorded info describes a directory."""
        try:
            return self.source_state.info[LOCALFS_TYPE] == common.T_DIR
        except KeyError:
            return False

    def info_is_deleted(self):
        """Tell whether the recorded info is empty."""
        return self.source_state.info == {}

    def info_is_deleted_or_unhandled(self):
        """Tell whether the recorded info is empty or of unhandled type."""
        return self.source_state.info == {} \
            or self.source_state.info[LOCALFS_TYPE] == common.T_UNHANDLED

    def stash_staged_file(self):
        """Move the staged file next to the original under a stash name."""
        stash_filename = mk_stash_name(self.fspath)
        logger.warning("Stashing file '%s' to '%s'" %
                       (self.fspath, stash_filename))
        os.rename(self.staged_path, stash_filename)

    def unstage_file(self):
        """Put the staged file back in place and drop its registration."""
        self.do_unstage()
        self.unregister_stage_name(self.stage_filename)

    def do_unstage(self):
        """Link the staged file back to ``self.fspath`` and remove the
        staged copy.

        Does nothing when no file is staged or the recorded info is
        empty.  If the link hits a conflict, the staged file is stashed
        instead.
        """
        if self.stage_filename is None:
            return
        if self.info_is_deleted():
            return
        staged_path = self.staged_path
        try:
            # NOTE(review): link_file() can also raise DirMissing, which
            # is not handled here -- confirm whether the parent directory
            # should be recreated as in LocalfsTargetHandle.finalize().
            link_file(staged_path, self.fspath)
            os.unlink(staged_path)
        except common.ConflictError:
            self.stash_staged_file()


class LocalfsFileClient(FileClient):
    """File client for the local filesystem tree under the sync root.

    Keeps a thread-safe dictionary of candidate object names to probe,
    fed by a full filesystem walk and by filesystem event notifications.
    """

    def __init__(self, settings):
        self.settings = settings
        self.SIGNATURE = "LocalfsFileClient"
        self.ROOTPATH = settings.local_root_path
        self.CACHEPATH = settings.cache_path
        self.get_db = settings.get_db
        self.probe_candidates = utils.ThreadSafeDict()

    def remove_candidates(self, objnames, ident):
        """Drop the candidates in ``objnames`` that are tagged ``ident``.

        A candidate tagged with a different ident is kept; unknown names
        are ignored.
        """
        with self.probe_candidates.lock() as d:
            for objname in objnames:
                try:
                    cached = d.pop(objname)
                    if cached["ident"] != ident:
                        d[objname] = cached
                except KeyError:
                    pass

    def list_candidate_files(self, forced=False):
        """Return the names of the current probe candidates as a list.

        With ``forced`` set, the candidates are first extended with the
        result of a full walk_filesystem().
        """
        with self.probe_candidates.lock() as d:
            if forced:
                candidates = self.walk_filesystem()
                d.update(candidates)
            # Snapshot the names while the lock is held: on Python 3,
            # keys() is a live view that would otherwise be iterated by
            # the caller while other threads modify the dictionary.
            return list(d.keys())

    def none_info(self):
        """Return a fresh candidate entry with no ident and no info."""
        return {"ident": None, "info": None}

    def walk_filesystem(self):
        """Collect candidate object names from the filesystem and database.

        Returns a dict mapping each directory (except the root itself)
        and file found under the root, plus every name that
        ``db.list_files`` reports for this client, to a fresh none_info()
        entry.
        """
        db = self.get_db()
        candidates = {}
        for dirpath, dirnames, files in os.walk(self.ROOTPATH):
            rel_dirpath = os.path.relpath(dirpath, start=self.ROOTPATH)
            logger.debug("'%s' '%s'" % (dirpath, rel_dirpath))
            if rel_dirpath == '.':
                prefix = ""
            else:
                # The directory itself is a candidate; its standardized
                # name is also the prefix of every file it contains.
                prefix = utils.to_standard_sep(rel_dirpath)
                candidates[prefix] = self.none_info()
            for filename in files:
                objname = utils.join_objname(prefix, filename)
                candidates[objname] = self.none_info()

        db_cands = dict((name, self.none_info())
                        for name in db.list_files(self.SIGNATURE))
        candidates.update(db_cands)
        logger.info("Candidates: %s" % candidates)
        return candidates

    def _local_path_changes(self, name, state):
        """Run local_path_changes() on object ``name`` under the root."""
        local_path = utils.join_path(self.ROOTPATH, name)
        return local_path_changes(local_path, state)

    def exclude_file(self, objname):
        """Tell whether ``objname`` must be ignored when probing.

        An object is excluded when its first path component is the cache
        directory name, or when its last component matches
        ``exclude_pattern``.  Always returns a bool.
        """
        parts = objname.split(common.OBJECT_DIRSEP)
        init_part = parts[0]
        if init_part in [self.settings.cache_name]:
            return True
        final_part = parts[-1]
        return bool(exclude_pattern.match(final_part))

    def probe_file(self, objname, old_state, ref_state, ident):
        """Check whether ``objname`` differs from ``old_state``.

        The candidate entry of ``objname``, if any, is tagged with
        ``ident``.  Returns None for excluded objects and when no change
        is detected; otherwise returns ``old_state`` updated with the
        live info.
        """
        with self.probe_candidates.lock() as d:
            try:
                cached = d[objname]
                cached_info = cached["info"]
                cached["ident"] = ident
            except KeyError:
                cached_info = None

        if self.exclude_file(objname):
            logger.warning("Ignoring probe archive: %s, object: %s" %
                           (old_state.archive, objname))
            return

        live_info = (self._local_path_changes(objname, old_state)
                     if cached_info is None else cached_info)
        if live_info is None:
            return
        live_state = old_state.set(info=live_info)
        return live_state

    def stage_file(self, source_state):
        """Return a LocalfsSourceHandle for ``source_state``."""
        return LocalfsSourceHandle(self.settings, source_state)

    def prepare_target(self, target_state):
        """Return a LocalfsTargetHandle for ``target_state``."""
        return LocalfsTargetHandle(self.settings, target_state)

    def notifier(self):
        """Start watching the root recursively and return the observer.

        Every created, deleted, modified (files only) or moved path that
        does not start with the cache path is recorded as a probe
        candidate.
        """
        def handle_path(path):
            rel_path = os.path.relpath(path, start=self.ROOTPATH)
            objname = utils.to_standard_sep(rel_path)
            with self.probe_candidates.lock() as d:
                d[objname] = self.none_info()

        def handle_event(event, paths):
            # NOTE(review): plain string-prefix test, so a sibling whose
            # name merely starts with the cache path is skipped as well
            # -- confirm this is acceptable.
            if any(p.startswith(self.CACHEPATH) for p in paths):
                return
            logger.info("Handling %s" % event)
            for p in paths:
                handle_path(p)

        # The handler methods name their instance ``this`` because
        # ``self`` refers to the enclosing LocalfsFileClient.
        class EventHandler(FileSystemEventHandler):
            def on_created(this, event):
                handle_event(event, [event.src_path])

            def on_deleted(this, event):
                handle_event(event, [event.src_path])

            def on_modified(this, event):
                if event.is_directory:
                    return
                handle_event(event, [event.src_path])

            def on_moved(this, event):
                handle_event(event, [event.src_path, event.dest_path])

        path = self.ROOTPATH
        event_handler = EventHandler()
        observer = Observer()
        observer.schedule(event_handler, path, recursive=True)
        observer.start()
        return observer