# Copyright (C) 2015 GRNET S.A.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import datetime
import logging
import os
import re
import stat
import time

import psutil
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

from agkyra.syncer import utils, common, messaging
from agkyra.syncer.database import transaction
from agkyra.syncer.file_client import FileClient

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Status codes describing what is found at a local path; they are produced
# by _get_local_status_from_stats() and status_of_info().
LOCAL_FILE = 0
LOCAL_EMPTY_DIR = 1
LOCAL_NONEMPTY_DIR = 2
LOCAL_MISSING = 3
LOCAL_SOFTLINK = 4
LOCAL_OTHER = 5

# OSError errno values the functions below compare against
# (numerically EEXIST, ENOTDIR and ENOENT on Linux).
OS_FILE_EXISTS = 17
OS_NOT_A_DIR = 20
OS_NO_FILE_OR_DIR = 2

# Two modification times closer than this many seconds are treated as equal.
DEFAULT_MTIME_PRECISION = 1e-7

# Regexes, matched against the beginning of a file's base name, for
# temporary or editor-owned files that must be ignored.  Raw strings keep
# the backslashes literal without relying on invalid escape sequences.
exclude_regexes = [r"\.#", r"\.~", r"~\$", r"~.*\.tmp$", r"\..*\.swp$"]
exclude_pattern = re.compile('|'.join(exclude_regexes))


class DirMissing(Exception):
    """Raised by link_file() when os.link() fails with "no such file or
    directory".

    Derives from Exception (not BaseException) so that it behaves like an
    ordinary application error and is not mistaken for an interpreter-level
    signal such as KeyboardInterrupt or SystemExit.
    """


def link_file(src, dest):
    """Create a hard link `dest` pointing to `src`.

    Known failures of os.link() are translated:

    * `dest` already exists, or a component of the path is not a
      directory: common.ConflictError;
    * "no such file or directory": DirMissing.

    Any other OSError is re-raised unchanged.
    """
    try:
        os.link(src, dest)
    except OSError as e:
        if e.errno == OS_FILE_EXISTS:
            raise common.ConflictError("Cannot link, '%s' exists." % dest)
        if e.errno == OS_NOT_A_DIR:
            raise common.ConflictError(
                "Cannot link, missing path for '%s'." % dest)
        if e.errno == OS_NO_FILE_OR_DIR:
            raise DirMissing()
        # Never report success for a link that was not created: callers
        # remove the source copy right after this call returns.
        raise


def make_dirs(path):
    """Create directory `path` together with any missing parents.

    An already existing directory is accepted silently.  If something else
    occupies the path, or a path component is not a directory,
    common.ConflictError is raised; every other OSError propagates.
    """
    try:
        os.makedirs(path)
    except OSError as err:
        code = err.errno
        already_a_dir = code == OS_FILE_EXISTS and os.path.isdir(path)
        if already_a_dir:
            return
        if code == OS_FILE_EXISTS or code == OS_NOT_A_DIR:
            raise common.ConflictError("Cannot make dir '%s'." % path)
        raise


# Accessor for the files a psutil process has open: psutil major version 2
# and later is called through Process.open_files(), older versions through
# Process.get_open_files().
if psutil.version_info[0] >= 2:
    psutil_open_files = lambda proc: proc.open_files()
else:
    psutil_open_files = lambda proc: proc.get_open_files()


def file_is_open(path):
    """Return True if any process visible to psutil has `path` open.

    Each open file's path is compared to `path` by plain string equality.
    """
    for proc in psutil.process_iter():
        try:
            if any(nt.path == path for nt in psutil_open_files(proc)):
                return True
        except psutil.Error:
            # A process that cannot be inspected (psutil raised one of its
            # own errors) is skipped rather than aborting the scan.
            pass
    return False


def mk_stash_name(filename):
    """Return `filename` suffixed with '.<unix-timestamp>.local'.

    The timestamp is the current time in whole seconds since the epoch.
    It is taken from time.time() because the "%s" strftime directive is a
    platform-specific extension that is not available everywhere.
    """
    tstamp = str(int(time.time()))
    return filename + '.' + tstamp + '.local'


def eq_float(f1, f2, precision=None):
    """Return True if `f1` and `f2` differ by less than `precision`.

    `precision` defaults to DEFAULT_MTIME_PRECISION; pass a larger value
    when the compared timestamps have a coarser granularity.
    """
    if precision is None:
        precision = DEFAULT_MTIME_PRECISION
    return abs(f1 - f2) < precision


def files_equal(f1, f2):
    """Tell whether the objects at paths `f1` and `f2` are equal.

    Paths with different local statuses are never equal.  Paths that share
    a status other than LOCAL_FILE are considered equal.  Two regular
    files are equal when their sizes and their utils.hash_file() hashes
    match; the hashes are only computed if the sizes agree.
    """
    logger.debug("Comparing files: '%s', '%s'" % (f1, f2))
    stats1, st1 = get_local_status(f1)
    stats2, st2 = get_local_status(f2)
    if st1 != st2:
        return False
    if st1 != LOCAL_FILE:
        return True
    if stats1[stat.ST_SIZE] != stats2[stat.ST_SIZE]:
        return False
    hash1 = utils.hash_file(f1)
    hash2 = utils.hash_file(f2)
    return hash1 == hash2


def info_is_unhandled(info):
    """Return True if `info` is non-empty and typed common.T_UNHANDLED."""
    if info == {}:
        return False
    return info[LOCALFS_TYPE] == common.T_UNHANDLED


def local_path_changes(path, state, unhandled_equal=True):
    """Compare what is currently at `path` with the info stored in `state`.

    Returns None when the live info equals `state.info` according to
    is_info_eq(); otherwise returns the live info dict.  `unhandled_equal`
    is passed through to is_info_eq().
    """
    live_info = get_live_info(path)
    info = state.info
    if is_info_eq(live_info, info, unhandled_equal):
        return None
    return live_info


def get_live_info(path):
    """Build the info dict describing what currently exists at `path`.

    Returns:
        {} if `path` is None or nothing exists there;
        {LOCALFS_TYPE: common.T_UNHANDLED} for soft links and other
        special objects;
        {LOCALFS_TYPE: common.T_DIR} for directories, empty or not;
        a dict with the modification time, size and type common.T_FILE
        for regular files.
    """
    if path is None:
        return {}
    stats, status = get_local_status(path)
    if status == LOCAL_MISSING:
        return {}
    if status in [LOCAL_SOFTLINK, LOCAL_OTHER]:
        return {LOCALFS_TYPE: common.T_UNHANDLED}
    if status in [LOCAL_EMPTY_DIR, LOCAL_NONEMPTY_DIR]:
        return {LOCALFS_TYPE: common.T_DIR}
    live_info = {LOCALFS_MTIME: stats.st_mtime,
                 LOCALFS_SIZE: stats[stat.ST_SIZE],
                 LOCALFS_TYPE: common.T_FILE,
                 }
    return live_info


def stat_file(path):
    """Return the os.lstat() result for `path`, without following links.

    Returns None when the path does not exist or one of its components is
    not a directory; any other OSError propagates.
    """
    try:
        return os.lstat(path)
    except OSError as e:
        if e.errno in [OS_NO_FILE_OR_DIR, OS_NOT_A_DIR]:
            return None
        raise


# Keys of the "info" dicts built by get_live_info() and compared by
# is_info_eq().
LOCALFS_TYPE = "localfs_type"
LOCALFS_MTIME = "localfs_mtime"
LOCALFS_SIZE = "localfs_size"


def status_of_info(info):
    """Translate an info dict into one of the LOCAL_* status codes.

    An empty dict means LOCAL_MISSING; directories are always reported as
    LOCAL_EMPTY_DIR; anything not unhandled is a LOCAL_FILE.
    """
    if info == {}:
        return LOCAL_MISSING
    if info[LOCALFS_TYPE] == common.T_DIR:
        return LOCAL_EMPTY_DIR
    is_unhandled = info[LOCALFS_TYPE] == common.T_UNHANDLED
    # An unhandled type shouldn't happen here.
    return LOCAL_OTHER if is_unhandled else LOCAL_FILE


def get_local_status(path, attempt=0):
    """Return a `(stats, status)` pair for `path`.

    `stats` is the result of stat_file() (None if the path is missing) and
    `status` is the LOCAL_* code derived from it.

    Deriving the status lists the directory when `path` is one.  If that
    raises OSError, the path is stat'ed and classified again; the error is
    re-raised once `attempt` exceeds 2, i.e. after three retries.
    """
    stats = stat_file(path)
    try:
        status = _get_local_status_from_stats(stats, path)
    except OSError as e:
        logger.warning("Got error '%s' while listing dir '%s'" % (e, path))
        if attempt > 2:
            raise
        return get_local_status(path, attempt + 1)
    return stats, status


def _get_local_status_from_stats(stats, path):
    """Classify `path` into a LOCAL_* status code given its lstat result.

    `stats` is None for a missing path.  A directory is listed to tell
    empty from non-empty; os.listdir() may raise OSError.
    """
    if stats is None:
        return LOCAL_MISSING
    file_mode = stats[stat.ST_MODE]
    # The file-type predicates are mutually exclusive.
    for is_type, status in ((stat.S_ISLNK, LOCAL_SOFTLINK),
                            (stat.S_ISREG, LOCAL_FILE)):
        if is_type(file_mode):
            return status
    if not stat.S_ISDIR(file_mode):
        return LOCAL_OTHER
    return LOCAL_NONEMPTY_DIR if os.listdir(path) else LOCAL_EMPTY_DIR


def path_status(path):
    """Return only the LOCAL_* status code of `path`."""
    return get_local_status(path)[1]


def is_info_eq(info1, info2, unhandled_equal=True):
    """Tell whether two info dicts describe the same local object.

    * If either dict is empty, they are equal only if both are empty.
    * Dicts of different types are never equal.
    * Two unhandled objects compare as `unhandled_equal`.
    * Two directories are always equal.
    * Two files are equal when their sizes match and their modification
      times agree according to eq_float().
    """
    if {} in [info1, info2]:
        return info1 == info2
    if info1[LOCALFS_TYPE] != info2[LOCALFS_TYPE]:
        return False
    if info1[LOCALFS_TYPE] == common.T_UNHANDLED:
        return unhandled_equal
    if info1[LOCALFS_TYPE] == common.T_DIR:
        return True
    return eq_float(info1[LOCALFS_MTIME], info2[LOCALFS_MTIME]) \
        and info1[LOCALFS_SIZE] == info2[LOCALFS_SIZE]


class LocalfsTargetHandle(object):
    """Applies a fetched object onto its place in the local filesystem.

    The object currently at the target path is first "hidden": renamed to
    a path inside the cache whose name is registered in the database.  The
    fetched object is then put in place and the hidden one is cleaned up,
    or stashed under a new name when it carries local changes.
    """

    def __init__(self, settings, target_state):
        self.settings = settings
        self.SIGNATURE = "LocalfsTargetHandle"
        self.rootpath = settings.local_root_path
        self.cache_hide_name = settings.cache_hide_name
        self.cache_hide_path = settings.cache_hide_path
        self.cache_path = settings.cache_path
        self.get_db = settings.get_db
        self.mtime_lag = settings.mtime_lag
        self.target_state = target_state
        self.objname = target_state.objname
        self.fspath = utils.join_path(self.rootpath, self.objname)
        # Set while the object is hidden in the cache; both None otherwise.
        self.hidden_filename = None
        self.hidden_path = None

    def get_path_in_cache(self, name):
        """Return the path of `name` under the cache directory."""
        return utils.join_path(self.cache_path, name)

    @transaction()
    def register_hidden_name(self, filename):
        """Record the cache name under which `filename` gets hidden.

        Sets `hidden_filename` and `hidden_path` in any case.  Returns
        False if that cache name was already registered, True if it was
        registered by this call.
        """
        db = self.get_db()
        f = utils.hash_string(filename)
        hide_filename = utils.join_path(self.cache_hide_name, f)
        self.hidden_filename = hide_filename
        self.hidden_path = self.get_path_in_cache(self.hidden_filename)
        if db.get_cachename(hide_filename):
            return False
        db.insert_cachename(hide_filename, self.SIGNATURE, filename)
        return True

    @transaction()
    def unregister_hidden_name(self, hidden_filename):
        """Delete the cache name record and reset the hidden attributes."""
        db = self.get_db()
        db.delete_cachename(hidden_filename)
        self.hidden_filename = None
        self.hidden_path = None

    def move_file(self):
        """Rename the target object into its hidden cache path.

        Raises common.BusyError if the object is open by some process.
        If the object does not exist, the registration is undone and the
        hidden attributes end up None.
        """
        fspath = self.fspath
        if file_is_open(fspath):
            raise common.BusyError("File '%s' is open. Aborting."
                                   % fspath)

        new_registered = self.register_hidden_name(self.objname)
        hidden_filename = self.hidden_filename
        hidden_path = self.hidden_path

        if not new_registered:
            logger.warning("Hiding already registered for file %s" %
                           (self.objname,))
            if os.path.lexists(hidden_path):
                # Something is already hidden there; keep it.
                logger.warning("File %s already hidden at %s" %
                               (self.objname, hidden_path))
                return
        try:
            os.rename(fspath, hidden_path)
        except OSError as e:
            if e.errno not in [OS_NO_FILE_OR_DIR, OS_NOT_A_DIR]:
                # Bare raise keeps the original traceback.
                raise
            self.unregister_hidden_name(hidden_filename)
            logger.info("File '%s' does not exist" % fspath)
        else:
            logger.info("Hiding file '%s' to '%s'" %
                        (fspath, hidden_path))

    def _undo_hide(self):
        """Move the hidden object back in place and drop its registration.

        Returns the path the object had been hidden at, because
        unregistering resets `self.hidden_path` to None.
        """
        hidden_path = self.hidden_path
        os.rename(hidden_path, self.fspath)
        self.unregister_hidden_name(self.hidden_filename)
        return hidden_path

    def hide_file(self):
        """Hide the target object, undoing the move if it cannot be used.

        Raises common.BusyError if the hidden object turns out to be open,
        and common.ConflictError if it is a non-empty directory.
        """
        self.move_file()
        if self.hidden_filename is None:
            return
        if file_is_open(self.hidden_path):
            hidden_path = self._undo_hide()
            raise common.BusyError("File '%s' is open. Undoing." %
                                   hidden_path)
        if path_status(self.hidden_path) == LOCAL_NONEMPTY_DIR:
            self._undo_hide()
            raise common.ConflictError("'%s' is non-empty" % self.fspath)

    def apply(self, fetched_path, fetched_live_info, sync_state):
        """Replace the local object with the fetched one.

        Nothing is done when a directory is fetched over an existing
        directory, or when both sides are missing.  A non-empty local
        directory otherwise raises common.ConflictError.
        """
        local_status = path_status(self.fspath)
        fetched_status = status_of_info(fetched_live_info)
        if local_status in [LOCAL_EMPTY_DIR, LOCAL_NONEMPTY_DIR] \
                and fetched_status == LOCAL_EMPTY_DIR:
            return
        if local_status == LOCAL_MISSING and fetched_status == LOCAL_MISSING:
            return
        if local_status == LOCAL_NONEMPTY_DIR:
            raise common.ConflictError("'%s' is non-empty" % self.fspath)

        self.prepare(fetched_path, sync_state)
        self.finalize(fetched_path, fetched_live_info)
        self.cleanup(self.hidden_path)
        self.unregister_hidden_name(self.hidden_filename)

    def prepare(self, fetched_path, sync_state):
        """Hide the local object; stash it if it has unsynced changes.

        The hidden object is stashed when its live info differs from
        `sync_state`, it is not missing, and its content differs from the
        fetched object.
        """
        self.hide_file()
        info_changed = local_path_changes(
            self.hidden_path, sync_state, unhandled_equal=False)
        if info_changed is not None and info_changed != {}:
            if not files_equal(self.hidden_path, fetched_path):
                self.stash_file()

    def stash_file(self):
        """Move the hidden object to a stash name under the root path and
        post a ConflictStashMessage about it."""
        stash_name = mk_stash_name(self.objname)
        stash_path = utils.join_path(self.rootpath, stash_name)
        msg = messaging.ConflictStashMessage(
            objname=self.objname, stash_name=stash_name, logger=logger)
        self.settings.messager.put(msg)
        os.rename(self.hidden_path, stash_path)

    def finalize(self, filepath, live_info):
        """Put the fetched object in place at the target path.

        A file is hard-linked from `filepath` (creating the parent
        directories if they are missing); a directory is created.  Empty
        `live_info` means there is nothing to put in place.
        """
        logger.info("Finalizing file '%s'" % filepath)
        if live_info == {}:
            return
        if live_info[LOCALFS_TYPE] == common.T_FILE:
            time.sleep(self.mtime_lag)
            try:
                link_file(filepath, self.fspath)
            except DirMissing:
                make_dirs(os.path.dirname(self.fspath))
                link_file(filepath, self.fspath)
        elif live_info[LOCALFS_TYPE] == common.T_DIR:
            make_dirs(self.fspath)
        else:
            raise AssertionError("info for fetched file '%s' is %s" %
                                 (filepath, live_info))

    def cleanup(self, filepath):
        """Remove the file or directory at `filepath`, if any.

        Failing to unlink a file is logged and otherwise ignored.
        """
        if filepath is None:
            return
        status = path_status(filepath)
        if status == LOCAL_FILE:
            logger.info("Cleaning up file '%s'" % filepath)
            try:
                os.unlink(filepath)
            except OSError as e:
                # Best effort: a leftover file must not abort the sync,
                # but only filesystem errors are tolerated.
                logger.warning("Could not clean up file '%s': %s" %
                               (filepath, e))
        elif status in [LOCAL_EMPTY_DIR, LOCAL_NONEMPTY_DIR]:
            os.rmdir(filepath)

    def pull(self, source_handle, sync_state):
        """Fetch the object through `source_handle` and apply it locally.

        Returns the target state updated with the fetched object's info.
        """
        fetched_path = source_handle.send_file(sync_state)
        fetched_live_info = get_live_info(fetched_path)
        self.apply(fetched_path, fetched_live_info, sync_state)
        self.cleanup(fetched_path)
        return self.target_state.set(info=fetched_live_info)


class LocalfsSourceHandle(object):
    """Handle on a local object that acts as the source of a sync.

    When the recorded info says the object is a file, constructing the
    handle "stages" it: the file is renamed to a path inside the cache
    whose name is registered in the database.
    """

    def __init__(self, settings, source_state):
        self.settings = settings
        self.SIGNATURE = "LocalfsSourceHandle"
        self.rootpath = settings.local_root_path
        self.cache_stage_name = settings.cache_stage_name
        self.cache_stage_path = settings.cache_stage_path
        self.cache_path = settings.cache_path
        self.get_db = settings.get_db
        self.source_state = source_state
        self.objname = source_state.objname
        self.fspath = utils.join_path(self.rootpath, self.objname)
        # Set while the object is staged in the cache; both None otherwise.
        self.stage_filename = None
        self.staged_path = None
        self.heartbeat = settings.heartbeat
        if self.needs_staging():
            self.stage_file()

    @transaction()
    def register_stage_name(self, filename):
        """Record the cache name under which `filename` gets staged.

        Sets `stage_filename` and `staged_path` in any case.  Returns
        False if that cache name was already registered, True if it was
        registered by this call.
        """
        db = self.get_db()
        f = utils.hash_string(filename)
        stage_filename = utils.join_path(self.cache_stage_name, f)
        self.stage_filename = stage_filename
        stage_path = self.get_path_in_cache(stage_filename)
        self.staged_path = stage_path
        if db.get_cachename(stage_filename):
            return False
        db.insert_cachename(stage_filename, self.SIGNATURE, filename)
        return True

    @transaction()
    def unregister_stage_name(self, stage_filename):
        """Delete the cache name record and reset the staging attributes."""
        db = self.get_db()
        db.delete_cachename(stage_filename)
        self.stage_filename = None
        self.staged_path = None

    def get_path_in_cache(self, name):
        """Return the path of `name` under the cache directory."""
        return utils.join_path(self.cache_path, name)

    def lock_file(self):
        """Rename the source object into its staging path in the cache.

        Raises common.BusyError if the object is open by some process.
        If the object does not exist, the registration is undone and the
        staging attributes end up None.
        """
        fspath = self.fspath
        if file_is_open(fspath):
            raise common.BusyError("File '%s' is open. Aborting"
                                   % fspath)
        new_registered = self.register_stage_name(fspath)
        stage_filename = self.stage_filename
        stage_path = self.staged_path

        if not new_registered:
            logger.warning("Staging already registered for file %s" %
                           (self.objname,))
            if os.path.lexists(stage_path):
                # Something is already staged there; keep it.
                logger.warning("File %s already staged at %s" %
                               (self.objname, stage_path))
                return

        logger.info("Staging file '%s' to '%s'" % (self.objname, stage_path))
        try:
            os.rename(fspath, stage_path)
        except OSError as e:
            if e.errno not in [OS_NO_FILE_OR_DIR, OS_NOT_A_DIR]:
                # Bare raise keeps the original traceback.
                raise
            logger.info("Source does not exist: '%s'" % fspath)
            self.unregister_stage_name(stage_filename)

    def _undo_stage(self):
        """Move the staged object back in place and drop its registration.

        Returns the path the object had been staged at, because
        unregistering resets `self.staged_path` to None.
        """
        staged_path = self.staged_path
        os.rename(staged_path, self.fspath)
        self.unregister_stage_name(self.stage_filename)
        return staged_path

    def stage_file(self):
        """Stage the source object and refresh the recorded source state.

        The staging is undone if the staged object is open (raising
        common.BusyError) or if it is not a regular file.
        """
        self.lock_file()
        if self.staged_path is not None:
            if file_is_open(self.staged_path):
                staged_path = self._undo_stage()
                logger.warning("File '%s' is open; unstaged" % self.objname)
                raise common.BusyError("File '%s' is open. Undoing" %
                                       staged_path)

            if path_status(self.staged_path) != LOCAL_FILE:
                self._undo_stage()
                logger.warning("Object '%s' is not a regular file; unstaged" %
                               self.objname)
        self.check_update_source_state()

    @transaction()
    def update_state(self, state):
        """Store `state` in the database."""
        db = self.get_db()
        db.put_state(state)

    def check_update_source_state(self):
        """Update the source state if the staged object differs from it."""
        live_info = local_path_changes(
            self.staged_path, self.source_state)
        if live_info is not None:
            logger.warning("Actual info differs in %s for object: '%s'; "
                           "updating..." % (self.SIGNATURE, self.objname))
            new_state = self.source_state.set(info=live_info)
            self.update_state(new_state)
            self.source_state = new_state

    def get_synced_state(self):
        """Return the current source state."""
        return self.source_state

    def needs_staging(self):
        """Return a true value if the recorded info describes a file."""
        info = self.source_state.info
        return info and info[LOCALFS_TYPE] == common.T_FILE

    def info_is_dir(self):
        """Return True if the recorded info describes a directory."""
        try:
            return self.source_state.info[LOCALFS_TYPE] == common.T_DIR
        except KeyError:
            return False

    def info_is_deleted(self):
        """Return True if the recorded info is empty."""
        return self.source_state.info == {}

    def info_is_deleted_or_unhandled(self):
        """Return True if the recorded info is empty or of unhandled type."""
        return self.source_state.info == {} \
            or self.source_state.info[LOCALFS_TYPE] == common.T_UNHANDLED

    def stash_staged_file(self):
        """Move the staged object to a stash name next to its original."""
        stash_filename = mk_stash_name(self.fspath)
        logger.warning("Stashing file '%s' to '%s'" %
                       (self.fspath, stash_filename))
        os.rename(self.staged_path, stash_filename)

    def unstage_file(self):
        """Put the staged object back to its original path.

        If the original path is occupied in the meantime, the staged
        object is stashed instead.  Does nothing when nothing is staged.
        """
        if self.stage_filename is None:
            return
        staged_path = self.staged_path
        try:
            link_file(staged_path, self.fspath)
            os.unlink(staged_path)
        except common.ConflictError:
            self.stash_staged_file()
        self.unregister_stage_name(self.stage_filename)


class LocalfsFileClient(FileClient):
    """File client for the local filesystem tree under the root path.

    Keeps a thread-safe dict of "probe candidates" (object names that may
    have changed), filled by walking the filesystem and by the filesystem
    notifier.
    """

    def __init__(self, settings):
        self.settings = settings
        self.SIGNATURE = "LocalfsFileClient"
        self.ROOTPATH = settings.local_root_path
        self.CACHEPATH = settings.cache_path
        self.get_db = settings.get_db
        self.probe_candidates = utils.ThreadSafeDict()

    def remove_candidates(self, objnames, ident):
        """Drop the candidates in `objnames` that are still tagged with
        `ident`; entries tagged otherwise are kept."""
        with self.probe_candidates.lock() as d:
            for objname in objnames:
                try:
                    cached = d.pop(objname)
                    if cached["ident"] != ident:
                        d[objname] = cached
                except KeyError:
                    pass

    def list_candidate_files(self, forced=False):
        """Return the names of the current probe candidates.

        With `forced`, the filesystem is walked first and everything found
        is added to the candidates.
        """
        with self.probe_candidates.lock() as d:
            if forced:
                candidates = self.walk_filesystem()
                d.update(candidates)
            # Return a snapshot: a live keys view must not be iterated
            # by the caller after the lock has been released.
            return list(d.keys())

    def none_info(self):
        """Return a fresh candidate entry with no ident and no info."""
        return {"ident": None, "info": None}

    def walk_filesystem(self):
        """Return candidate entries for every directory and file under the
        root path, plus every file name listed in the database."""
        candidates = {}
        for dirpath, dirnames, files in os.walk(self.ROOTPATH):
            rel_dirpath = os.path.relpath(dirpath, start=self.ROOTPATH)
            logger.debug("'%s' '%s'" % (dirpath, rel_dirpath))
            if rel_dirpath == '.':
                # The root itself is not an object.
                prefix = ""
            else:
                prefix = utils.to_standard_sep(rel_dirpath)
                candidates[prefix] = self.none_info()
            for filename in files:
                objname = utils.join_objname(prefix, filename)
                candidates[objname] = self.none_info()

        db_cands = dict((name, self.none_info())
                        for name in self.list_files())
        candidates.update(db_cands)
        logger.info("Candidates: %s" % candidates)
        return candidates

    @transaction()
    def list_files(self):
        """Return the file names the database lists for this client."""
        db = self.get_db()
        return db.list_files(self.SIGNATURE)

    def _local_path_changes(self, name, state):
        """Run local_path_changes() on object `name` under the root path."""
        local_path = utils.join_path(self.ROOTPATH, name)
        return local_path_changes(local_path, state)

    def exclude_file(self, objname):
        """Return a true value if `objname` must be ignored.

        That is the case when its first path component is the cache
        directory name, or when its last component matches
        `exclude_pattern`.
        """
        parts = objname.split(common.OBJECT_DIRSEP)
        init_part = parts[0]
        if init_part in [self.settings.cache_name]:
            return True
        final_part = parts[-1]
        return exclude_pattern.match(final_part)

    def probe_file(self, objname, old_state, ref_state, ident):
        """Return the updated state of `objname`, or None if it is excluded
        or unchanged.

        A candidate entry for `objname`, if present, is tagged with
        `ident`; its cached info, when not None, is used instead of
        inspecting the filesystem.
        """
        with self.probe_candidates.lock() as d:
            try:
                cached = d[objname]
                cached_info = cached["info"]
                cached["ident"] = ident
            except KeyError:
                cached_info = None

        if self.exclude_file(objname):
            logger.warning("Ignoring probe archive: %s, object: %s" %
                           (old_state.archive, objname))
            return

        live_info = (self._local_path_changes(objname, old_state)
                     if cached_info is None else cached_info)
        if live_info is None:
            return
        live_state = old_state.set(info=live_info)
        return live_state

    def stage_file(self, source_state):
        """Return a source handle for `source_state`."""
        return LocalfsSourceHandle(self.settings, source_state)

    def prepare_target(self, target_state):
        """Return a target handle for `target_state`."""
        return LocalfsTargetHandle(self.settings, target_state)

    def notifier(self):
        """Start and return a watchdog observer on the root path.

        Every created, deleted, modified or moved path outside the cache
        is recorded as a probe candidate.
        """
        def handle_path(path):
            rel_path = os.path.relpath(path, start=self.ROOTPATH)
            objname = utils.to_standard_sep(rel_path)
            with self.probe_candidates.lock() as d:
                d[objname] = self.none_info()

        def handle_event(event, *paths):
            # Events touching the cache are the syncer's own doing.
            # NOTE(review): this is a plain string-prefix test, so a
            # sibling path that merely starts with the cache path is
            # skipped as well -- confirm this is acceptable.
            if any(path.startswith(self.CACHEPATH) for path in paths):
                return
            logger.info("Handling %s" % event)
            for path in paths:
                handle_path(path)

        # Handler methods name their own instance `this`, so that `self`
        # keeps referring to the enclosing file client.
        class EventHandler(FileSystemEventHandler):
            def on_created(this, event):
                handle_event(event, event.src_path)

            def on_deleted(this, event):
                handle_event(event, event.src_path)

            def on_modified(this, event):
                if event.is_directory:
                    return
                handle_event(event, event.src_path)

            def on_moved(this, event):
                handle_event(event, event.src_path, event.dest_path)

        path = self.ROOTPATH
        event_handler = EventHandler()
        observer = Observer()
        observer.schedule(event_handler, path, recursive=True)
        observer.start()
        return observer