syncer.py 21.9 KB
Newer Older
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
# Copyright (C) 2015 GRNET S.A.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

16 17
import threading
import logging
18
from collections import defaultdict
19
import Queue
20 21 22

from agkyra.syncer import common
from agkyra.syncer.setup import SyncerSettings
23
from agkyra.syncer.database import TransactedConnection
24 25
from agkyra.syncer.localfs_client import LocalfsFileClient
from agkyra.syncer.pithos_client import PithosFileClient
26
from agkyra.syncer import messaging, utils
27 28 29 30

logger = logging.getLogger(__name__)


31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
class HandleSyncErrors(object):
    def __init__(self, state, messager, callback=None):
        self.state = state
        self.callback = callback
        self.messager = messager

    def __enter__(self):
        pass

    def __exit__(self, exctype, value, traceback):
        if value is None:
            return
        if not isinstance(value, common.SyncError):
            return False  # re-raise
        hard = isinstance(value, common.HardSyncError)
        if self.callback is not None:
            self.callback(self.state, hard=hard)
        msg = messaging.SyncErrorMessage(
            objname=self.state.objname,
            serial=self.state.serial,
            exception=value, logger=logger)
        self.messager.put(msg)
        return True


56 57 58 59 60 61 62 63 64 65 66
class FileSyncer(object):

    dbname = None
    clients = None

    def __init__(self, settings, master, slave):
        self.settings = settings
        self.master = master
        self.slave = slave
        self.DECISION = 'DECISION'
        self.SYNC = 'SYNC'
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
67 68
        self.MASTER = master.SIGNATURE
        self.SLAVE = slave.SIGNATURE
69
        self.syncer_dbtuple = settings.syncer_dbtuple
70
        self.clients = {self.MASTER: master, self.SLAVE: slave}
71 72 73
        self.notifiers = {}
        self.decide_thread = None
        self.sync_threads = []
74
        self.failed_serials = utils.ThreadSafeDict()
75
        self.sync_queue = Queue.Queue()
76
        self.messager = settings.messager
77
        self.heartbeat = self.settings.heartbeat
78

79 80 81 82 83 84 85
    def thread_is_active(self, t):
        return t and t.is_alive()

    @property
    def decide_active(self):
        return self.thread_is_active(self.decide_thread)

86 87
    @property
    def paused(self):
88
        return not self.decide_active
89

90
    def initiate_probe(self):
91
        self.start_notifiers()
92
        self.probe_all(forced=True)
93 94

    def start_notifiers(self):
95 96 97
        for signature, client in self.clients.iteritems():
            notifier = self.notifiers.get(signature)
            if not self.thread_is_active(notifier):
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
98
                self.notifiers[signature] = client.notifier()
99
            else:
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
100
                logger.info("Notifier %s already up" % signature)
101

102
    def stop_notifiers(self, timeout=None):
103
        for notifier in self.notifiers.values():
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
104 105 106 107 108 109 110
            try:
                notifier.stop()
            except KeyError as e:
                # bypass watchdog inotify bug that causes a KeyError
                # when attempting to stop a notifier after the watched
                # directory has been deleted
                logger.warning("Ignored KeyError: %s" % e)
111 112 113 114 115
            except TypeError as e:
                # bypass watchdog osx bug that causes a TypeError
                # when attempting to stop a notifier after the watched
                # directory has been deleted
                logger.warning("Ignored TypeError: %s" % e)
116
        return utils.wait_joins(self.notifiers.values(), timeout)
117 118

    def start_decide(self):
119 120 121
        if not self.decide_active:
            self.decide_thread = self._poll_decide()

122
    def stop_decide(self, timeout=None):
123 124
        if self.decide_active:
            self.decide_thread.stop()
125 126
            return utils.wait_joins([self.decide_thread], timeout)
        return timeout
127

128 129 130
    def stop_all_daemons(self, timeout=None):
        remaining = self.stop_decide(timeout=timeout)
        return self.stop_notifiers(timeout=remaining)
131

132 133
    def wait_sync_threads(self, timeout=None):
        return utils.wait_joins(self.sync_threads, timeout=timeout)
134

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
135 136
    def get_next_message(self, block=False):
        return self.messager.get(block=block)
137

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
138
    def probe_file(self, archive, objname):
139
        ident = utils.time_stamp()
140 141 142 143 144 145
        try:
            self._probe_files(archive, [objname], ident)
            client = self.clients[archive]
            client.remove_candidates([objname], ident)
        except common.DatabaseError:
            pass
146

147 148 149
    def reg_name(self, objname):
        return utils.reg_name(self.settings, objname)

150
    def _probe_files(self, archive, objnames, ident):
151 152 153
        with TransactedConnection(self.syncer_dbtuple) as db:
            for objname in objnames:
                self._do_probe_file(db, archive, objname, ident)
154

155
    def _do_probe_file(self, db, archive, objname, ident):
156
        logger.debug("Probing archive: %s, object: '%s'" % (archive, objname))
157
        client = self.clients[archive]
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
158 159
        db_state = db.get_state(archive, objname)
        ref_state = db.get_state(self.SYNC, objname)
160
        with self.heartbeat.lock() as hb:
161
            beat = hb.get(self.reg_name(objname))
162
            if beat is not None:
163 164
                beat_thread = beat["thread"]
                if beat_thread is None or beat_thread.is_alive():
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
165 166 167 168
                    msg = messaging.HeartbeatNoProbeMessage(
                        archive=archive, objname=objname, heartbeat=beat,
                        logger=logger)
                    self.messager.put(msg)
169
                    return
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
170
        if db_state.serial != ref_state.serial:
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
171
            msg = messaging.AlreadyProbedMessage(
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
172 173
                archive=archive, objname=objname, serial=db_state.serial,
                logger=logger)
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
174
            self.messager.put(msg)
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
175
            return
176
        live_state = client.probe_file(objname, db_state, ref_state, ident)
177
        if live_state is not None:
178
            self.update_file_state(db, live_state)
179

180
    def update_file_state(self, db, live_state):
181
        archive = live_state.archive
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
182
        objname = live_state.objname
183
        serial = live_state.serial
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
184
        db_state = db.get_state(archive, objname)
185 186
        if db_state and db_state.serial != serial:
            logger.warning(
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
187
                "Cannot update archive: %s, object: '%s', "
188
                "serial: %s, db_serial: %s" %
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
189
                (archive, objname, serial, db_state.serial))
190 191
            return

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
192
        new_serial = db.new_serial(objname)
193 194
        new_state = live_state.set(serial=new_serial)
        db.put_state(new_state)
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
195 196 197 198
        msg = messaging.UpdateMessage(
            archive=archive, objname=objname,
            serial=new_serial, old_serial=serial, logger=logger)
        self.messager.put(msg)
199 200
        if new_serial == 0:
            sync_state = common.FileState(
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
201
                archive=self.SYNC, objname=objname, serial=-1,
202 203 204
                info={})
            db.put_state(sync_state)

205 206 207 208 209 210
    def dry_run_decisions(self, objnames, master=None, slave=None):
        if master is None:
            master = self.MASTER
        if slave is None:
            slave = self.SLAVE
        decisions = []
211 212 213 214
        with TransactedConnection(self.syncer_dbtuple) as db:
            for objname in objnames:
                decision = self._dry_run_decision(db, objname, master, slave)
                decisions.append(decision)
215 216
        return decisions

217
    def _dry_run_decision(self, db, objname, master=None, slave=None):
218 219 220 221 222
        if master is None:
            master = self.MASTER
        if slave is None:
            slave = self.SLAVE
        ident = utils.time_stamp()
223
        return self._do_decide_file_sync(db, objname, master, slave, ident, True)
224

225
    def decide_file_syncs(self, objnames, master=None, slave=None):
226 227 228 229
        if master is None:
            master = self.MASTER
        if slave is None:
            slave = self.SLAVE
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
230
        ident = utils.time_stamp()
231
        syncs = []
232
        try:
233
            with TransactedConnection(self.syncer_dbtuple) as db:
234 235 236 237 238
                for objname in objnames:
                    states = self._decide_file_sync(
                        db, objname, master, slave, ident)
                    if states is not None:
                        syncs.append(states)
239
        except common.DatabaseError:
240
            self.clean_heartbeat(objnames, ident)
241
            return
242
        self.enqueue_syncs(syncs)
243

244 245 246 247 248 249 250 251
    def decide_file_sync(self, objname, master=None, slave=None):
        if master is None:
            master = self.MASTER
        if slave is None:
            slave = self.SLAVE
        self.decide_file_syncs([objname], master, slave)

    def clean_heartbeat(self, objnames, ident=None):
252
        with self.heartbeat.lock() as hb:
253 254 255 256 257 258 259 260 261
            for objname in objnames:
                beat = hb.pop(self.reg_name(objname), None)
                if beat is None:
                    return
                if ident and ident != beat["ident"]:
                    hb[self.reg_name(objname)] = beat
                else:
                    logger.debug("cleaning heartbeat %s, object '%s'"
                                 % (beat, objname))
262 263

    def _decide_file_sync(self, db, objname, master, slave, ident):
264 265 266
        if not self.settings._sync_is_enabled(db):
            logger.warning("Cannot decide '%s'; sync disabled." % objname)
            return
267
        states = self._do_decide_file_sync(db, objname, master, slave, ident)
268 269
        if states is not None:
            with self.heartbeat.lock() as hb:
270
                beat = {"ident": ident, "thread": None}
271
                hb[self.reg_name(objname)] = beat
272 273
        return states

274
    def _do_decide_file_sync(self, db, objname, master, slave, ident,
275
                             dry_run=False):
276
        logger.debug("Deciding object: '%s'" % objname)
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
277 278 279 280
        master_state = db.get_state(master, objname)
        slave_state = db.get_state(slave, objname)
        sync_state = db.get_state(self.SYNC, objname)
        decision_state = db.get_state(self.DECISION, objname)
281 282 283
        master_serial = master_state.serial
        slave_serial = slave_state.serial
        sync_serial = sync_state.serial
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
284
        decision_serial = decision_state.serial
285

286
        with self.heartbeat.lock() as hb:
287
            beat = hb.get(self.reg_name(objname))
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
288
            logger.debug("object: %s heartbeat: %s" % (objname, beat))
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
289 290
            if beat is not None:
                if beat["ident"] == ident:
291 292 293 294 295 296
                    logger.warning(
                        "Found used heartbeat ident %s for object %s" %
                        (ident, objname))
                    return None
                beat_thread = beat["thread"]
                if beat_thread is None or beat_thread.is_alive():
297
                    if not dry_run:
298
                        msg = messaging.HeartbeatNoDecideMessage(
299 300
                            objname=objname, heartbeat=beat, logger=logger)
                        self.messager.put(msg)
301
                    return None
302 303 304 305 306 307 308
                if utils.younger_than(beat["ident"],
                                      self.settings.action_max_wait):
                    if not dry_run:
                        msg = messaging.HeartbeatSkipDecideMessage(
                            objname=objname, heartbeat=beat, logger=logger)
                        self.messager.put(msg)
                    return None
309 310
                logger.debug("Ignoring previous run: %s %s" %
                             (objname, beat))
311

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
312
        if decision_serial != sync_serial:
313 314
            with self.failed_serials.lock() as d:
                failed_sync = d.get((decision_serial, objname))
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
315
            if failed_sync is None:
316
                logger.debug(
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
317
                    "Already decided: '%s', decision: %s, sync: %s" %
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
318
                    (objname, decision_serial, sync_serial))
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
319 320 321 322 323 324
                if decision_serial == master_serial:
                    return master_state, slave_state, sync_state
                elif decision_serial == slave_serial:
                    return slave_state, master_state, sync_state
                else:
                    raise AssertionError(
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
325
                        "Decision serial %s for objname '%s' "
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
326
                        "does not match any archive." %
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
327
                        (decision_serial, objname))
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
328
            else:
329 330 331 332
                if not dry_run:
                    msg = messaging.FailedSyncIgnoreDecisionMessage(
                        objname=objname, serial=decision_serial, logger=logger)
                    self.messager.put(msg)
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
333 334

        if master_serial > sync_serial:
335 336
            if master_serial == decision_serial:  # this is a failed serial
                return None
337
            if not dry_run:
338
                self._make_decision_state(db, decision_state, master_state)
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
339
            return master_state, slave_state, sync_state
340 341
        elif master_serial == sync_serial:
            if slave_serial > sync_serial:
342 343
                if slave_serial == decision_serial:  # this is a failed serial
                    return None
344
                if not dry_run:
345
                    self._make_decision_state(db, decision_state, slave_state)
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
346
                return slave_state, master_state, sync_state
347 348 349 350 351 352 353 354 355 356
            elif slave_serial == sync_serial:
                return None
            else:
                raise AssertionError("Slave serial %s, sync serial %s"
                                     % (slave_serial, sync_serial))

        else:
            raise AssertionError("Master serial %s, sync serial %s"
                                 % (master_serial, sync_serial))

357
    def _make_decision_state(self, db, decision_state, source_state):
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
358 359 360 361
        new_decision_state = decision_state.set(
            serial=source_state.serial, info=source_state.info)
        db.put_state(new_decision_state)

362 363 364 365 366 367 368 369 370 371 372 373
    def enqueue_syncs(self, syncs):
        for sync in syncs:
            self.sync_queue.put(sync)

    def launch_syncs(self):
        with self.heartbeat.lock() as hb:
            alive_threads = len([v for v in hb.values()
                                 if v["thread"] is not None
                                 and v["thread"].is_alive()])
        max_alive_threads = self.settings.max_alive_sync_threads
        new_threads = max_alive_threads - alive_threads
        if new_threads > 0:
374
            logger.debug("Can start max %s syncs" % new_threads)
375 376 377 378 379 380 381
            for i in range(new_threads):
                try:
                    tpl = self.sync_queue.get(block=False)
                    self.sync_file(*tpl)
                except Queue.Empty:
                    break

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
382
    def sync_file(self, source_state, target_state, sync_state):
383 384 385 386
        msg = messaging.SyncMessage(
            objname=source_state.objname,
            archive=source_state.archive,
            serial=source_state.serial,
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
387
            info=source_state.info,
388 389
            logger=logger)
        self.messager.put(msg)
390
        thread = threading.Thread(
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
391
            target=self._sync_file,
392
            args=(source_state, target_state, sync_state))
393 394 395 396 397 398 399
        with self.heartbeat.lock() as hb:
            beat = hb.get(self.reg_name(source_state.objname))
            if beat is None:
                raise AssertionError("heartbeat for %s is None" %
                                     source_state.objname)
            assert beat["thread"] is None
            beat["thread"] = thread
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
400
        thread.daemon = True
401
        thread.start()
402
        self.sync_threads.append(thread)
403

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
404
    def _sync_file(self, source_state, target_state, sync_state):
405 406 407
        clients = self.clients
        source_client = clients[source_state.archive]
        target_client = clients[target_state.archive]
408 409 410 411 412 413
        with HandleSyncErrors(
                source_state, self.messager, self.mark_as_failed):
            source_handle = source_client.stage_file(source_state)
            target_client.start_pulling_file(
                source_handle, target_state, sync_state,
                callback=self.ack_file_sync)
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
414

415
    def mark_as_failed(self, state, hard=False):
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
416
        serial = state.serial
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
417
        objname = state.objname
418 419 420 421 422 423
        if hard:
            logger.warning(
                "Marking failed serial %s for archive: %s, object: '%s'" %
                (serial, state.archive, objname))
            with self.failed_serials.lock() as d:
                d[(serial, objname)] = state
424

425
    def update_state(self, db, old_state, new_state):
426 427 428 429
        db.put_state(new_state)
        # here we could do any checks needed on the old state,
        # perhaps triggering a probe

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
430
    def ack_file_sync(self, synced_source_state, synced_target_state):
431 432
        with TransactedConnection(self.syncer_dbtuple) as db:
            self._ack_file_sync(db, synced_source_state, synced_target_state)
433
        serial = synced_source_state.serial
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
434
        objname = synced_source_state.objname
435
        target = synced_target_state.archive
436
        self.clean_heartbeat([objname])
437 438 439 440
        msg = messaging.AckSyncMessage(
            archive=target, objname=objname, serial=serial,
            logger=logger)
        self.messager.put(msg)
441

442
    def _ack_file_sync(self, db, synced_source_state, synced_target_state):
443 444 445 446
        serial = synced_source_state.serial
        objname = synced_source_state.objname
        source = synced_source_state.archive
        target = synced_target_state.archive
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
447
        tinfo = synced_target_state.info
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
448 449 450
        logger.debug("Acking archive: %s, object: '%s', serial: %s "
                     "info: %s" %
                     (target, objname, serial, tinfo))
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
451 452
        decision_state = db.get_state(self.DECISION, objname)
        sync_state = db.get_state(self.SYNC, objname)
453 454 455 456 457 458 459 460 461 462

        if serial != decision_state.serial:
            raise AssertionError(
                "Serial mismatch: assumed sync %s, decision %s"
                % (serial, decision_state.serial))
        if serial <= sync_state.serial:
            raise common.SyncError(
                "cannot ack: serial %s < sync serial %s" %
                (serial, sync_state.serial))

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
463
        db_source_state = db.get_state(source, objname)
464
        self.update_state(db, db_source_state, synced_source_state)
465 466 467

        final_target_state = synced_target_state.set(
            serial=serial)
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
468
        db_target_state = db.get_state(target, objname)
469
        self.update_state(db, db_target_state, final_target_state)
470 471 472 473 474 475 476 477 478 479 480 481 482 483 484

        sync_info = dict(synced_source_state.info)
        sync_info.update(synced_target_state.info)
        # The 'info' namespace is global. Some attributes may be globally
        # recognizable by all clients with the same semantics, such as
        # a content-hash (e.g. SHA256), while other may be specific to
        # each client. Clients are responsible to protect their private
        # attributes creating their own namespace, for example
        # 'localfs_mtime', 'object_store_etag'
        new_sync_state = sync_state.set(serial=serial, info=sync_info)
        db.put_state(new_sync_state)
        new_decision_state = new_sync_state.set(archive=self.DECISION)
        db.put_state(new_decision_state)

    def list_deciding(self, archives=None):
485
        try:
486 487 488 489
            with TransactedConnection(self.syncer_dbtuple) as db:
                return self._list_deciding(db, archives=archives)
        except common.DatabaseError:
            return set()
490

491
    def _list_deciding(self, db, archives=None):
492 493
        if archives is None:
            archives = (self.MASTER, self.SLAVE)
494 495
        return set(db.list_deciding(archives=archives,
                                    sync=self.SYNC))
496

497
    def probe_archive(self, archive, forced=False):
498
        ident = utils.time_stamp()
499
        client = self.clients[archive]
500 501 502 503 504 505
        try:
            candidates = client.list_candidate_files(forced=forced)
            self._probe_files(archive, candidates, ident)
            client.remove_candidates(candidates, ident)
        except common.DatabaseError:
            pass
506

507 508 509
    def decide_archive(self, archive=None):
        try:
            archives = [archive] if archive is not None else None
510 511 512
            objnames = self.list_deciding(archives)
            self.decide_file_syncs(objnames)
            self.launch_syncs()
513 514
        except common.DatabaseError:
            pass
515 516

    def decide_all_archives(self):
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
517
        logger.debug("Checking candidates to sync")
518
        self.probe_all()
519
        self.decide_archive()
520

521 522 523 524
    def probe_all(self, forced=False):
        self.probe_archive(self.MASTER, forced=forced)
        self.probe_archive(self.SLAVE, forced=forced)

525
    def _poll_decide(self, interval=3):
526 527 528
        thread = utils.StoppableThread(interval, self.decide_all_archives)
        thread.start()
        return thread
529

530 531 532 533 534 535 536 537 538 539 540
    def check_decisions(self):
        deciding = self.list_deciding()
        decisions = self.dry_run_decisions(deciding)
        by_source = defaultdict(list)
        for decision in decisions:
            source_state = decision[0]
            source = source_state.archive
            objname = source_state.objname
            by_source[source].append(objname)
        return by_source

541 542 543 544 545 546 547 548
    # TODO cleanup db of objects deleted in all clients
    # def cleanup(self):
    #     db = self.get_db()
    #     master_deleted = set(db.list_files_with_info(MASTER, {}))
    #     client_deleted = set(db.list_files_with_info(SLAVE, {}))
    #     deleted = master_deleted.intersection(client_deleted)


Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
549 550
def conf(auth_url, auth_token, container, local_root_path, **kwargs):
    settings = SyncerSettings(auth_url=auth_url,
551 552
                              auth_token=auth_token,
                              container=container,
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
553 554
                              local_root_path=local_root_path,
                              **kwargs)
555 556 557
    master = PithosFileClient(settings)
    slave = LocalfsFileClient(settings)
    return FileSyncer(settings, master, slave)