# localfs_client.py
# Copyright (C) 2015 GRNET S.A.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

16
import os
17
import stat
18
19
20
import re
import datetime
import psutil
21
import time
22
import filecmp
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
23
import shutil
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
24
import errno
25

26
27
28
29
30
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import logging

from agkyra.syncer.file_client import FileClient
31
from agkyra.syncer import utils, common, messaging
32
33
34
35
36
37
38
39
40
41
42
from agkyra.syncer.database import transaction

logger = logging.getLogger(__name__)

# Classification of what is found at a local path (see get_local_status).
LOCAL_FILE = 0
LOCAL_EMPTY_DIR = 1
LOCAL_NONEMPTY_DIR = 2
LOCAL_MISSING = 3
LOCAL_SOFTLINK = 4
LOCAL_OTHER = 5

# Two modification times closer than this are treated as equal by eq_float.
DEFAULT_MTIME_PRECISION = 1e-4

# Name patterns that are excluded from syncing.  They are written as raw
# strings so that the backslashes reach the regex engine untouched instead
# of relying on Python passing unknown string escapes through.
exclude_regexes = [r"\.#", r"\.~", r"~\$", r"~.*\.tmp$", r"\..*\.swp$"]
exclude_pattern = re.compile('|'.join(exclude_regexes))


49
50
51
52
53
class DirMissing(BaseException):
    """Raised by link_file when the link operation fails with ENOENT."""


def link_file(src, dest):
    """Make ``src`` available at ``dest``.

    The operation is os.rename when utils.iswin() is true and os.link
    otherwise.

    Raises:
        common.ConflictError: the operation failed with EEXIST, ENOTDIR or
            EINVAL.
        DirMissing: the operation failed with ENOENT.
        OSError: any other failure is propagated unchanged.
    """
    cmd = os.rename if utils.iswin() else os.link
    try:
        cmd(src, dest)
    except OSError as e:
        if e.errno == errno.EEXIST:
            raise common.ConflictError("Cannot link, '%s' exists." % dest)
        if e.errno in [errno.ENOTDIR, errno.EINVAL]:
            raise common.ConflictError(
                "Cannot link, missing path for '%s'." % dest)
        if e.errno == errno.ENOENT:
            raise DirMissing()
        # Do not report success for errors we do not know how to handle.
        raise


def make_dirs(path):
    """Create directory ``path`` together with any missing parents.

    An already existing directory at ``path`` is not an error.

    Raises:
        common.ConflictError: os.makedirs failed with EEXIST (and ``path``
            is not a directory), ENOTDIR or ENOENT.
        OSError: any other failure is propagated unchanged.
    """
    try:
        os.makedirs(path)
    except OSError as e:
        if e.errno == errno.EEXIST and os.path.isdir(path):
            return
        if e.errno in [errno.EEXIST, errno.ENOTDIR, errno.ENOENT]:
            raise common.ConflictError("Cannot make dir '%s'." % path)
        raise


78
79
80
81
82
83
84
85
86
def list_dir(path):
    """Return the entries of directory ``path``.

    An empty list is returned when os.listdir fails with ENOTDIR, ENOENT
    or EINVAL; any other OSError is propagated.
    """
    ignorable = (errno.ENOTDIR, errno.ENOENT, errno.EINVAL)
    try:
        entries = os.listdir(path)
    except OSError as err:
        if err.errno not in ignorable:
            raise
        entries = []
    return entries


87
88
89
90
91
92
93
94
95
96
# Pick the method that lists a process's open files according to the major
# version of the installed psutil.
if psutil.version_info[0] >= 2:
    def psutil_open_files(proc):
        return proc.open_files()
else:
    def psutil_open_files(proc):
        return proc.get_open_files()


def file_is_open(path):
    """Return True if some process reports ``path`` among its open files.

    Every process yielded by psutil.process_iter() is inspected.  A process
    whose inspection raises psutil.Error is skipped, and so is any open
    file whose path cannot be converted by utils.to_unicode.
    """
    for proc in psutil.process_iter():
        try:
            for nt in psutil_open_files(proc):
                try:
                    nt_path = utils.to_unicode(nt.path)
                except UnicodeDecodeError:
                    continue
                if nt_path == path:
                    return True
        except psutil.Error:
            pass
    return False


108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def is_iso_date(tstamp):
    """Tell whether ``tstamp`` parses as ``YYYY-MM-DDTHH:MM:SS.ffffff``."""
    try:
        datetime.datetime.strptime(tstamp, "%Y-%m-%dT%H:%M:%S.%f")
    except ValueError:
        return False
    else:
        return True


def get_orig_name(filename):
    """Strip a trailing ``_<iso timestamp>_<utils.NODE>`` suffix, if present.

    ``filename`` is returned unchanged when it does not end with such a
    suffix or when nothing would be left after removing it.
    """
    fields = filename.rsplit('_', 2)
    if len(fields) != 3:
        return filename
    stem, tstamp, node = fields
    if node != utils.NODE or not is_iso_date(tstamp):
        return filename
    return stem if stem else filename


126
def mk_stash_name(filename):
    """Build a stash name ``<orig>_<timestamp>_<utils.NODE>`` for ``filename``.

    ``<orig>`` is get_orig_name(filename), so a name that already carries a
    stash suffix does not accumulate a second one.
    """
    tstamp = utils.str_time_stamp()
    orig = get_orig_name(filename)
    return orig + '_' + tstamp + '_' + utils.NODE
130
131
132


def eq_float(f1, f2):
    """Return True if ``f1`` and ``f2`` differ by less than
    DEFAULT_MTIME_PRECISION."""
    return abs(f1 - f2) < DEFAULT_MTIME_PRECISION
134
135
136


def files_equal(f1, f2):
    """Compare the contents of files ``f1`` and ``f2``.

    Returns False, rather than raising, when the comparison fails with
    ENOENT or ENOTDIR; any other OSError is propagated.
    """
    logger.debug("Comparing files: '%s', '%s'" % (f1, f2))
    try:
        # shallow=False forces a content comparison instead of trusting
        # matching os.stat() signatures.
        return filecmp.cmp(f1, f2, shallow=False)
    except OSError as e:
        if e.errno in [errno.ENOENT, errno.ENOTDIR]:
            return False
        raise
144
145
146
147
148
149


def info_is_unhandled(info):
    """Return True if ``info`` is non-empty and its type is T_UNHANDLED."""
    if info == {}:
        return False
    return info[LOCALFS_TYPE] == common.T_UNHANDLED


150
151
def local_path_changes(settings, path, state, unhandled_equal=True):
    """Compare the live info of ``path`` with the info recorded in ``state``.

    Returns None when the two are equal according to is_info_eq, otherwise
    the live info.  ``unhandled_equal`` is passed through to is_info_eq.
    """
    live_info = get_live_info(settings, path)
    info = state.info
    if is_info_eq(live_info, info, unhandled_equal):
        return None
    return live_info


158
159
160
161
162
163
def is_actual_path(path):
    """Check that every component of ``path`` is listed by its parent.

    Walking from the last component towards the root, each component must
    appear verbatim in the listing of its parent directory.  Returns False
    at the first component that does not, and True once no component is
    left to check.
    """
    prefix = path.rstrip(os.path.sep)
    while True:
        prefix, basename = os.path.split(prefix)
        if not basename:
            return True
        if basename not in list_dir(prefix):
            return False

def get_live_info(settings, path):
    """Describe what currently exists at ``path`` as an info dict.

    Returns ``{}`` when ``path`` is None, when the path is missing, or when
    ``settings.case_insensitive`` is set and is_actual_path(path) fails.
    Soft links and other non-regular entries give a T_UNHANDLED info,
    directories a T_DIR info, and regular files a T_FILE info that also
    carries the modification time and size.
    """
    if path is None:
        return {}
    if settings.case_insensitive:
        if not is_actual_path(path):
            return {}
    stats, status = get_local_status(path)
    if status == LOCAL_MISSING:
        return {}
    if status in [LOCAL_SOFTLINK, LOCAL_OTHER]:
        return {LOCALFS_TYPE: common.T_UNHANDLED}
    if status in [LOCAL_EMPTY_DIR, LOCAL_NONEMPTY_DIR]:
        return {LOCALFS_TYPE: common.T_DIR}
    live_info = {LOCALFS_MTIME: stats.st_mtime,
                 LOCALFS_SIZE: stats[stat.ST_SIZE],
                 LOCALFS_TYPE: common.T_FILE,
                 }
    return live_info


187
def stat_file(path):
    """Return os.lstat(path), or None if the path does not exist.

    "Does not exist" means that lstat failed with ENOENT or ENOTDIR; any
    other OSError is propagated.
    """
    try:
        return os.lstat(path)
    except OSError as e:
        if e.errno in [errno.ENOENT, errno.ENOTDIR]:
            return None
        raise


# Keys of the info dicts built by get_live_info.
LOCALFS_TYPE = "localfs_type"
LOCALFS_MTIME = "localfs_mtime"
LOCALFS_SIZE = "localfs_size"


def status_of_info(info):
    """Translate an info dict into one of the LOCAL_* status constants.

    An empty info maps to LOCAL_MISSING and a directory info always maps to
    LOCAL_EMPTY_DIR.
    """
    if info == {}:
        return LOCAL_MISSING
    kind = info[LOCALFS_TYPE]
    if kind == common.T_DIR:
        status = LOCAL_EMPTY_DIR
    elif kind == common.T_UNHANDLED:
        status = LOCAL_OTHER  # shouldn't happen
    else:
        status = LOCAL_FILE
    return status


211
212
def get_local_status(path, attempt=0):
    """Return ``(stats, status)`` for ``path``.

    ``stats`` is the result of stat_file (None if the path is missing) and
    ``status`` is one of the LOCAL_* constants.  If classifying the path
    raises OSError, the whole operation, including the stat, is retried
    with ``attempt`` incremented; the error is re-raised once ``attempt``
    exceeds 2.
    """
    stats = stat_file(path)
    try:
        status = _get_local_status_from_stats(stats, path)
    except OSError as e:
        logger.warning("Got error '%s' while listing dir '%s'" % (e, path))
        if attempt > 2:
            raise
        return get_local_status(path, attempt + 1)
    return stats, status


def _get_local_status_from_stats(stats, path):
    """Classify ``path`` into a LOCAL_* constant from its lstat result.

    ``stats`` is None for a missing path.  A directory is reported as
    LOCAL_NONEMPTY_DIR when list_dir(path) returns any entry and as
    LOCAL_EMPTY_DIR otherwise.
    """
    if stats is None:
        return LOCAL_MISSING
    mode = stats[stat.ST_MODE]
    if stat.S_ISLNK(mode):
        return LOCAL_SOFTLINK
    if stat.S_ISREG(mode):
        return LOCAL_FILE
    if stat.S_ISDIR(mode):
        if list_dir(path):
            return LOCAL_NONEMPTY_DIR
        return LOCAL_EMPTY_DIR
    return LOCAL_OTHER


def path_status(path):
    """Return only the LOCAL_* status part of get_local_status(path)."""
    return get_local_status(path)[1]
241
242


Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
243
244
245
246
def info_of_regular_file(info):
    """Tell whether ``info`` describes a regular file.

    A falsy ``info`` is returned as is; otherwise the result is a bool.
    """
    if not info:
        return info
    return info[LOCALFS_TYPE] == common.T_FILE


Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
247
def is_info_eq(info1, info2, unhandled_equal=True):
    """Decide whether two info dicts describe the same local state.

    * If either info is empty, they are equal only if both are empty.
    * Infos of different types are never equal.
    * Two T_UNHANDLED infos compare as ``unhandled_equal``.
    * Two T_DIR infos are always equal.
    * Otherwise the sizes must match and the modification times must be
      equal according to eq_float.
    """
    if {} in [info1, info2]:
        return info1 == info2
    if info1[LOCALFS_TYPE] != info2[LOCALFS_TYPE]:
        return False
    if info1[LOCALFS_TYPE] == common.T_UNHANDLED:
        return unhandled_equal
    if info1[LOCALFS_TYPE] == common.T_DIR:
        return True
    return eq_float(info1[LOCALFS_MTIME], info2[LOCALFS_MTIME]) \
        and info1[LOCALFS_SIZE] == info2[LOCALFS_SIZE]


class LocalfsTargetHandle(object):
    """Applies a fetched object onto the local path of a target state.

    The existing local file is first moved ("hidden") into the cache under
    a name registered in the database, the fetched file is then linked into
    place, and finally the hidden copy is removed or, if it holds changes
    that differ from the fetched contents, stashed next to the original.
    """

    def __init__(self, settings, target_state):
        self.settings = settings
        self.SIGNATURE = "LocalfsTargetHandle"
        self.rootpath = settings.local_root_path
        self.cache_hide_name = settings.cache_hide_name
        self.cache_hide_path = settings.cache_hide_path
        self.cache_path = settings.cache_path
        self.get_db = settings.get_db
        self.mtime_lag = settings.mtime_lag
        self.target_state = target_state
        self.objname = target_state.objname
        self.fspath = utils.join_path(self.rootpath, self.objname)
        # Both are set by register_hidden_name and reset to None by
        # unregister_hidden_name.
        self.hidden_filename = None
        self.hidden_path = None

    def get_path_in_cache(self, name):
        """Return the path of ``name`` inside the cache directory."""
        return utils.join_path(self.cache_path, name)

    @transaction()
    def register_hidden_name(self, filename):
        """Record in the database the cache name used to hide ``filename``.

        Sets self.hidden_filename and self.hidden_path in every case.
        Returns False if the name was already registered, True if it was
        inserted now.
        """
        db = self.get_db()
        f = utils.hash_string(filename)
        hide_filename = utils.join_path(self.cache_hide_name, f)
        self.hidden_filename = hide_filename
        self.hidden_path = self.get_path_in_cache(self.hidden_filename)
        if db.get_cachename(hide_filename):
            return False
        db.insert_cachename(hide_filename, self.SIGNATURE, filename)
        return True

    @transaction()
    def unregister_hidden_name(self, hidden_filename):
        """Delete the cache name from the database and forget the hidden
        location on this handle."""
        db = self.get_db()
        db.delete_cachename(hidden_filename)
        self.hidden_filename = None
        self.hidden_path = None

    def move_file(self):
        """Move the local file to its hidden location in the cache.

        Raises:
            common.BusyError: the file is open, or the rename failed with
                EACCES.
            OSError: any rename failure other than ENOENT, ENOTDIR or
                EACCES.
        """
        fspath = self.fspath
        if file_is_open(fspath):
            raise common.BusyError("File '%s' is open. Aborting."
                                   % fspath)

        new_registered = self.register_hidden_name(self.objname)
        hidden_filename = self.hidden_filename
        hidden_path = self.hidden_path

        if not new_registered:
            logger.warning("Hiding already registered for file %s" %
                           (self.objname,))
            if os.path.lexists(hidden_path):
                logger.warning("File %s already hidden at %s" %
                               (self.objname, hidden_path))
                return
        try:
            os.rename(fspath, hidden_path)
            logger.debug("Hiding file '%s' to '%s'" %
                         (fspath, hidden_path))
        except OSError as e:
            if e.errno in [errno.ENOENT, errno.ENOTDIR]:
                # Nothing to hide: the local file does not exist.
                self.unregister_hidden_name(hidden_filename)
                logger.debug("File '%s' does not exist" % fspath)
                return
            elif e.errno == errno.EACCES:
                self.unregister_hidden_name(hidden_filename)
                raise common.BusyError("File '%s' is open. Undoing." %
                                       hidden_path)
            else:
                raise

    def hide_file(self):
        """Hide the local file and verify that the hidden copy is usable.

        If the hidden file turns out to be open, or to be a non-empty
        directory, it is moved back and the registration is undone.

        Raises:
            common.BusyError: the hidden file is open.
            common.ConflictError: the hidden path is a non-empty directory.
        """
        self.move_file()
        if self.hidden_filename is not None:
            # unregister_hidden_name resets self.hidden_path, so keep the
            # value for the error message.
            hidden_path = self.hidden_path
            if file_is_open(hidden_path):
                os.rename(hidden_path, self.fspath)
                self.unregister_hidden_name(self.hidden_filename)
                raise common.BusyError("File '%s' is open. Undoing." %
                                       hidden_path)
            if path_status(hidden_path) == LOCAL_NONEMPTY_DIR:
                os.rename(hidden_path, self.fspath)
                self.unregister_hidden_name(self.hidden_filename)
                raise common.ConflictError("'%s' is non-empty" % self.fspath)

    def apply(self, fetched_path, fetched_live_info, sync_state):
        """Replace the local path with the fetched object.

        Nothing is done when a directory is fetched over an existing
        directory, or when both the local and the fetched object are
        missing.

        Raises:
            common.ConflictError: the local path is a non-empty directory
                that would have to be replaced.
        """
        local_status = path_status(self.fspath)
        fetched_status = status_of_info(fetched_live_info)
        if local_status in [LOCAL_EMPTY_DIR, LOCAL_NONEMPTY_DIR] \
                and fetched_status == LOCAL_EMPTY_DIR:
            return
        if local_status == LOCAL_MISSING and fetched_status == LOCAL_MISSING:
            return
        if local_status == LOCAL_NONEMPTY_DIR:
            raise common.ConflictError("'%s' is non-empty" % self.fspath)

        self.prepare(fetched_path, sync_state)
        self.finalize(fetched_path, fetched_live_info)
        self.cleanup(self.hidden_path)
        self.unregister_hidden_name(self.hidden_filename)

    def prepare(self, fetched_path, sync_state):
        """Hide the local file, stashing it if it holds unsynced changes.

        The hidden file is stashed when its live info differs from
        ``sync_state`` and its contents differ from the fetched file.
        """
        self.hide_file()
        info_changed = local_path_changes(
            self.settings, self.hidden_path, sync_state,
            unhandled_equal=False)
        if info_changed is not None and info_changed != {}:
            if not files_equal(self.hidden_path, fetched_path):
                self.stash_file()

    def stash_file(self):
        """Move the hidden file to a stash name under the root path and
        post a ConflictStashMessage."""
        stash_name = mk_stash_name(self.objname)
        stash_path = utils.join_path(self.rootpath, stash_name)
        msg = messaging.ConflictStashMessage(
            objname=self.objname, stash_name=stash_name, logger=logger)
        self.settings.messager.put(msg)
        os.rename(self.hidden_path, stash_path)

    def finalize(self, filepath, live_info):
        """Put the fetched object in place at self.fspath.

        A file is linked from ``filepath`` (creating the parent directory
        if link_file reports it missing), a directory is created, and an
        empty ``live_info`` leaves the path absent.

        Raises:
            AssertionError: ``live_info`` is neither a file nor a directory.
        """
        logger.debug("Finalizing file '%s'" % filepath)
        if live_info == {}:
            return
        if live_info[LOCALFS_TYPE] == common.T_FILE:
            time.sleep(self.mtime_lag)
            try:
                link_file(filepath, self.fspath)
            except DirMissing:
                make_dirs(os.path.dirname(self.fspath))
                link_file(filepath, self.fspath)
        elif live_info[LOCALFS_TYPE] == common.T_DIR:
            make_dirs(self.fspath)
        else:
            raise AssertionError("info for fetched file '%s' is %s" %
                                 (filepath, live_info))

    def cleanup(self, filepath):
        """Remove ``filepath`` if it is a regular file or a directory.

        Failure to unlink a file is logged and otherwise ignored; os.rmdir
        errors are propagated.  ``None`` is accepted and ignored.
        """
        if filepath is None:
            return
        status = path_status(filepath)
        if status == LOCAL_FILE:
            try:
                logger.debug("Cleaning up file '%s'" % filepath)
                os.unlink(filepath)
            except OSError as e:
                # Best effort: a leftover file is not fatal.
                logger.debug("Could not clean up '%s': %s" % (filepath, e))
        elif status in [LOCAL_EMPTY_DIR, LOCAL_NONEMPTY_DIR]:
            os.rmdir(filepath)

    def pull(self, source_handle, sync_state):
        """Fetch the object from ``source_handle`` and apply it locally.

        Returns the target state updated with the info of the fetched
        object.
        """
        fetched_path = source_handle.send_file(sync_state)
        fetched_live_info = get_live_info(self.settings, fetched_path)
        self.apply(fetched_path, fetched_live_info, sync_state)
        self.cleanup(fetched_path)
        return self.target_state.set(info=fetched_live_info)


class LocalfsSourceHandle(object):
    """Stages a local file in the cache so that it can be used as a source.

    On construction, if the source state describes a regular file, the file
    is copied to a staging path registered in the database and the copy is
    verified against the live file.
    """

    def __init__(self, settings, source_state):
        self.settings = settings
        self.SIGNATURE = "LocalfsSourceHandle"
        self.rootpath = settings.local_root_path
        self.cache_stage_name = settings.cache_stage_name
        self.cache_stage_path = settings.cache_stage_path
        self.cache_path = settings.cache_path
        self.get_db = settings.get_db
        self.source_state = source_state
        self.objname = source_state.objname
        self.fspath = utils.join_path(self.rootpath, self.objname)
        # Both are set by register_stage_name and reset to None by
        # unregister_stage_name.
        self.stage_filename = None
        self.staged_path = None
        self.heartbeat = settings.heartbeat
        if info_of_regular_file(self.source_state.info):
            self.stage_file()

    @transaction()
    def register_stage_name(self, filename):
        """Record in the database the cache name used to stage ``filename``.

        Sets self.stage_filename and self.staged_path in every case.
        Returns False if the name was already registered, True if it was
        inserted now.
        """
        db = self.get_db()
        f = utils.hash_string(filename)
        stage_filename = utils.join_path(self.cache_stage_name, f)
        self.stage_filename = stage_filename
        stage_path = self.get_path_in_cache(stage_filename)
        self.staged_path = stage_path
        if db.get_cachename(stage_filename):
            return False
        db.insert_cachename(stage_filename, self.SIGNATURE, filename)
        return True

    @transaction()
    def unregister_stage_name(self, stage_filename):
        """Delete the cache name from the database and forget the staging
        location on this handle."""
        db = self.get_db()
        db.delete_cachename(stage_filename)
        self.stage_filename = None
        self.staged_path = None

    def get_path_in_cache(self, name):
        """Return the path of ``name`` inside the cache directory."""
        return utils.join_path(self.cache_path, name)

    def copy_file(self):
        """Copy the local file to its staging path in the cache.

        If the source turns out to be missing or a directory, the staging
        registration is undone and nothing is staged.

        Raises:
            common.OpenBusyError: the file is open.
            IOError: any copy failure other than ENOENT or EISDIR.
        """
        fspath = self.fspath
        if file_is_open(fspath):
            raise common.OpenBusyError("File '%s' is open. Aborting"
                                       % fspath)
        new_registered = self.register_stage_name(fspath)
        stage_filename = self.stage_filename
        stage_path = self.staged_path

        if not new_registered:
            logger.warning("Staging already registered for file %s" %
                           (self.objname,))
            if os.path.lexists(stage_path):
                logger.warning("File %s already staged at %s" %
                               (self.objname, stage_path))
                return

        logger.debug("Staging file '%s' to '%s'" % (self.objname, stage_path))
        try:
            shutil.copy2(fspath, stage_path)
        except IOError as e:
            if e.errno in [errno.ENOENT, errno.EISDIR]:
                logger.debug("Source is not a regular file: '%s'" % fspath)
                self.unregister_stage_name(stage_filename)
                return
            else:
                raise

    def stage_file(self):
        """Stage the file, verify the copy and refresh the source state."""
        self.copy_file()
        live_info = get_live_info(self.settings, self.fspath)
        self.check_staged(live_info)
        self.check_update_source_state(live_info)

    def check_staged(self, live_info):
        """Verify that the staged copy matches the live file.

        Whenever the check fails the staged copy is removed and its
        registration undone.

        Raises:
            common.NotStableBusyError: nothing was staged although the live
                path is a regular file.
            common.OpenBusyError: the live file is open.
            common.ChangedBusyError: the live file differs from the staged
                copy.
        """
        is_reg = info_of_regular_file(live_info)

        if self.staged_path is None:
            if is_reg:
                m = ("File '%s' is not in a stable state; unstaged"
                     % self.objname)
                logger.warning(m)
                raise common.NotStableBusyError(m)
            return

        if not is_reg:
            os.unlink(self.staged_path)
            self.unregister_stage_name(self.stage_filename)
            logger.warning("Path '%s' is not a regular file; unstaged"
                           % self.fspath)
            return

        if file_is_open(self.fspath):
            os.unlink(self.staged_path)
            self.unregister_stage_name(self.stage_filename)
            m = "File '%s' is open; unstaged" % self.objname
            logger.warning(m)
            raise common.OpenBusyError(m)

        if not files_equal(self.staged_path, self.fspath):
            os.unlink(self.staged_path)
            self.unregister_stage_name(self.stage_filename)
            m = "File '%s' contents have changed; unstaged" % self.objname
            logger.warning(m)
            raise common.ChangedBusyError(m)

    @transaction()
    def update_state(self, state):
        """Store ``state`` in the database."""
        db = self.get_db()
        db.put_state(state)

    def check_update_source_state(self, live_info):
        """If ``live_info`` differs from the source state's info, post a
        LiveInfoUpdateMessage and store and adopt the updated state."""
        if not is_info_eq(live_info, self.source_state.info):
            msg = messaging.LiveInfoUpdateMessage(
                archive=self.SIGNATURE, objname=self.objname,
                info=live_info, logger=logger)
            self.settings.messager.put(msg)
            new_state = self.source_state.set(info=live_info)
            self.update_state(new_state)
            self.source_state = new_state

    def get_synced_state(self):
        """Return the current source state."""
        return self.source_state

    def info_is_dir(self):
        """Return True if the source state's info is of type T_DIR."""
        try:
            return self.source_state.info[LOCALFS_TYPE] == common.T_DIR
        except KeyError:
            return False

    def info_is_deleted(self):
        """Return True if the source state's info is empty."""
        return self.source_state.info == {}

    def info_is_deleted_or_unhandled(self):
        """Return True if the source state's info is empty or of type
        T_UNHANDLED."""
        return self.source_state.info == {} \
            or self.source_state.info[LOCALFS_TYPE] == common.T_UNHANDLED

    def stash_staged_file(self):
        """Move the staged copy to a stash name derived from self.fspath."""
        stash_filename = mk_stash_name(self.fspath)
        logger.warning("Stashing file '%s' to '%s'" %
                       (self.fspath, stash_filename))
        os.rename(self.staged_path, stash_filename)

    def unstage_file(self):
        """Remove the staged copy and its registration, if anything is
        staged."""
        if self.stage_filename is None:
            return
        staged_path = self.staged_path
        os.unlink(staged_path)
        self.unregister_stage_name(self.stage_filename)
563
564
565
566
567


class LocalfsFileClient(FileClient):
    """File client for the local filesystem archive.

    Keeps a thread-safe dict of probe candidates (object names that may
    have changed) which is fed by a full filesystem walk and by the
    filesystem notifier.
    """

    def __init__(self, settings):
        self.settings = settings
        self.SIGNATURE = "LocalfsFileClient"
        self.ROOTPATH = settings.local_root_path
        self.CACHEPATH = settings.cache_path
        self.get_db = settings.get_db
        self.probe_candidates = utils.ThreadSafeDict()
        self.check_enabled()

    def check_enabled(self):
        """Post a message telling whether local syncing is enabled."""
        if not self.settings.localfs_is_enabled():
            msg = messaging.LocalfsSyncDisabled(logger=logger)
        else:
            msg = messaging.LocalfsSyncEnabled(logger=logger)
        self.settings.messager.put(msg)

    def remove_candidates(self, objnames, ident):
        """Drop the candidates in ``objnames`` that are tagged ``ident``.

        A candidate whose "ident" differs is kept, and unknown names are
        ignored.
        """
        with self.probe_candidates.lock() as d:
            for objname in objnames:
                try:
                    cached = d.pop(objname)
                    if cached["ident"] != ident:
                        d[objname] = cached
                except KeyError:
                    pass

    def list_candidate_files(self, forced=False):
        """Return the names of the current probe candidates.

        With ``forced`` the candidates are first refreshed by a full walk
        of the filesystem.  If local syncing is disabled, or the root path
        is not a directory (in which case syncing gets disabled), an empty
        dict is returned.
        """
        if not self.settings.localfs_is_enabled():
            return {}
        if not os.path.isdir(self.ROOTPATH):
            self.settings.set_localfs_enabled(False)
            msg = messaging.LocalfsSyncDisabled(logger=logger)
            self.settings.messager.put(msg)
            return {}

        with self.probe_candidates.lock() as d:
            if forced:
                candidates = self.walk_filesystem()
                d.update(candidates)
            # Hand out a snapshot, so that nothing tied to the dict is used
            # once the lock has been released.
            return list(d.keys())

    def none_info(self):
        """Return a fresh candidate entry with no ident and no info."""
        return {"ident": None, "info": None}

    def walk_filesystem(self):
        """Collect every object name found under the root path or known to
        the database, each mapped to a fresh candidate entry.

        Directory and file names that cannot be converted by
        utils.to_unicode are skipped.
        """
        candidates = {}
        rootpath = utils.from_unicode(self.ROOTPATH)
        for dirpath, dirnames, files in os.walk(rootpath):
            try:
                dirpath = utils.to_unicode(dirpath)
            except UnicodeDecodeError:
                continue
            rel_dirpath = os.path.relpath(dirpath, start=self.ROOTPATH)
            logger.debug("'%s' '%s'" % (dirpath, rel_dirpath))
            if rel_dirpath != '.':
                objname = utils.to_standard_sep(rel_dirpath)
                candidates[objname] = self.none_info()
            for filename in files:
                try:
                    filename = utils.to_unicode(filename)
                except UnicodeDecodeError:
                    continue
                if rel_dirpath == '.':
                    prefix = ""
                else:
                    prefix = utils.to_standard_sep(rel_dirpath)
                objname = utils.join_objname(prefix, filename)
                candidates[objname] = self.none_info()

        db_cands = dict((name, self.none_info())
                        for name in self.list_files())
        candidates.update(db_cands)
        logger.debug("Candidates: %s" % candidates)
        return candidates

    @transaction()
    def list_files(self):
        """Return the files the database lists for this client."""
        db = self.get_db()
        return db.list_files(self.SIGNATURE)

    def _local_path_changes(self, name, state):
        """Return the live info of object ``name`` if it differs from
        ``state``, else None (see local_path_changes)."""
        local_path = utils.join_path(self.ROOTPATH, name)
        return local_path_changes(self.settings, local_path, state)

    def exclude_file(self, objname):
        """Return a truthy value if ``objname`` must not be synced.

        That is the case when its first component is the cache directory
        name or its last component matches exclude_pattern.
        """
        parts = objname.split(common.OBJECT_DIRSEP)
        init_part = parts[0]
        if init_part in [self.settings.cache_name]:
            return True
        final_part = parts[-1]
        return exclude_pattern.match(final_part)

    def probe_file(self, objname, old_state, ref_state, ident):
        """Probe ``objname`` and return its new live state, if it changed.

        The candidate entry, if any, is tagged with ``ident``; its cached
        info is used when available, otherwise the path is inspected.
        Returns None for excluded names and for unchanged paths.
        """
        with self.probe_candidates.lock() as d:
            try:
                cached = d[objname]
                cached_info = cached["info"]
                cached["ident"] = ident
            except KeyError:
                cached_info = None

        if self.exclude_file(objname):
            msg = messaging.IgnoreProbeMessage(
                archive=old_state.archive, objname=objname, logger=logger)
            self.settings.messager.put(msg)
            return

        live_info = (self._local_path_changes(objname, old_state)
                     if cached_info is None else cached_info)
        if live_info is None:
            return
        live_state = old_state.set(info=live_info)
        return live_state

    def stage_file(self, source_state):
        """Return a LocalfsSourceHandle for ``source_state``."""
        return LocalfsSourceHandle(self.settings, source_state)

    def prepare_target(self, target_state):
        """Return a LocalfsTargetHandle for ``target_state``."""
        return LocalfsTargetHandle(self.settings, target_state)

    @transaction()
    def get_dir_contents(self, objname):
        """Return what the database lists as contents of ``objname``."""
        db = self.get_db()
        return db.get_dir_contents(self.SIGNATURE, objname)

    def notifier(self):
        """Start watching the root path recursively and return the started
        observer.

        Every created, deleted, modified or moved path outside the cache
        becomes a probe candidate.  Deletion of the root path itself
        disables local syncing.
        """
        def handle_path(path, rec=False):
            # Register the object of ``path`` as a candidate; with ``rec``
            # also everything the database lists under it.
            try:
                path = utils.to_unicode(path)
            except UnicodeDecodeError:
                return
            if path.startswith(self.CACHEPATH):
                return
            rel_path = os.path.relpath(path, start=self.ROOTPATH)
            objname = utils.to_standard_sep(rel_path)
            with self.probe_candidates.lock() as d:
                d[objname] = self.none_info()
                if rec:
                    leaves = self.get_dir_contents(objname)
                    for leaf in leaves:
                        d[leaf] = self.none_info()

        root_path = utils.from_unicode(self.ROOTPATH)

        # The handler methods name their own instance ``this`` because
        # ``self`` refers to the enclosing LocalfsFileClient.
        class EventHandler(FileSystemEventHandler):
            def on_created(this, event):
                # Both files and directories are handled.
                path = event.src_path
                logger.debug("Handling %s" % event)
                handle_path(path)

            def on_deleted(this, event):
                path = event.src_path
                logger.debug("Handling %s" % event)
                if path == root_path:
                    self.settings.set_localfs_enabled(False)
                    msg = messaging.LocalfsSyncDisabled(logger=logger)
                    self.settings.messager.put(msg)
                handle_path(path, rec=utils.iswin())

            def on_modified(this, event):
                if event.is_directory:
                    return
                path = event.src_path
                logger.debug("Handling %s" % event)
                handle_path(path)

            def on_moved(this, event):
                src_path = event.src_path
                dest_path = event.dest_path
                logger.debug("Handling %s" % event)
                handle_path(src_path)
                handle_path(dest_path)

        event_handler = EventHandler()
        observer = Observer()
        observer.schedule(event_handler, root_path, recursive=True)
        observer.start()
        return observer