syncer.py 20.6 KB
Newer Older
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# Copyright (C) 2015 GRNET S.A.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

16
17
18
import time
import threading
import logging
19
from collections import defaultdict
20
import Queue
21
22
23

from agkyra.syncer import common
from agkyra.syncer.setup import SyncerSettings
24
from agkyra.syncer.database import TransactedConnection
25
26
from agkyra.syncer.localfs_client import LocalfsFileClient
from agkyra.syncer.pithos_client import PithosFileClient
27
from agkyra.syncer import messaging, utils
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

logger = logging.getLogger(__name__)


class FileSyncer(object):
    """Coordinate synchronization between a master and a slave file client.

    Object state lives in the syncer database under four archives: the
    master client's signature, the slave client's signature, and the two
    pseudo-archives ``SYNC`` (the last acknowledged synced state) and
    ``DECISION`` (the state chosen as the source of the pending sync).
    Probing records live changes as new serials, deciding compares serials
    to pick a sync source, and each sync runs in its own thread that is
    tracked through the shared heartbeat registry.
    """

    dbname = None
    clients = None

    def __init__(self, settings, master, slave):
        """Store the clients and settings and prepare idle worker state.

        :param settings: syncer settings; ``syncer_dbtuple``, ``messager``
            and ``heartbeat`` are read from it here.
        :param master: the master file client; its ``SIGNATURE`` names its
            archive in the database.
        :param slave: the slave file client.
        """
        self.settings = settings
        self.master = master
        self.slave = slave
        # Names of the pseudo-archives kept in the syncer database.
        self.DECISION = 'DECISION'
        self.SYNC = 'SYNC'
        self.MASTER = master.SIGNATURE
        self.SLAVE = slave.SIGNATURE
        self.syncer_dbtuple = settings.syncer_dbtuple
        self.clients = {self.MASTER: master, self.SLAVE: slave}
        self.notifiers = {}
        self.decide_thread = None
        self.sync_threads = []
        # (serial, objname) -> state for syncs that failed hard; filled by
        # mark_as_failed() and consulted by _do_decide_file_sync().
        self.failed_serials = utils.ThreadSafeDict()
        # Pending (source_state, target_state, sync_state) tuples.
        self.sync_queue = Queue.Queue()
        self.messager = settings.messager
        self.heartbeat = self.settings.heartbeat

    def thread_is_active(self, t):
        """Return True if ``t`` is set and reports itself alive.

        ``t`` may be None (nothing started yet), which yields False.
        """
        return bool(t and t.is_alive())

    @property
    def decide_active(self):
        """True while the background decide thread is running."""
        return self.thread_is_active(self.decide_thread)

    @property
    def paused(self):
        """True when the decide thread is not running."""
        return not self.decide_active

    def initiate_probe(self):
        """Start the notifiers and force a full probe of both archives."""
        self.start_notifiers()
        self.probe_all(forced=True)

    def start_notifiers(self):
        """Create a notifier for every client whose notifier is not alive.

        New notifiers come from ``client.notifier()``; already running
        ones are left alone and logged.
        """
        for signature, client in self.clients.items():
            notifier = self.notifiers.get(signature)
            if not self.thread_is_active(notifier):
                self.notifiers[signature] = client.notifier()
            else:
                logger.info("Notifier %s already up" % signature)

    def stop_notifiers(self):
        """Ask every notifier to stop, then wait for all of them."""
        for notifier in self.notifiers.values():
            try:
                notifier.stop()
            except KeyError as e:
                # bypass watchdog inotify bug that causes a KeyError
                # when attempting to stop a notifier after the watched
                # directory has been deleted
                logger.warning("Ignored KeyError: %s" % e)
        for notifier in self.notifiers.values():
            notifier.join()

    def start_decide(self):
        """Launch the background decide thread unless it is running."""
        if not self.decide_active:
            self.decide_thread = self._poll_decide()

    def stop_decide(self):
        """Stop the background decide thread, if running, and join it."""
        if self.decide_active:
            self.decide_thread.stop()
            self.decide_thread.join()

    def stop_all_daemons(self):
        """Stop the decide thread and then all notifiers."""
        self.stop_decide()
        self.stop_notifiers()

    def wait_sync_threads(self):
        """Block until every tracked sync thread has finished."""
        for thread in self.sync_threads:
            thread.join()

    def get_next_message(self, block=False):
        """Fetch the next message from the messager, passing ``block``."""
        return self.messager.get(block=block)

    def probe_file(self, archive, objname):
        """Probe a single object of ``archive`` and record any change.

        Database errors are swallowed (best effort). In that case
        ``remove_candidates`` is not reached, so the client keeps the
        object as a candidate.
        """
        ident = utils.time_stamp()
        try:
            self._probe_files(archive, [objname], ident)
            client = self.clients[archive]
            client.remove_candidates([objname], ident)
        except common.DatabaseError as e:
            logger.debug("Probe of '%s' in %s aborted: %s"
                         % (objname, archive, e))

    def reg_name(self, objname):
        """Return the heartbeat registry key for ``objname``."""
        return utils.reg_name(self.settings, objname)

    def _probe_files(self, archive, objnames, ident):
        """Probe ``objnames`` of ``archive`` within one DB transaction."""
        with TransactedConnection(self.syncer_dbtuple) as db:
            for objname in objnames:
                self._do_probe_file(db, archive, objname, ident)

    def _do_probe_file(self, db, archive, objname, ident):
        """Probe one object and store its live state if it changed.

        Probing is skipped, with a message, in two cases:

        - a heartbeat for the object is pending (no thread yet) or its
          sync thread is still alive;
        - the archive already holds a serial newer than the ``SYNC``
          serial, i.e. an unsynced change is already recorded.
        """
        logger.debug("Probing archive: %s, object: '%s'" % (archive, objname))
        client = self.clients[archive]
        db_state = db.get_state(archive, objname)
        ref_state = db.get_state(self.SYNC, objname)
        with self.heartbeat.lock() as hb:
            beat = hb.get(self.reg_name(objname))
            if beat is not None:
                beat_thread = beat["thread"]
                if beat_thread is None or beat_thread.is_alive():
                    msg = messaging.HeartbeatNoProbeMessage(
                        archive=archive, objname=objname, heartbeat=beat,
                        logger=logger)
                    self.messager.put(msg)
                    return
        if db_state.serial != ref_state.serial:
            msg = messaging.AlreadyProbedMessage(
                archive=archive, objname=objname, serial=db_state.serial,
                logger=logger)
            self.messager.put(msg)
            return
        live_state = client.probe_file(objname, db_state, ref_state, ident)
        if live_state is not None:
            self.update_file_state(db, live_state)

    def update_file_state(self, db, live_state):
        """Store ``live_state`` under a freshly allocated serial.

        The update is refused, with a warning, if the stored state's serial
        no longer matches the serial the live state was probed against.
        For a brand-new object (new serial 0), a ``SYNC`` entry with serial
        -1 and empty info is also created.
        """
        archive = live_state.archive
        objname = live_state.objname
        serial = live_state.serial
        db_state = db.get_state(archive, objname)
        if db_state and db_state.serial != serial:
            logger.warning(
                "Cannot update archive: %s, object: '%s', "
                "serial: %s, db_serial: %s" %
                (archive, objname, serial, db_state.serial))
            return

        new_serial = db.new_serial(objname)
        new_state = live_state.set(serial=new_serial)
        db.put_state(new_state)
        msg = messaging.UpdateMessage(
            archive=archive, objname=objname,
            serial=new_serial, old_serial=serial, logger=logger)
        self.messager.put(msg)
        if new_serial == 0:
            sync_state = common.FileState(
                archive=self.SYNC, objname=objname, serial=-1,
                info={})
            db.put_state(sync_state)

    def dry_run_decisions(self, objnames, master=None, slave=None):
        """Return the decision for each object without side effects.

        Each element is a (source, target, sync) state tuple or None when
        no sync would be started for that object.
        """
        if master is None:
            master = self.MASTER
        if slave is None:
            slave = self.SLAVE
        decisions = []
        with TransactedConnection(self.syncer_dbtuple) as db:
            for objname in objnames:
                decision = self._dry_run_decision(db, objname, master, slave)
                decisions.append(decision)
        return decisions

    def _dry_run_decision(self, db, objname, master=None, slave=None):
        """Decide one object in dry-run mode (no DB writes, no messages)."""
        if master is None:
            master = self.MASTER
        if slave is None:
            slave = self.SLAVE
        ident = utils.time_stamp()
        return self._do_decide_file_sync(db, objname, master, slave, ident, True)

    def decide_file_syncs(self, objnames, master=None, slave=None):
        """Decide syncs for ``objnames`` and enqueue the resulting syncs.

        Decisions happen in one transaction. If it fails with a
        ``DatabaseError``, heartbeats registered under this run's ident are
        removed again and nothing is enqueued.
        """
        if master is None:
            master = self.MASTER
        if slave is None:
            slave = self.SLAVE
        ident = utils.time_stamp()
        syncs = []
        try:
            with TransactedConnection(self.syncer_dbtuple) as db:
                for objname in objnames:
                    states = self._decide_file_sync(
                        db, objname, master, slave, ident)
                    if states is not None:
                        syncs.append(states)
        except common.DatabaseError:
            self.clean_heartbeat(objnames, ident)
            return
        self.enqueue_syncs(syncs)

    def decide_file_sync(self, objname, master=None, slave=None):
        """Decide (and enqueue) the sync of a single object."""
        # decide_file_syncs() applies the same MASTER/SLAVE defaults.
        self.decide_file_syncs([objname], master, slave)

    def clean_heartbeat(self, objnames, ident=None):
        """Remove the heartbeat entries of ``objnames``.

        Objects without a heartbeat are skipped. When ``ident`` is given,
        only entries registered with that ident are removed; entries from
        other runs are left in place.
        """
        with self.heartbeat.lock() as hb:
            for objname in objnames:
                key = self.reg_name(objname)
                beat = hb.get(key)
                if beat is None:
                    # Keep going: later objects may still need cleaning.
                    continue
                if ident and ident != beat["ident"]:
                    continue
                hb.pop(key, None)
                logger.debug("cleaning heartbeat %s, object '%s'"
                             % (beat, objname))

    def _decide_file_sync(self, db, objname, master, slave, ident):
        """Decide one object and reserve a heartbeat if a sync results.

        Returns None (with a warning) when sync is disabled in the settings.
        Otherwise returns the result of ``_do_decide_file_sync``. For a
        non-None result, a heartbeat ``{"ident": ident, "thread": None}`` is
        registered, and sync_file() later attaches its thread to it.
        """
        if not self.settings._sync_is_enabled(db):
            logger.warning("Cannot decide '%s'; sync disabled." % objname)
            return
        states = self._do_decide_file_sync(db, objname, master, slave, ident)
        if states is not None:
            with self.heartbeat.lock() as hb:
                beat = {"ident": ident, "thread": None}
                hb[self.reg_name(objname)] = beat
        return states

    def _do_decide_file_sync(self, db, objname, master, slave, ident,
                             dry_run=False):
        """Compare archive serials and return the sync to run, if any.

        :returns: ``(source_state, target_state, sync_state)`` or None.
        :raises AssertionError: when serials are inconsistent with the
            ``SYNC``/``DECISION`` records.

        With ``dry_run`` set, no decision state is written and no messages
        are emitted (warnings are still logged).
        """
        logger.debug("Deciding object: '%s'" % objname)
        master_state = db.get_state(master, objname)
        slave_state = db.get_state(slave, objname)
        sync_state = db.get_state(self.SYNC, objname)
        decision_state = db.get_state(self.DECISION, objname)
        master_serial = master_state.serial
        slave_serial = slave_state.serial
        sync_serial = sync_state.serial
        decision_serial = decision_state.serial

        with self.heartbeat.lock() as hb:
            beat = hb.get(self.reg_name(objname))
            logger.debug("object: %s heartbeat: %s" % (objname, beat))
            if beat is not None:
                if beat["ident"] == ident:
                    # This very run already reserved the object.
                    logger.warning(
                        "Found used heartbeat ident %s for object %s" %
                        (ident, objname))
                    return None
                beat_thread = beat["thread"]
                if beat_thread is None or beat_thread.is_alive():
                    # A sync is reserved or still running.
                    if not dry_run:
                        msg = messaging.HeartbeatNoDecideMessage(
                            objname=objname, heartbeat=beat, logger=logger)
                        self.messager.put(msg)
                    return None
                # The previous sync thread died without cleaning up.
                logger.warning("Ignoring previous run: %s %s" %
                               (objname, beat))

        if decision_serial != sync_serial:
            # A decision exists that was never acknowledged.
            with self.failed_serials.lock() as d:
                failed_sync = d.get((decision_serial, objname))
            if failed_sync is None:
                # Not a hard failure: re-issue the same decision.
                logger.warning(
                    "Already decided: '%s', decision: %s, sync: %s" %
                    (objname, decision_serial, sync_serial))
                if decision_serial == master_serial:
                    return master_state, slave_state, sync_state
                elif decision_serial == slave_serial:
                    return slave_state, master_state, sync_state
                else:
                    raise AssertionError(
                        "Decision serial %s for objname '%s' "
                        "does not match any archive." %
                        (decision_serial, objname))
            else:
                if not dry_run:
                    msg = messaging.FailedSyncIgnoreDecisionMessage(
                        objname=objname, serial=decision_serial, logger=logger)
                    self.messager.put(msg)

        if master_serial > sync_serial:
            if master_serial == decision_serial:  # this is a failed serial
                return None
            if not dry_run:
                self._make_decision_state(db, decision_state, master_state)
            return master_state, slave_state, sync_state
        elif master_serial == sync_serial:
            if slave_serial > sync_serial:
                if slave_serial == decision_serial:  # this is a failed serial
                    return None
                if not dry_run:
                    self._make_decision_state(db, decision_state, slave_state)
                return slave_state, master_state, sync_state
            elif slave_serial == sync_serial:
                # Both archives are in sync; nothing to do.
                return None
            else:
                raise AssertionError("Slave serial %s, sync serial %s"
                                     % (slave_serial, sync_serial))

        else:
            raise AssertionError("Master serial %s, sync serial %s"
                                 % (master_serial, sync_serial))

    def _make_decision_state(self, db, decision_state, source_state):
        """Record ``source_state``'s serial and info as the decision."""
        new_decision_state = decision_state.set(
            serial=source_state.serial, info=source_state.info)
        db.put_state(new_decision_state)

    def enqueue_syncs(self, syncs):
        """Queue each (source, target, sync) tuple for launch_syncs()."""
        for sync in syncs:
            self.sync_queue.put(sync)

    def launch_syncs(self):
        """Start queued syncs up to ``settings.max_alive_sync_threads``.

        Live sync threads are counted from the heartbeat registry. At most
        the remaining capacity is dequeued; the queue is never waited on.
        """
        with self.heartbeat.lock() as hb:
            alive_threads = len([v for v in hb.values()
                                 if v["thread"] is not None
                                 and v["thread"].is_alive()])
        max_alive_threads = self.settings.max_alive_sync_threads
        new_threads = max_alive_threads - alive_threads
        if new_threads > 0:
            logger.debug("Can start max %s syncs" % new_threads)
            for _ in range(new_threads):
                try:
                    tpl = self.sync_queue.get(block=False)
                    self.sync_file(*tpl)
                except Queue.Empty:
                    break

    def sync_file(self, source_state, target_state, sync_state):
        """Announce and start a thread running :meth:`_sync_file`.

        :raises AssertionError: if no heartbeat was reserved for the object
            or a thread is already attached to it.
        """
        msg = messaging.SyncMessage(
            objname=source_state.objname,
            archive=source_state.archive,
            serial=source_state.serial,
            info=source_state.info,
            logger=logger)
        self.messager.put(msg)
        thread = threading.Thread(
            target=self._sync_file,
            args=(source_state, target_state, sync_state))
        with self.heartbeat.lock() as hb:
            beat = hb.get(self.reg_name(source_state.objname))
            if beat is None:
                raise AssertionError("heartbeat for %s is None" %
                                     source_state.objname)
            assert beat["thread"] is None
            beat["thread"] = thread
        thread.start()
        # Drop finished threads so the list does not grow without bound in
        # a long-running syncer; joining them would be a no-op anyway. The
        # list is rebound rather than mutated so concurrent iteration in
        # wait_sync_threads() stays safe.
        self.sync_threads = [t for t in self.sync_threads
                             if t.is_alive()] + [thread]

    def _sync_file(self, source_state, target_state, sync_state):
        """Sync thread body: stage the source and let the target pull it.

        ``ack_file_sync`` and ``mark_as_failed`` are handed to the target
        client as ``callback`` and ``failure_callback``. A ``SyncError``
        while staging is logged and ends the thread.
        """
        clients = self.clients
        source_client = clients[source_state.archive]
        try:
            source_handle = source_client.stage_file(source_state)
        except common.SyncError as e:
            logger.warning(e)
            return
        target_client = clients[target_state.archive]
        target_client.start_pulling_file(
            source_handle, target_state, sync_state,
            callback=self.ack_file_sync,
            failure_callback=self.mark_as_failed)

    def mark_as_failed(self, state, hard=False):
        """Handle a failed sync of ``state`` and release its heartbeat.

        A ``hard`` failure also records ``(serial, objname)`` in
        ``failed_serials``. _do_decide_file_sync() then stops re-issuing
        that decision.
        """
        serial = state.serial
        objname = state.objname
        if hard:
            logger.warning(
                "Marking failed serial %s for archive: %s, object: '%s'" %
                (serial, state.archive, objname))
            with self.failed_serials.lock() as d:
                d[(serial, objname)] = state
        self.clean_heartbeat([objname])

    def update_state(self, db, old_state, new_state):
        """Store ``new_state``; ``old_state`` is currently unused."""
        db.put_state(new_state)
        # here we could do any checks needed on the old state,
        # perhaps triggering a probe

    def ack_file_sync(self, synced_source_state, synced_target_state):
        """Acknowledge a completed sync, release its heartbeat, announce it."""
        with TransactedConnection(self.syncer_dbtuple) as db:
            self._ack_file_sync(db, synced_source_state, synced_target_state)
        serial = synced_source_state.serial
        objname = synced_source_state.objname
        target = synced_target_state.archive
        self.clean_heartbeat([objname])
        msg = messaging.AckSyncMessage(
            archive=target, objname=objname, serial=serial,
            logger=logger)
        self.messager.put(msg)

    def _ack_file_sync(self, db, synced_source_state, synced_target_state):
        """Persist a completed sync inside transaction ``db``.

        Stores the source state, then the target state under the source's
        serial. ``SYNC`` and ``DECISION`` are advanced to that serial with
        the merged source/target info.

        :raises AssertionError: if the serial is not the decided one.
        :raises common.SyncError: if the serial is not newer than ``SYNC``.
        """
        serial = synced_source_state.serial
        objname = synced_source_state.objname
        source = synced_source_state.archive
        target = synced_target_state.archive
        tinfo = synced_target_state.info
        logger.debug("Acking archive: %s, object: '%s', serial: %s "
                     "info: %s" %
                     (target, objname, serial, tinfo))
        decision_state = db.get_state(self.DECISION, objname)
        sync_state = db.get_state(self.SYNC, objname)

        if serial != decision_state.serial:
            raise AssertionError(
                "Serial mismatch: assumed sync %s, decision %s"
                % (serial, decision_state.serial))
        if serial <= sync_state.serial:
            raise common.SyncError(
                "cannot ack: serial %s < sync serial %s" %
                (serial, sync_state.serial))

        db_source_state = db.get_state(source, objname)
        self.update_state(db, db_source_state, synced_source_state)

        final_target_state = synced_target_state.set(
            serial=serial)
        db_target_state = db.get_state(target, objname)
        self.update_state(db, db_target_state, final_target_state)

        sync_info = dict(synced_source_state.info)
        sync_info.update(synced_target_state.info)
        # The 'info' namespace is global. Some attributes may be globally
        # recognizable by all clients with the same semantics, such as
        # a content-hash (e.g. SHA256), while other may be specific to
        # each client. Clients are responsible to protect their private
        # attributes creating their own namespace, for example
        # 'localfs_mtime', 'object_store_etag'
        new_sync_state = sync_state.set(serial=serial, info=sync_info)
        db.put_state(new_sync_state)
        new_decision_state = new_sync_state.set(archive=self.DECISION)
        db.put_state(new_decision_state)

    def list_deciding(self, archives=None):
        """Return the set of object names pending a decision.

        Returns an empty set if the database raises ``DatabaseError``.
        """
        try:
            with TransactedConnection(self.syncer_dbtuple) as db:
                return self._list_deciding(db, archives=archives)
        except common.DatabaseError:
            return set()

    def _list_deciding(self, db, archives=None):
        """Ask ``db.list_deciding`` for ``archives`` (default: both)."""
        if archives is None:
            archives = (self.MASTER, self.SLAVE)
        return set(db.list_deciding(archives=archives,
                                    sync=self.SYNC))

    def probe_archive(self, archive, forced=False):
        """Probe all candidate files the ``archive`` client reports.

        Database errors are swallowed (best effort). The candidates are
        then not removed from the client.
        """
        ident = utils.time_stamp()
        client = self.clients[archive]
        try:
            candidates = client.list_candidate_files(forced=forced)
            self._probe_files(archive, candidates, ident)
            client.remove_candidates(candidates, ident)
        except common.DatabaseError as e:
            logger.debug("Probe of archive %s aborted: %s" % (archive, e))

    def decide_archive(self, archive=None):
        """Decide pending objects (of ``archive`` or both) and launch syncs."""
        try:
            archives = [archive] if archive is not None else None
            objnames = self.list_deciding(archives)
            self.decide_file_syncs(objnames)
            self.launch_syncs()
        except common.DatabaseError as e:
            logger.debug("Deciding aborted: %s" % e)

    def decide_all_archives(self):
        """Probe both archives, then decide and launch syncs."""
        logger.debug("Checking candidates to sync")
        self.probe_all()
        self.decide_archive()

    def probe_all(self, forced=False):
        """Probe the master archive, then the slave archive."""
        self.probe_archive(self.MASTER, forced=forced)
        self.probe_archive(self.SLAVE, forced=forced)

    def _poll_decide(self, interval=3):
        """Start and return the background decide thread.

        The thread body runs :meth:`decide_all_archives` and then sleeps
        ``interval`` seconds. It is a ``utils.StoppableThread`` launched via
        ``utils.start_daemon``, presumably repeating the body until
        stopped (confirm in utils).
        """
        class DecideThread(utils.StoppableThread):
            def run_body(this):
                # ``this`` is the thread; ``self`` is the enclosing syncer.
                self.decide_all_archives()
                time.sleep(interval)

        return utils.start_daemon(DecideThread)

    def check_decisions(self):
        """Group pending decisions by source archive (dry run).

        :returns: ``defaultdict(list)`` mapping source archive -> object
            names. Objects for which no sync would start are left out.
        """
        deciding = self.list_deciding()
        decisions = self.dry_run_decisions(deciding)
        by_source = defaultdict(list)
        for decision in decisions:
            if decision is None:
                # _do_decide_file_sync() returns None when nothing would
                # be synced (busy heartbeat, failed serial, in sync).
                continue
            source_state = decision[0]
            source = source_state.archive
            objname = source_state.objname
            by_source[source].append(objname)
        return by_source

    # TODO cleanup db of objects deleted in all clients
    # def cleanup(self):
    #     db = self.get_db()
    #     master_deleted = set(db.list_files_with_info(MASTER, {}))
    #     client_deleted = set(db.list_files_with_info(SLAVE, {}))
    #     deleted = master_deleted.intersection(client_deleted)


Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
519
520
def conf(auth_url, auth_token, container, local_root_path, **kwargs):
    """Build a FileSyncer with a Pithos master and a local-fs slave.

    The four named arguments, plus any extra keyword arguments, are
    forwarded to ``SyncerSettings``; both clients share those settings.
    """
    settings = SyncerSettings(
        auth_url=auth_url,
        auth_token=auth_token,
        container=container,
        local_root_path=local_root_path,
        **kwargs
    )
    # Arguments are evaluated left to right, so the master client is still
    # constructed before the slave client.
    return FileSyncer(settings,
                      PithosFileClient(settings),
                      LocalfsFileClient(settings))