syncer.py 20.9 KB
Newer Older
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# Copyright (C) 2015 GRNET S.A.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

16
17
import threading
import logging
18
from collections import defaultdict
19
import Queue
20
21
22

from agkyra.syncer import common
from agkyra.syncer.setup import SyncerSettings
23
from agkyra.syncer.database import TransactedConnection
24
25
from agkyra.syncer.localfs_client import LocalfsFileClient
from agkyra.syncer.pithos_client import PithosFileClient
26
from agkyra.syncer import messaging, utils
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

logger = logging.getLogger(__name__)


class FileSyncer(object):
    """Coordinate probing, deciding and syncing between two file clients.

    One client is the master and the other the slave; each is keyed by its
    SIGNATURE. The syncer database stores per-object states under each
    client's signature plus two bookkeeping archives: SYNC (the last
    acknowledged synced state) and DECISION (the serial chosen to be synced
    next).
    """

    dbname = None
    clients = None

    def __init__(self, settings, master, slave):
        self.settings = settings
        self.master = master
        self.slave = slave
        # Names of the bookkeeping archives in the syncer database.
        self.DECISION = 'DECISION'
        self.SYNC = 'SYNC'
        self.MASTER = master.SIGNATURE
        self.SLAVE = slave.SIGNATURE
        self.syncer_dbtuple = settings.syncer_dbtuple
        self.clients = {self.MASTER: master, self.SLAVE: slave}
        self.notifiers = {}
        self.decide_thread = None
        self.sync_threads = []
        # (serial, objname) -> state, for syncs marked as hard failures.
        self.failed_serials = utils.ThreadSafeDict()
        # Queue of (source_state, target_state, sync_state) tuples that
        # wait for a sync thread.
        self.sync_queue = Queue.Queue()
        self.messager = settings.messager
        # Registry keyed by reg_name(objname); each entry is a dict
        # {"ident": ..., "thread": Thread or None} guarded by .lock().
        self.heartbeat = self.settings.heartbeat

    def thread_is_active(self, t):
        """Return True if *t* is set and its thread is still running."""
        return bool(t and t.is_alive())

    @property
    def decide_active(self):
        """True while the periodic decide thread is running."""
        return self.thread_is_active(self.decide_thread)

    @property
    def paused(self):
        """True when no decide thread is running."""
        return not self.decide_active

    def initiate_probe(self):
        """Start the notifiers and force a probe of both archives."""
        self.start_notifiers()
        self.probe_all(forced=True)

    def start_notifiers(self):
        """Create a notifier for every client whose notifier is not alive."""
        for signature, client in self.clients.items():
            notifier = self.notifiers.get(signature)
            if not self.thread_is_active(notifier):
                self.notifiers[signature] = client.notifier()
            else:
                logger.info("Notifier %s already up", signature)

    def stop_notifiers(self, timeout=None):
        """Stop all notifiers and wait for them to finish.

        Returns whatever utils.wait_joins returns; stop_all_daemons treats it
        as the remaining time budget (presumably None when unbounded).
        """
        for notifier in self.notifiers.values():
            try:
                notifier.stop()
            except KeyError as e:
                # bypass watchdog inotify bug that causes a KeyError
                # when attempting to stop a notifier after the watched
                # directory has been deleted
                logger.warning("Ignored KeyError: %s", e)
            except TypeError as e:
                # bypass watchdog osx bug that causes a TypeError
                # when attempting to stop a notifier after the watched
                # directory has been deleted
                logger.warning("Ignored TypeError: %s", e)
        return utils.wait_joins(self.notifiers.values(), timeout)

    def start_decide(self):
        """Start the periodic decide thread unless it is already running."""
        if not self.decide_active:
            self.decide_thread = self._poll_decide()

    def stop_decide(self, timeout=None):
        """Stop the decide thread, waiting at most *timeout*.

        Returns the value of utils.wait_joins, or *timeout* unchanged when
        the thread was not running.
        """
        if self.decide_active:
            self.decide_thread.stop()
            return utils.wait_joins([self.decide_thread], timeout)
        return timeout

    def stop_all_daemons(self, timeout=None):
        """Stop the decide thread, then the notifiers, sharing *timeout*."""
        remaining = self.stop_decide(timeout=timeout)
        return self.stop_notifiers(timeout=remaining)

    def wait_sync_threads(self, timeout=None):
        """Join the sync threads started by sync_file."""
        return utils.wait_joins(self.sync_threads, timeout=timeout)

    def get_next_message(self, block=False):
        """Fetch the next message from the messager."""
        return self.messager.get(block=block)

    def probe_file(self, archive, objname):
        """Probe a single object in *archive* and drop it from candidates.

        Database errors are ignored (best effort).
        """
        ident = utils.time_stamp()
        try:
            self._probe_files(archive, [objname], ident)
            client = self.clients[archive]
            client.remove_candidates([objname], ident)
        except common.DatabaseError:
            pass

    def reg_name(self, objname):
        """Return the heartbeat registry key for *objname*."""
        return utils.reg_name(self.settings, objname)

    def _probe_files(self, archive, objnames, ident):
        """Probe *objnames* of *archive* inside one database transaction."""
        with TransactedConnection(self.syncer_dbtuple) as db:
            for objname in objnames:
                self._do_probe_file(db, archive, objname, ident)

    def _do_probe_file(self, db, archive, objname, ident):
        """Probe one object and record a changed live state.

        Skips the probe, and emits a message, when a heartbeat shows the
        object is still being decided or synced, or when the archive state
        was already probed and differs from the SYNC state.
        """
        logger.debug("Probing archive: %s, object: '%s'", archive, objname)
        client = self.clients[archive]
        db_state = db.get_state(archive, objname)
        ref_state = db.get_state(self.SYNC, objname)
        with self.heartbeat.lock() as hb:
            beat = hb.get(self.reg_name(objname))
            if beat is not None:
                beat_thread = beat["thread"]
                # A beat without a thread, or with a live one, means a
                # decision/sync is in progress.
                if beat_thread is None or beat_thread.is_alive():
                    msg = messaging.HeartbeatNoProbeMessage(
                        archive=archive, objname=objname, heartbeat=beat,
                        logger=logger)
                    self.messager.put(msg)
                    return
        if db_state.serial != ref_state.serial:
            msg = messaging.AlreadyProbedMessage(
                archive=archive, objname=objname, serial=db_state.serial,
                logger=logger)
            self.messager.put(msg)
            return
        live_state = client.probe_file(objname, db_state, ref_state, ident)
        if live_state is not None:
            self.update_file_state(db, live_state)

    def update_file_state(self, db, live_state):
        """Store *live_state* under a freshly allocated serial.

        The update is refused (with a warning) if the stored state's serial
        no longer matches the serial *live_state* was probed against. For
        the object's first serial (0) an initial SYNC state with serial -1
        and empty info is also stored.
        """
        archive = live_state.archive
        objname = live_state.objname
        serial = live_state.serial
        db_state = db.get_state(archive, objname)
        if db_state and db_state.serial != serial:
            logger.warning(
                "Cannot update archive: %s, object: '%s', "
                "serial: %s, db_serial: %s",
                archive, objname, serial, db_state.serial)
            return

        new_serial = db.new_serial(objname)
        new_state = live_state.set(serial=new_serial)
        db.put_state(new_state)
        msg = messaging.UpdateMessage(
            archive=archive, objname=objname,
            serial=new_serial, old_serial=serial, logger=logger)
        self.messager.put(msg)
        if new_serial == 0:
            sync_state = common.FileState(
                archive=self.SYNC, objname=objname, serial=-1,
                info={})
            db.put_state(sync_state)

    def _resolve_archives(self, master, slave):
        """Return (master, slave), defaulting each to this syncer's own."""
        if master is None:
            master = self.MASTER
        if slave is None:
            slave = self.SLAVE
        return master, slave

    def dry_run_decisions(self, objnames, master=None, slave=None):
        """Return the decision for each of *objnames* without recording it.

        Each entry is a (source, target, sync) state tuple or None.
        """
        master, slave = self._resolve_archives(master, slave)
        with TransactedConnection(self.syncer_dbtuple) as db:
            decisions = [self._dry_run_decision(db, objname, master, slave)
                         for objname in objnames]
        return decisions

    def _dry_run_decision(self, db, objname, master=None, slave=None):
        """Compute, without side effects on the DB, the decision for one object."""
        master, slave = self._resolve_archives(master, slave)
        ident = utils.time_stamp()
        return self._do_decide_file_sync(db, objname, master, slave, ident,
                                         dry_run=True)

    def decide_file_syncs(self, objnames, master=None, slave=None):
        """Decide syncs for *objnames* and enqueue the resulting syncs.

        On a database error, heartbeats registered during this call are
        removed and nothing is enqueued.
        """
        master, slave = self._resolve_archives(master, slave)
        ident = utils.time_stamp()
        syncs = []
        try:
            with TransactedConnection(self.syncer_dbtuple) as db:
                for objname in objnames:
                    states = self._decide_file_sync(
                        db, objname, master, slave, ident)
                    if states is not None:
                        syncs.append(states)
        except common.DatabaseError:
            self.clean_heartbeat(objnames, ident)
            return
        self.enqueue_syncs(syncs)

    def decide_file_sync(self, objname, master=None, slave=None):
        """Decide and enqueue the sync of a single object."""
        self.decide_file_syncs([objname], master, slave)

    def clean_heartbeat(self, objnames, ident=None):
        """Remove the heartbeats of *objnames*.

        When *ident* is given, only heartbeats registered with that ident are
        removed; others are put back. Objects without a heartbeat are
        skipped and the remaining objects are still processed.
        """
        with self.heartbeat.lock() as hb:
            for objname in objnames:
                key = self.reg_name(objname)
                beat = hb.pop(key, None)
                if beat is None:
                    # Nothing to clean for this object; keep going so the
                    # remaining objects are still cleaned.
                    continue
                if ident and ident != beat["ident"]:
                    # Registered by a different run: restore it.
                    hb[key] = beat
                else:
                    logger.debug("cleaning heartbeat %s, object '%s'",
                                 beat, objname)

    def _decide_file_sync(self, db, objname, master, slave, ident):
        """Decide one object and register a heartbeat if a sync results."""
        if not self.settings._sync_is_enabled(db):
            logger.warning("Cannot decide '%s'; sync disabled.", objname)
            return
        states = self._do_decide_file_sync(db, objname, master, slave, ident)
        if states is not None:
            with self.heartbeat.lock() as hb:
                beat = {"ident": ident, "thread": None}
                hb[self.reg_name(objname)] = beat
        return states

    def _do_decide_file_sync(self, db, objname, master, slave, ident,
                             dry_run=False):
        """Decide whether, and in which direction, *objname* should sync.

        Returns a (source_state, target_state, sync_state) tuple, or None
        when there is nothing to do or the object is busy. Unless *dry_run*
        is set, a new decision is stored in the DECISION archive and
        messages are emitted.

        Raises AssertionError when the stored serials are inconsistent.
        """
        logger.debug("Deciding object: '%s'", objname)
        master_state = db.get_state(master, objname)
        slave_state = db.get_state(slave, objname)
        sync_state = db.get_state(self.SYNC, objname)
        decision_state = db.get_state(self.DECISION, objname)
        master_serial = master_state.serial
        slave_serial = slave_state.serial
        sync_serial = sync_state.serial
        decision_serial = decision_state.serial

        with self.heartbeat.lock() as hb:
            beat = hb.get(self.reg_name(objname))
            logger.debug("object: %s heartbeat: %s", objname, beat)
            if beat is not None:
                if beat["ident"] == ident:
                    logger.warning(
                        "Found used heartbeat ident %s for object %s",
                        ident, objname)
                    return None
                beat_thread = beat["thread"]
                if beat_thread is None or beat_thread.is_alive():
                    # Another decision or sync is still in progress.
                    if not dry_run:
                        msg = messaging.HeartbeatNoDecideMessage(
                            objname=objname, heartbeat=beat, logger=logger)
                        self.messager.put(msg)
                    return None
                # Its sync thread has finished but the beat was not cleaned.
                logger.warning("Ignoring previous run: %s %s",
                               objname, beat)

        if decision_serial != sync_serial:
            # A decision was recorded but has not been acknowledged.
            with self.failed_serials.lock() as d:
                failed_sync = d.get((decision_serial, objname))
            if failed_sync is None:
                logger.warning(
                    "Already decided: '%s', decision: %s, sync: %s",
                    objname, decision_serial, sync_serial)
                # Re-issue the pending decision.
                if decision_serial == master_serial:
                    return master_state, slave_state, sync_state
                elif decision_serial == slave_serial:
                    return slave_state, master_state, sync_state
                else:
                    raise AssertionError(
                        "Decision serial %s for objname '%s' "
                        "does not match any archive." %
                        (decision_serial, objname))
            # The pending decision failed hard: ignore it and decide afresh.
            if not dry_run:
                msg = messaging.FailedSyncIgnoreDecisionMessage(
                    objname=objname, serial=decision_serial, logger=logger)
                self.messager.put(msg)

        if master_serial > sync_serial:
            if master_serial == decision_serial:  # this is a failed serial
                return None
            if not dry_run:
                self._make_decision_state(db, decision_state, master_state)
            return master_state, slave_state, sync_state

        elif master_serial == sync_serial:
            if slave_serial > sync_serial:
                if slave_serial == decision_serial:  # this is a failed serial
                    return None
                if not dry_run:
                    self._make_decision_state(db, decision_state, slave_state)
                return slave_state, master_state, sync_state
            elif slave_serial == sync_serial:
                return None
            else:
                raise AssertionError("Slave serial %s, sync serial %s"
                                     % (slave_serial, sync_serial))

        else:
            raise AssertionError("Master serial %s, sync serial %s"
                                 % (master_serial, sync_serial))

    def _make_decision_state(self, db, decision_state, source_state):
        """Store a DECISION state copying *source_state*'s serial and info."""
        new_decision_state = decision_state.set(
            serial=source_state.serial, info=source_state.info)
        db.put_state(new_decision_state)

    def enqueue_syncs(self, syncs):
        """Put each (source, target, sync) tuple on the sync queue."""
        for sync in syncs:
            self.sync_queue.put(sync)

    def launch_syncs(self):
        """Start queued syncs, keeping at most max_alive_sync_threads alive.

        Alive threads are counted from the heartbeat registry.
        """
        with self.heartbeat.lock() as hb:
            alive_threads = sum(1 for v in hb.values()
                                if v["thread"] is not None
                                and v["thread"].is_alive())
        new_threads = self.settings.max_alive_sync_threads - alive_threads
        if new_threads <= 0:
            return
        logger.debug("Can start max %s syncs", new_threads)
        for _ in range(new_threads):
            try:
                tpl = self.sync_queue.get(block=False)
            except Queue.Empty:
                break
            self.sync_file(*tpl)

    def sync_file(self, source_state, target_state, sync_state):
        """Start a daemon thread that syncs *source_state* to the target.

        The thread is attached to the object's heartbeat, which must exist
        and must not already have a thread.

        Raises AssertionError if that precondition does not hold.
        """
        msg = messaging.SyncMessage(
            objname=source_state.objname,
            archive=source_state.archive,
            serial=source_state.serial,
            info=source_state.info,
            logger=logger)
        self.messager.put(msg)
        thread = threading.Thread(
            target=self._sync_file,
            args=(source_state, target_state, sync_state))
        with self.heartbeat.lock() as hb:
            beat = hb.get(self.reg_name(source_state.objname))
            if beat is None:
                raise AssertionError("heartbeat for %s is None" %
                                     source_state.objname)
            # Explicit raise so the invariant still holds under `python -O`.
            if beat["thread"] is not None:
                raise AssertionError("heartbeat for %s already has a thread"
                                     % source_state.objname)
            beat["thread"] = thread
        thread.daemon = True
        thread.start()
        # Drop finished threads so the list does not grow without bound;
        # joining them in wait_sync_threads would be a no-op anyway.
        self.sync_threads = [t for t in self.sync_threads if t.is_alive()]
        self.sync_threads.append(thread)

    def _sync_file(self, source_state, target_state, sync_state):
        """Thread body: stage the source and let the target pull it.

        A SyncError while staging is logged and the sync is abandoned.
        """
        clients = self.clients
        source_client = clients[source_state.archive]
        try:
            source_handle = source_client.stage_file(source_state)
        except common.SyncError as e:
            logger.warning(e)
            return
        target_client = clients[target_state.archive]
        target_client.start_pulling_file(
            source_handle, target_state, sync_state,
            callback=self.ack_file_sync,
            failure_callback=self.mark_as_failed)

    def mark_as_failed(self, state, hard=False):
        """Failure callback: clean the heartbeat of *state*'s object.

        With *hard*, the serial is also recorded in failed_serials so its
        pending decision is ignored by later decisions.
        """
        serial = state.serial
        objname = state.objname
        if hard:
            logger.warning(
                "Marking failed serial %s for archive: %s, object: '%s'",
                serial, state.archive, objname)
            with self.failed_serials.lock() as d:
                d[(serial, objname)] = state
        self.clean_heartbeat([objname])

    def update_state(self, db, old_state, new_state):
        """Store *new_state*; *old_state* is currently unused."""
        db.put_state(new_state)
        # here we could do any checks needed on the old state,
        # perhaps triggering a probe

    def ack_file_sync(self, synced_source_state, synced_target_state):
        """Success callback: record the sync, clean heartbeat, emit message."""
        with TransactedConnection(self.syncer_dbtuple) as db:
            self._ack_file_sync(db, synced_source_state, synced_target_state)
        serial = synced_source_state.serial
        objname = synced_source_state.objname
        target = synced_target_state.archive
        self.clean_heartbeat([objname])
        msg = messaging.AckSyncMessage(
            archive=target, objname=objname, serial=serial,
            logger=logger)
        self.messager.put(msg)

    def _ack_file_sync(self, db, synced_source_state, synced_target_state):
        """Persist a completed sync in the database.

        Stores the source state, the target state under the source serial,
        and a SYNC/DECISION state whose info merges both sides.

        Raises AssertionError if the serial is not the decided one, and
        common.SyncError if it is not newer than the SYNC serial.
        """
        serial = synced_source_state.serial
        objname = synced_source_state.objname
        source = synced_source_state.archive
        target = synced_target_state.archive
        tinfo = synced_target_state.info
        logger.debug("Acking archive: %s, object: '%s', serial: %s "
                     "info: %s",
                     target, objname, serial, tinfo)
        decision_state = db.get_state(self.DECISION, objname)
        sync_state = db.get_state(self.SYNC, objname)

        if serial != decision_state.serial:
            raise AssertionError(
                "Serial mismatch: assumed sync %s, decision %s"
                % (serial, decision_state.serial))
        if serial <= sync_state.serial:
            raise common.SyncError(
                "cannot ack: serial %s <= sync serial %s" %
                (serial, sync_state.serial))

        db_source_state = db.get_state(source, objname)
        self.update_state(db, db_source_state, synced_source_state)

        final_target_state = synced_target_state.set(
            serial=serial)
        db_target_state = db.get_state(target, objname)
        self.update_state(db, db_target_state, final_target_state)

        sync_info = dict(synced_source_state.info)
        sync_info.update(synced_target_state.info)
        # The 'info' namespace is global. Some attributes may be globally
        # recognizable by all clients with the same semantics, such as
        # a content-hash (e.g. SHA256), while other may be specific to
        # each client. Clients are responsible to protect their private
        # attributes creating their own namespace, for example
        # 'localfs_mtime', 'object_store_etag'
        new_sync_state = sync_state.set(serial=serial, info=sync_info)
        db.put_state(new_sync_state)
        new_decision_state = new_sync_state.set(archive=self.DECISION)
        db.put_state(new_decision_state)

    def list_deciding(self, archives=None):
        """Return the set of object names needing a decision.

        Returns an empty set on a database error.
        """
        try:
            with TransactedConnection(self.syncer_dbtuple) as db:
                return self._list_deciding(db, archives=archives)
        except common.DatabaseError:
            return set()

    def _list_deciding(self, db, archives=None):
        """Query *db* for deciding objects (default: master and slave)."""
        if archives is None:
            archives = (self.MASTER, self.SLAVE)
        return set(db.list_deciding(archives=archives,
                                    sync=self.SYNC))

    def probe_archive(self, archive, forced=False):
        """Probe all candidate files of *archive* and drop them as candidates.

        Database errors are ignored (presumably retried on the next poll).
        """
        ident = utils.time_stamp()
        client = self.clients[archive]
        try:
            candidates = client.list_candidate_files(forced=forced)
            self._probe_files(archive, candidates, ident)
            client.remove_candidates(candidates, ident)
        except common.DatabaseError:
            pass

    def decide_archive(self, archive=None):
        """Decide all deciding objects (optionally of one archive) and launch syncs."""
        try:
            archives = [archive] if archive is not None else None
            objnames = self.list_deciding(archives)
            self.decide_file_syncs(objnames)
            self.launch_syncs()
        except common.DatabaseError:
            pass

    def decide_all_archives(self):
        """One poll iteration: probe both archives, then decide and launch."""
        logger.debug("Checking candidates to sync")
        self.probe_all()
        self.decide_archive()

    def probe_all(self, forced=False):
        """Probe the master archive, then the slave archive."""
        self.probe_archive(self.MASTER, forced=forced)
        self.probe_archive(self.SLAVE, forced=forced)

    def _poll_decide(self, interval=3):
        """Start and return a StoppableThread running decide_all_archives.

        *interval* is passed to utils.StoppableThread (presumably the delay
        in seconds between runs).
        """
        thread = utils.StoppableThread(interval, self.decide_all_archives)
        thread.start()
        return thread

    def check_decisions(self):
        """Group objects that would be synced by their source archive.

        Runs a dry-run decision for every deciding object and returns a
        defaultdict(list) mapping source archive to object names. Objects
        for which no sync would be decided (dry run returns None, e.g. up
        to date or busy) are skipped.
        """
        deciding = self.list_deciding()
        decisions = self.dry_run_decisions(deciding)
        by_source = defaultdict(list)
        for decision in decisions:
            if decision is None:
                continue
            source_state = decision[0]
            by_source[source_state.archive].append(source_state.objname)
        return by_source

    # TODO cleanup db of objects deleted in all clients
    # def cleanup(self):
    #     db = self.get_db()
    #     master_deleted = set(db.list_files_with_info(MASTER, {}))
    #     client_deleted = set(db.list_files_with_info(SLAVE, {}))
    #     deleted = master_deleted.intersection(client_deleted)


Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
521
522
def conf(auth_url, auth_token, container, local_root_path, **kwargs):
    """Build a FileSyncer between a Pithos container and a local directory.

    All arguments (including extra keyword arguments) are forwarded to
    SyncerSettings. The Pithos client is created first and used as master;
    the local filesystem client is the slave.
    """
    syncer_settings = SyncerSettings(
        auth_url=auth_url,
        auth_token=auth_token,
        container=container,
        local_root_path=local_root_path,
        **kwargs)
    return FileSyncer(syncer_settings,
                      PithosFileClient(syncer_settings),
                      LocalfsFileClient(syncer_settings))