localfs_client.py 21.7 KB
Newer Older
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# Copyright (C) 2015 GRNET S.A.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

16
import os
17
import stat
18
19
20
import re
import datetime
import psutil
21
import time
22
import filecmp
23

24
25
26
27
28
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import logging

from agkyra.syncer.file_client import FileClient
29
from agkyra.syncer import utils, common, messaging
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
from agkyra.syncer.database import transaction

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Status codes describing what is found at a local path; produced by the
# status helpers of this module.
LOCAL_FILE = 0
LOCAL_EMPTY_DIR = 1
LOCAL_NONEMPTY_DIR = 2
LOCAL_MISSING = 3
LOCAL_SOFTLINK = 4
LOCAL_OTHER = 5

# Numeric values compared against OSError.errno in this module.
OS_FILE_EXISTS = 17
OS_NOT_A_DIR = 20
OS_NO_FILE_OR_DIR = 2

45
# Two floats closer than this are considered equal by eq_float(), which is
# used to compare modification times.
DEFAULT_MTIME_PRECISION = 1e-7
46

47
48
49
50
# Patterns for object names that are excluded from probing. The compiled
# pattern is applied with ``match``, so each alternative is anchored at the
# start of the name. Raw strings keep the backslashes literal instead of
# relying on Python passing unknown escape sequences through unchanged.
exclude_regexes = [r"\.#", r"\.~", r"~\$", r"~.*\.tmp$", r"\..*\.swp$"]
exclude_pattern = re.compile('|'.join(exclude_regexes))


51
52
53
54
55
56
57
58
class DirMissing(BaseException):
    """Raised by link_file() when os.link() reports a missing file or dir."""


def link_file(src, dest):
    """Create a hard link at `dest` that points to `src`.

    Raises:
        common.ConflictError: if `dest` already exists, or if os.link()
            reports that a path component is not a directory.
        DirMissing: if os.link() reports a missing file or directory.
        OSError: for any other failure of os.link().
    """
    try:
        os.link(src, dest)
    except OSError as e:
        if e.errno == OS_FILE_EXISTS:
            raise common.ConflictError("Cannot link, '%s' exists." % dest)
        if e.errno == OS_NOT_A_DIR:
            raise common.ConflictError(
                "Cannot link, missing path for '%s'." % dest)
        if e.errno == OS_NO_FILE_OR_DIR:
            raise DirMissing()
        # Any other failure must not be mistaken for a successful link:
        # callers go on to unlink the source once this function returns.
        raise


def make_dirs(path):
    """Create directory `path`, including any missing parent directories.

    An already existing directory at `path` is accepted silently.

    Raises:
        common.ConflictError: if something that is not a directory is in
            the way (at `path` or at one of its parents).
        OSError: for any other failure of os.makedirs().
    """
    try:
        os.makedirs(path)
    except OSError as err:
        code = err.errno
        already_there = code == OS_FILE_EXISTS and os.path.isdir(path)
        if already_there:
            return
        if code == OS_FILE_EXISTS or code == OS_NOT_A_DIR:
            raise common.ConflictError("Cannot make dir '%s'." % path)
        raise


# psutil_open_files(proc) returns the open files of a psutil process; the
# method to call is selected once, based on the psutil major version.
if psutil.version_info[0] >= 2:
    def psutil_open_files(proc):
        return proc.open_files()
else:
    def psutil_open_files(proc):
        return proc.get_open_files()


def file_is_open(path):
    """Return True if any process listed by psutil has `path` open.

    The comparison is a plain string equality against the paths reported
    by psutil. Processes for which psutil raises an error are skipped.
    """
    for proc in psutil.process_iter():
        try:
            if any(entry.path == path for entry in psutil_open_files(proc)):
                return True
        except psutil.Error:
            # psutil could not inspect this process; move on to the next.
            continue
    return False


def mk_stash_name(filename):
    """Return `filename` suffixed with '.<unix-seconds>.local'."""
    # strftime("%s") is a platform-specific extension that is not available
    # everywhere; time.time() provides the same epoch seconds portably.
    tstamp = str(int(time.time()))
    return filename + '.' + tstamp + '.local'


def eq_float(f1, f2, precision=None):
    """Return True if `f1` and `f2` differ by less than `precision`.

    `precision` defaults to DEFAULT_MTIME_PRECISION when not given.
    """
    if precision is None:
        precision = DEFAULT_MTIME_PRECISION
    return abs(f1 - f2) < precision
103
104
105


def files_equal(f1, f2):
    """Return True if files `f1` and `f2` have identical contents.

    The comparison is done by filecmp.cmp with shallow=False, i.e. the
    contents are compared rather than only the stat signatures.
    """
    logger.debug("Comparing files: '%s', '%s'", f1, f2)
    return filecmp.cmp(f1, f2, shallow=False)
108
109
110
111
112
113


def info_is_unhandled(info):
    """Return True if `info` is non-empty and its type is T_UNHANDLED."""
    if info == {}:
        return False
    return info[LOCALFS_TYPE] == common.T_UNHANDLED


Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
114
def local_path_changes(path, state, unhandled_equal=True):
    """Compare the live info of `path` with the info stored in `state`.

    Returns None when is_info_eq() considers the two equal; otherwise
    returns the live info dict. `unhandled_equal` is passed through to
    is_info_eq().
    """
    live_info = get_live_info(path)
    if is_info_eq(live_info, state.info, unhandled_equal):
        return None
    return live_info


def get_live_info(path):
    """Build the info dict that describes what currently exists at `path`.

    Returns {} when `path` is None or nothing exists there. Soft links and
    other non-regular objects yield only a T_UNHANDLED type, directories
    only a T_DIR type, and regular files yield type, mtime and size.
    """
    if path is None:
        return {}
    stats, status = get_local_status(path)
    if status == LOCAL_MISSING:
        return {}
    if status in [LOCAL_SOFTLINK, LOCAL_OTHER]:
        return {LOCALFS_TYPE: common.T_UNHANDLED}
    if status in [LOCAL_EMPTY_DIR, LOCAL_NONEMPTY_DIR]:
        return {LOCALFS_TYPE: common.T_DIR}
    return {
        LOCALFS_MTIME: stats.st_mtime,
        LOCALFS_SIZE: stats[stat.ST_SIZE],
        LOCALFS_TYPE: common.T_FILE,
    }


139
def stat_file(path):
    """Return os.lstat(path), or None if the path does not exist.

    A path is treated as non-existent when lstat fails with the errno
    values OS_NO_FILE_OR_DIR or OS_NOT_A_DIR; any other OSError propagates.
    Symbolic links are not followed, since lstat is used.
    """
    try:
        return os.lstat(path)
    except OSError as e:
        if e.errno in [OS_NO_FILE_OR_DIR, OS_NOT_A_DIR]:
            return None
        raise


# Keys of the info dicts built by get_live_info().
LOCALFS_TYPE = "localfs_type"
LOCALFS_MTIME = "localfs_mtime"
LOCALFS_SIZE = "localfs_size"


def status_of_info(info):
    """Translate an info dict into one of the LOCAL_* status codes.

    An empty dict means LOCAL_MISSING. Directories are always reported as
    LOCAL_EMPTY_DIR, because the info dict does not record emptiness.
    """
    if info == {}:
        return LOCAL_MISSING
    kind = info[LOCALFS_TYPE]
    if kind == common.T_DIR:
        return LOCAL_EMPTY_DIR
    if kind == common.T_UNHANDLED:
        return LOCAL_OTHER  # shouldn't happen
    return LOCAL_FILE


163
164
def get_local_status(path, attempt=0):
    """Return a `(stats, status)` pair for `path`.

    `stats` is the result of stat_file() (None if the path is missing) and
    `status` is one of the LOCAL_* codes. If classifying the path raises
    OSError, the error is logged and the whole operation, including the
    stat, is retried; once `attempt` exceeds 2 the error is re-raised.
    """
    stats = stat_file(path)
    try:
        status = _get_local_status_from_stats(stats, path)
    except OSError as e:
        logger.warning("Got error '%s' while listing dir '%s'" % (e, path))
        if attempt > 2:
            raise
        return get_local_status(path, attempt + 1)
    return stats, status


def _get_local_status_from_stats(stats, path):
    """Classify `path` into a LOCAL_* code, given its lstat result `stats`.

    `stats` being None means the path is missing. For directories the
    contents are listed in order to tell empty from non-empty ones.
    """
    if stats is None:
        return LOCAL_MISSING
    mode = stats[stat.ST_MODE]
    if stat.S_ISLNK(mode):
        return LOCAL_SOFTLINK
    if stat.S_ISREG(mode):
        return LOCAL_FILE
    if not stat.S_ISDIR(mode):
        return LOCAL_OTHER
    return LOCAL_NONEMPTY_DIR if os.listdir(path) else LOCAL_EMPTY_DIR


def path_status(path):
    """Return only the LOCAL_* status code of `path`."""
    return get_local_status(path)[1]
193
194


Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
195
def is_info_eq(info1, info2, unhandled_equal=True):
    """Decide whether two info dicts describe the same local object.

    If either dict is empty, they are equal only if both are empty.
    Dicts of different types are never equal. Two unhandled objects
    compare as `unhandled_equal`; two directories are always equal; two
    files are equal when their mtimes match (see eq_float) and their
    sizes are identical.
    """
    if {} in [info1, info2]:
        return info1 == info2
    if info1[LOCALFS_TYPE] != info2[LOCALFS_TYPE]:
        return False
    if info1[LOCALFS_TYPE] == common.T_UNHANDLED:
        return unhandled_equal
    if info1[LOCALFS_TYPE] == common.T_DIR:
        return True
    return eq_float(info1[LOCALFS_MTIME], info2[LOCALFS_MTIME]) \
        and info1[LOCALFS_SIZE] == info2[LOCALFS_SIZE]


class LocalfsTargetHandle(object):
    """Handle that applies a fetched object onto a path of the local root.

    The existing local object (if any) is first moved ("hidden") into the
    cache under a name registered in the database, then the fetched object
    is linked or created in its place, and finally the hidden copy is
    cleaned up.
    """

    def __init__(self, settings, target_state):
        self.settings = settings
        self.SIGNATURE = "LocalfsTargetHandle"
        self.rootpath = settings.local_root_path
        self.cache_hide_name = settings.cache_hide_name
        self.cache_hide_path = settings.cache_hide_path
        self.cache_path = settings.cache_path
        self.get_db = settings.get_db
        self.mtime_lag = settings.mtime_lag
        self.target_state = target_state
        self.objname = target_state.objname
        self.fspath = utils.join_path(self.rootpath, self.objname)
        # Set by register_hidden_name(); reset by unregister_hidden_name().
        self.hidden_filename = None
        self.hidden_path = None

    def get_path_in_cache(self, name):
        """Return the path of `name` inside the cache directory."""
        return utils.join_path(self.cache_path, name)

    @transaction()
    def register_hidden_name(self, filename):
        """Record in the database the cache name used to hide `filename`.

        Always sets `hidden_filename` and `hidden_path`. Returns False if
        the cache name was already registered, True if it was inserted now.
        """
        db = self.get_db()
        f = utils.hash_string(filename)
        hide_filename = utils.join_path(self.cache_hide_name, f)
        self.hidden_filename = hide_filename
        self.hidden_path = self.get_path_in_cache(self.hidden_filename)
        if db.get_cachename(hide_filename):
            return False
        db.insert_cachename(hide_filename, self.SIGNATURE, filename)
        return True

    @transaction()
    def unregister_hidden_name(self, hidden_filename):
        """Delete the cache name from the database and forget it."""
        db = self.get_db()
        db.delete_cachename(hidden_filename)
        self.hidden_filename = None
        self.hidden_path = None

    def move_file(self):
        """Move the local object into the cache under its hidden name.

        Raises common.BusyError if the local file is open. If the hidden
        name was already registered and a file exists at the hidden path,
        nothing is moved. If the local object does not exist, the hidden
        name is unregistered again.
        """
        fspath = self.fspath
        if file_is_open(fspath):
            raise common.BusyError("File '%s' is open. Aborting."
                                   % fspath)

        new_registered = self.register_hidden_name(self.objname)
        hidden_filename = self.hidden_filename
        hidden_path = self.hidden_path

        if not new_registered:
            logger.warning("Hiding already registered for file %s" %
                           (self.objname,))
            if os.path.lexists(hidden_path):
                logger.warning("File %s already hidden at %s" %
                               (self.objname, hidden_path))
                return
        try:
            os.rename(fspath, hidden_path)
            logger.info("Hiding file '%s' to '%s'" %
                        (fspath, hidden_path))
        except OSError as e:
            if e.errno in [OS_NO_FILE_OR_DIR, OS_NOT_A_DIR]:
                self.unregister_hidden_name(hidden_filename)
                logger.info("File '%s' does not exist" % fspath)
                return
            # Bare raise keeps the original traceback.
            raise

    def hide_file(self):
        """Hide the local object, undoing the move if it must not be hidden.

        After moving, the hidden object is moved back and an error is
        raised if it turns out to be open (common.BusyError) or to be a
        non-empty directory (common.ConflictError).
        """
        self.move_file()
        if self.hidden_filename is not None:
            if file_is_open(self.hidden_path):
                os.rename(self.hidden_path, self.fspath)
                self.unregister_hidden_name(self.hidden_filename)
                raise common.BusyError("File '%s' is open. Undoing." %
                                       self.hidden_path)
            if path_status(self.hidden_path) == LOCAL_NONEMPTY_DIR:
                os.rename(self.hidden_path, self.fspath)
                self.unregister_hidden_name(self.hidden_filename)
                raise common.ConflictError("'%s' is non-empty" % self.fspath)

    def apply(self, fetched_path, fetched_live_info, sync_state):
        """Replace the local object with the fetched one.

        Nothing is done when both sides are directories or both are
        missing. A non-empty local directory cannot be replaced and raises
        common.ConflictError.
        """
        local_status = path_status(self.fspath)
        fetched_status = status_of_info(fetched_live_info)
        if local_status in [LOCAL_EMPTY_DIR, LOCAL_NONEMPTY_DIR] \
                and fetched_status == LOCAL_EMPTY_DIR:
            return
        if local_status == LOCAL_MISSING and fetched_status == LOCAL_MISSING:
            return
        if local_status == LOCAL_NONEMPTY_DIR:
            raise common.ConflictError("'%s' is non-empty" % self.fspath)

        self.prepare(fetched_path, sync_state)
        self.finalize(fetched_path, fetched_live_info)
        self.cleanup(self.hidden_path)
        self.unregister_hidden_name(self.hidden_filename)

    def prepare(self, fetched_path, sync_state):
        """Hide the local object; stash it if it has unsynced changes.

        The hidden object is stashed when its live info differs from
        `sync_state` (and is not empty) and its contents differ from the
        fetched file.
        """
        self.hide_file()
        info_changed = local_path_changes(
            self.hidden_path, sync_state, unhandled_equal=False)
        if info_changed is not None and info_changed != {}:
            if not files_equal(self.hidden_path, fetched_path):
                self.stash_file()

    def stash_file(self):
        """Move the hidden object back under the root with a stash name.

        A ConflictStashMessage is put on the messager before the rename.
        """
        stash_name = mk_stash_name(self.objname)
        stash_path = utils.join_path(self.rootpath, stash_name)
        msg = messaging.ConflictStashMessage(
            objname=self.objname, stash_name=stash_name, logger=logger)
        self.settings.messager.put(msg)
        os.rename(self.hidden_path, stash_path)

    def finalize(self, filepath, live_info):
        """Put the fetched object in place at the target path.

        Files are hard-linked from `filepath` (creating the parent
        directory when it is missing), directories are created. Empty
        `live_info` means there is nothing to put in place.
        """
        logger.info("Finalizing file '%s'" % filepath)
        if live_info == {}:
            return
        if live_info[LOCALFS_TYPE] == common.T_FILE:
            time.sleep(self.mtime_lag)
            try:
                link_file(filepath, self.fspath)
            except DirMissing:
                make_dirs(os.path.dirname(self.fspath))
                link_file(filepath, self.fspath)
        elif live_info[LOCALFS_TYPE] == common.T_DIR:
            make_dirs(self.fspath)
        else:
            raise AssertionError("info for fetched file '%s' is %s" %
                                 (filepath, live_info))

    def cleanup(self, filepath):
        """Remove the file or directory at `filepath`, if there is one.

        Removing a file is best-effort: a failure is logged and ignored.
        """
        if filepath is None:
            return
        status = path_status(filepath)
        if status == LOCAL_FILE:
            logger.info("Cleaning up file '%s'" % filepath)
            try:
                os.unlink(filepath)
            except OSError as e:
                # Deliberately non-fatal, but no longer silent and no
                # longer swallowing unrelated exceptions.
                logger.warning("Failed to clean up file '%s': %s" %
                               (filepath, e))
        elif status in [LOCAL_EMPTY_DIR, LOCAL_NONEMPTY_DIR]:
            os.rmdir(filepath)

    def pull(self, source_handle, sync_state):
        """Fetch the object from `source_handle` and apply it locally.

        Returns the target state updated with the fetched live info.
        """
        fetched_path = source_handle.send_file(sync_state)
        fetched_live_info = get_live_info(fetched_path)
        self.apply(fetched_path, fetched_live_info, sync_state)
        self.cleanup(fetched_path)
        return self.target_state.set(info=fetched_live_info)


class LocalfsSourceHandle(object):
    """Handle for a local object that acts as the source of a sync.

    On construction, an object whose stored info says it is a regular file
    is moved ("staged") into the cache under a name registered in the
    database, and the source state is refreshed from the staged copy.
    """

    def __init__(self, settings, source_state):
        self.settings = settings
        self.SIGNATURE = "LocalfsSourceHandle"
        self.rootpath = settings.local_root_path
        self.cache_stage_name = settings.cache_stage_name
        self.cache_stage_path = settings.cache_stage_path
        self.cache_path = settings.cache_path
        self.get_db = settings.get_db
        self.source_state = source_state
        self.objname = source_state.objname
        self.fspath = utils.join_path(self.rootpath, self.objname)
        # Set by register_stage_name(); reset by unregister_stage_name().
        self.stage_filename = None
        self.staged_path = None
        self.heartbeat = settings.heartbeat
        if self.needs_staging():
            self.stage_file()

    @transaction()
    def register_stage_name(self, filename):
        """Record in the database the cache name used to stage `filename`.

        Always sets `stage_filename` and `staged_path`. Returns False if
        the cache name was already registered, True if it was inserted now.
        """
        db = self.get_db()
        f = utils.hash_string(filename)
        stage_filename = utils.join_path(self.cache_stage_name, f)
        self.stage_filename = stage_filename
        stage_path = self.get_path_in_cache(stage_filename)
        self.staged_path = stage_path
        if db.get_cachename(stage_filename):
            return False
        db.insert_cachename(stage_filename, self.SIGNATURE, filename)
        return True

    @transaction()
    def unregister_stage_name(self, stage_filename):
        """Delete the cache name from the database and forget it."""
        db = self.get_db()
        db.delete_cachename(stage_filename)
        self.stage_filename = None
        self.staged_path = None

    def get_path_in_cache(self, name):
        """Return the path of `name` inside the cache directory."""
        return utils.join_path(self.cache_path, name)

    def lock_file(self):
        """Move the local object into the cache under its stage name.

        Raises common.BusyError if the local file is open. If the stage
        name was already registered and a file exists at the staged path,
        nothing is moved. If the local object does not exist, the stage
        name is unregistered again.
        """
        fspath = self.fspath
        if file_is_open(fspath):
            raise common.BusyError("File '%s' is open. Aborting"
                                   % fspath)
        new_registered = self.register_stage_name(fspath)
        stage_filename = self.stage_filename
        stage_path = self.staged_path

        if not new_registered:
            logger.warning("Staging already registered for file %s" %
                           (self.objname,))
            if os.path.lexists(stage_path):
                logger.warning("File %s already staged at %s" %
                               (self.objname, stage_path))
                return

        logger.info("Staging file '%s' to '%s'" % (self.objname, stage_path))
        try:
            os.rename(fspath, stage_path)
        except OSError as e:
            if e.errno in [OS_NO_FILE_OR_DIR, OS_NOT_A_DIR]:
                logger.info("Source does not exist: '%s'" % fspath)
                self.unregister_stage_name(stage_filename)
                return
            # Bare raise keeps the original traceback.
            raise

    def stage_file(self):
        """Stage the local object and refresh the source state.

        A staged object that turns out to be open is moved back and
        common.BusyError is raised. A staged object that is not a regular
        file is moved back as well, without raising.
        """
        self.lock_file()
        if self.staged_path is not None:
            if file_is_open(self.staged_path):
                os.rename(self.staged_path, self.fspath)
                self.unregister_stage_name(self.stage_filename)
                logger.warning("File '%s' is open; unstaged" % self.objname)
                raise common.BusyError("File '%s' is open. Undoing" %
                                       self.staged_path)

            if path_status(self.staged_path) != LOCAL_FILE:
                os.rename(self.staged_path, self.fspath)
                self.unregister_stage_name(self.stage_filename)
                logger.warning("Object '%s' is not a regular file; unstaged" %
                               self.objname)
        self.check_update_source_state()

    @transaction()
    def update_state(self, state):
        """Store `state` in the database."""
        db = self.get_db()
        db.put_state(state)

    def check_update_source_state(self):
        """Refresh the source state if the staged object has changed.

        When the live info of the staged path differs from the stored
        info, a LiveInfoUpdateMessage is emitted and the new state is both
        stored in the database and kept on this handle.
        """
        live_info = local_path_changes(
            self.staged_path, self.source_state)
        if live_info is not None:
            msg = messaging.LiveInfoUpdateMessage(
                archive=self.SIGNATURE, objname=self.objname,
                info=live_info, logger=logger)
            self.settings.messager.put(msg)
            new_state = self.source_state.set(info=live_info)
            self.update_state(new_state)
            self.source_state = new_state

    def get_synced_state(self):
        """Return the current source state."""
        return self.source_state

    def needs_staging(self):
        """Return True if the stored info describes a regular file."""
        info = self.source_state.info
        # bool() so that an empty info yields False rather than the
        # empty info object itself.
        return bool(info and info[LOCALFS_TYPE] == common.T_FILE)

    def info_is_dir(self):
        """Return True if the stored info describes a directory."""
        try:
            return self.source_state.info[LOCALFS_TYPE] == common.T_DIR
        except KeyError:
            return False

    def info_is_deleted(self):
        """Return True if the stored info is empty."""
        return self.source_state.info == {}

    def info_is_deleted_or_unhandled(self):
        """Return True if the stored info is empty or of unhandled type."""
        return self.source_state.info == {} \
            or self.source_state.info[LOCALFS_TYPE] == common.T_UNHANDLED

    def stash_staged_file(self):
        """Move the staged file next to its original path, as a stash."""
        stash_filename = mk_stash_name(self.fspath)
        logger.warning("Stashing file '%s' to '%s'" %
                       (self.fspath, stash_filename))
        os.rename(self.staged_path, stash_filename)

    def unstage_file(self):
        """Move the staged file back to its original path.

        The file is hard-linked back and the staged copy removed. If
        linking raises common.ConflictError, the staged file is stashed
        instead. The stage name is unregistered in either case.
        """
        if self.stage_filename is None:
            return
        staged_path = self.staged_path
        try:
            link_file(staged_path, self.fspath)
            os.unlink(staged_path)
        except common.ConflictError:
            self.stash_staged_file()
        self.unregister_stage_name(self.stage_filename)
497
498
499
500
501


class LocalfsFileClient(FileClient):
    """File client for the local filesystem archive.

    Keeps a thread-safe dict of probe candidates (object names that may
    have changed), filled by walking the filesystem and by a watchdog
    observer, and creates source/target handles for syncing.
    """

    def __init__(self, settings):
        self.settings = settings
        self.SIGNATURE = "LocalfsFileClient"
        self.ROOTPATH = settings.local_root_path
        self.CACHEPATH = settings.cache_path
        self.get_db = settings.get_db
        self.probe_candidates = utils.ThreadSafeDict()

    def remove_candidates(self, objnames, ident):
        """Drop the given candidates, unless re-marked by another `ident`.

        A candidate whose stored "ident" differs from `ident` is kept.
        """
        with self.probe_candidates.lock() as d:
            for objname in objnames:
                try:
                    cached = d.pop(objname)
                    if cached["ident"] != ident:
                        d[objname] = cached
                except KeyError:
                    pass

    def list_candidate_files(self, forced=False):
        """Return the names of the current probe candidates.

        With `forced`, the candidates are first extended with the result
        of walk_filesystem().
        """
        with self.probe_candidates.lock() as d:
            if forced:
                candidates = self.walk_filesystem()
                d.update(candidates)
            # Return a snapshot: a live keys() view must not be used once
            # the lock has been released.
            return list(d.keys())

    def none_info(self):
        """Return a fresh candidate entry with no ident and no info."""
        return {"ident": None, "info": None}

    def walk_filesystem(self):
        """Collect candidates from the filesystem and from the database.

        Every directory and file found under the root, plus every file
        listed in the database for this archive, is returned as an object
        name mapped to an empty candidate entry.
        """
        candidates = {}
        for dirpath, dirnames, files in os.walk(self.ROOTPATH):
            rel_dirpath = os.path.relpath(dirpath, start=self.ROOTPATH)
            logger.debug("'%s' '%s'" % (dirpath, rel_dirpath))
            if rel_dirpath != '.':
                objname = utils.to_standard_sep(rel_dirpath)
                candidates[objname] = self.none_info()
            for filename in files:
                if rel_dirpath == '.':
                    prefix = ""
                else:
                    prefix = utils.to_standard_sep(rel_dirpath)
                objname = utils.join_objname(prefix, filename)
                candidates[objname] = self.none_info()

        db_cands = dict((name, self.none_info())
                        for name in self.list_files())
        candidates.update(db_cands)
        logger.info("Candidates: %s" % candidates)
        return candidates

    @transaction()
    def list_files(self):
        """Return the files the database lists for this archive."""
        db = self.get_db()
        return db.list_files(self.SIGNATURE)

    def _local_path_changes(self, name, state):
        """Run local_path_changes() on object `name` under the root."""
        local_path = utils.join_path(self.ROOTPATH, name)
        return local_path_changes(local_path, state)

    def exclude_file(self, objname):
        """Return a true value if `objname` must not be probed.

        Objects below the cache directory are excluded, as are objects
        whose last path component matches `exclude_pattern`.
        """
        parts = objname.split(common.OBJECT_DIRSEP)
        init_part = parts[0]
        if init_part in [self.settings.cache_name]:
            return True
        final_part = parts[-1]
        return exclude_pattern.match(final_part)

    def probe_file(self, objname, old_state, ref_state, ident):
        """Return the live state of `objname` if it differs from `old_state`.

        The candidate entry, if present, is marked with `ident`. Returns
        None for excluded objects and for objects without changes.
        """
        with self.probe_candidates.lock() as d:
            try:
                cached = d[objname]
                cached_info = cached["info"]
                cached["ident"] = ident
            except KeyError:
                cached_info = None

        if self.exclude_file(objname):
            logger.warning("Ignoring probe archive: %s, object: %s" %
                           (old_state.archive, objname))
            return

        live_info = (self._local_path_changes(objname, old_state)
                     if cached_info is None else cached_info)
        if live_info is None:
            return
        live_state = old_state.set(info=live_info)
        return live_state

    def stage_file(self, source_state):
        """Return a source handle for `source_state`."""
        return LocalfsSourceHandle(self.settings, source_state)

    def prepare_target(self, target_state):
        """Return a target handle for `target_state`."""
        return LocalfsTargetHandle(self.settings, target_state)

    def notifier(self):
        """Start and return a watchdog observer on the local root.

        Paths reported by filesystem events become probe candidates;
        events on paths starting with the cache path are ignored.
        """
        def handle_path(path):
            rel_path = os.path.relpath(path, start=self.ROOTPATH)
            objname = utils.to_standard_sep(rel_path)
            with self.probe_candidates.lock() as d:
                d[objname] = self.none_info()

        def in_cache(path):
            # NOTE(review): plain prefix test; a sibling path that merely
            # shares the cache path as a string prefix is ignored too.
            return path.startswith(self.CACHEPATH)

        # Handler methods name their own instance `this`, so that `self`
        # keeps referring to the enclosing file client.
        class EventHandler(FileSystemEventHandler):
            def on_created(this, event):
                path = event.src_path
                if in_cache(path):
                    return
                logger.info("Handling %s" % event)
                handle_path(path)

            def on_deleted(this, event):
                path = event.src_path
                if in_cache(path):
                    return
                logger.info("Handling %s" % event)
                handle_path(path)

            def on_modified(this, event):
                if event.is_directory:
                    return
                path = event.src_path
                if in_cache(path):
                    return
                logger.info("Handling %s" % event)
                handle_path(path)

            def on_moved(this, event):
                src_path = event.src_path
                dest_path = event.dest_path
                if in_cache(src_path) or in_cache(dest_path):
                    return
                logger.info("Handling %s" % event)
                handle_path(src_path)
                handle_path(dest_path)

        path = self.ROOTPATH
        event_handler = EventHandler()
        observer = Observer()
        observer.schedule(event_handler, path, recursive=True)
        observer.start()
        return observer