syncer.py 20.4 KB
Newer Older
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# Copyright (C) 2015 GRNET S.A.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

16
17
18
import time
import threading
import logging
19
from collections import defaultdict
20
import Queue
21
22
23

from agkyra.syncer import common
from agkyra.syncer.setup import SyncerSettings
24
from agkyra.syncer.database import TransactedConnection
25
26
from agkyra.syncer.localfs_client import LocalfsFileClient
from agkyra.syncer.pithos_client import PithosFileClient
27
from agkyra.syncer import messaging, utils
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

# Module logger, used by FileSyncer and also handed to every messaging.*
# message it emits (via their ``logger=`` argument).
logger = logging.getLogger(__name__)


class FileSyncer(object):
    """Probe, decide and sync objects between a master and a slave archive.

    Besides the two real archives (keyed by the clients' ``SIGNATURE``),
    the state database holds two pseudo-archives per object: ``DECISION``
    (the serial/info last chosen for syncing) and ``SYNC`` (the serial/info
    last acknowledged as synced).  An object needs work when one archive's
    serial has moved past the ``SYNC`` serial.

    A per-object "heartbeat" (``{"ident": ..., "thread": ...}`` stored in
    ``self.heartbeat`` under ``reg_name(objname)``) marks objects that have
    been decided (``thread`` is None) or are being synced (``thread`` set).
    """

    dbname = None
    clients = None

    def __init__(self, settings, master, slave):
        self.settings = settings
        self.master = master
        self.slave = slave
        # Names of the pseudo-archives stored in the state database.
        self.DECISION = 'DECISION'
        self.SYNC = 'SYNC'
        self.MASTER = master.SIGNATURE
        self.SLAVE = slave.SIGNATURE
        self.syncer_dbtuple = settings.syncer_dbtuple
        self.clients = {self.MASTER: master, self.SLAVE: slave}
        self.notifiers = {}
        self.decide_thread = None
        self.sync_threads = []
        # (serial, objname) -> state, for syncs reported as hard failures.
        self.failed_serials = utils.ThreadSafeDict()
        # (source_state, target_state, sync_state) tuples awaiting a thread.
        self.sync_queue = Queue.Queue()
        self.messager = settings.messager
        self.heartbeat = self.settings.heartbeat

    def thread_is_active(self, t):
        """Return a truthy value if thread ``t`` exists and is running."""
        return t and t.is_alive()

    @property
    def decide_active(self):
        """Whether the periodic decide thread is running."""
        return self.thread_is_active(self.decide_thread)

    @property
    def paused(self):
        """The syncer is considered paused when no decide thread runs."""
        return not self.decide_active

    def initiate_probe(self):
        """Start the notifiers and force a full probe of both archives."""
        self.start_notifiers()
        self.probe_all(forced=True)

    def start_notifiers(self):
        """Start a notifier for every client that lacks a running one."""
        for signature, client in self.clients.iteritems():
            notifier = self.notifiers.get(signature)
            if not self.thread_is_active(notifier):
                self.notifiers[signature] = client.notifier()
            else:
                logger.info("Notifier %s already up" % signature)

    def stop_notifiers(self):
        """Stop all notifiers that have been started."""
        for notifier in self.notifiers.values():
            try:
                notifier.stop()
            except KeyError as e:
                # bypass watchdog inotify bug that causes a KeyError
                # when attempting to stop a notifier after the watched
                # directory has been deleted
                logger.warning("Ignored KeyError: %s" % e)

    def start_decide(self):
        """Start the periodic decide thread unless it is already running."""
        if not self.decide_active:
            self.decide_thread = self._poll_decide()

    def stop_decide(self):
        """Ask the periodic decide thread to stop, if it is running."""
        if self.decide_active:
            self.decide_thread.stop()

    def stop_all_daemons(self):
        """Stop the decide thread and all notifiers."""
        self.stop_decide()
        self.stop_notifiers()

    def wait_sync_threads(self):
        """Block until every tracked sync thread has finished."""
        for thread in self.sync_threads:
            thread.join()

    def get_next_message(self, block=False):
        """Fetch the next message from the messager queue."""
        return self.messager.get(block=block)

    def probe_file(self, archive, objname):
        """Probe a single object of ``archive`` and record any change.

        Best effort: a ``common.DatabaseError`` is silently ignored.
        """
        ident = utils.time_stamp()
        try:
            self._probe_files(archive, [objname], ident)
            client = self.clients[archive]
            client.remove_candidates([objname], ident)
        except common.DatabaseError:
            pass

    def reg_name(self, objname):
        """Return the key under which ``objname`` is kept in the heartbeat."""
        return utils.reg_name(self.settings, objname)

    def _probe_files(self, archive, objnames, ident):
        """Probe ``objnames`` of ``archive`` inside one DB transaction."""
        with TransactedConnection(self.syncer_dbtuple) as db:
            for objname in objnames:
                self._do_probe_file(db, archive, objname, ident)

    def _do_probe_file(self, db, archive, objname, ident):
        """Probe one object and store its live state if it changed.

        Skipped (with a message) when the object has a heartbeat whose
        thread is unset or still alive, or when the archive already holds
        a serial newer than the SYNC one (an earlier probe is pending).
        """
        logger.debug("Probing archive: %s, object: '%s'" % (archive, objname))
        client = self.clients[archive]
        db_state = db.get_state(archive, objname)
        ref_state = db.get_state(self.SYNC, objname)
        with self.heartbeat.lock() as hb:
            beat = hb.get(self.reg_name(objname))
            if beat is not None:
                beat_thread = beat["thread"]
                # Thread None: decided, not launched yet; alive: syncing.
                if beat_thread is None or beat_thread.is_alive():
                    msg = messaging.HeartbeatNoProbeMessage(
                        archive=archive, objname=objname, heartbeat=beat,
                        logger=logger)
                    self.messager.put(msg)
                    return
        if db_state.serial != ref_state.serial:
            msg = messaging.AlreadyProbedMessage(
                archive=archive, objname=objname, serial=db_state.serial,
                logger=logger)
            self.messager.put(msg)
            return
        live_state = client.probe_file(objname, db_state, ref_state, ident)
        if live_state is not None:
            self.update_file_state(db, live_state)

    def update_file_state(self, db, live_state):
        """Store ``live_state`` under a freshly allocated serial.

        Refuses (logging a warning) when the stored serial no longer
        matches the serial the live state was probed against.
        """
        archive = live_state.archive
        objname = live_state.objname
        serial = live_state.serial
        db_state = db.get_state(archive, objname)
        if db_state and db_state.serial != serial:
            logger.warning(
                "Cannot update archive: %s, object: '%s', "
                "serial: %s, db_serial: %s" %
                (archive, objname, serial, db_state.serial))
            return

        new_serial = db.new_serial(objname)
        new_state = live_state.set(serial=new_serial)
        db.put_state(new_state)
        msg = messaging.UpdateMessage(
            archive=archive, objname=objname,
            serial=new_serial, old_serial=serial, logger=logger)
        self.messager.put(msg)
        if new_serial == 0:
            # presumably the first serial ever allocated for this object:
            # seed a SYNC entry at -1 so the new serial counts as newer.
            sync_state = common.FileState(
                archive=self.SYNC, objname=objname, serial=-1,
                info={})
            db.put_state(sync_state)

    def dry_run_decisions(self, objnames, master=None, slave=None):
        """Return the decisions for ``objnames`` without writing anything.

        Each entry is a (source, target, sync) state tuple, or None when
        no sync would be started for that object.
        """
        if master is None:
            master = self.MASTER
        if slave is None:
            slave = self.SLAVE
        decisions = []
        with TransactedConnection(self.syncer_dbtuple) as db:
            for objname in objnames:
                decision = self._dry_run_decision(db, objname, master, slave)
                decisions.append(decision)
        return decisions

    def _dry_run_decision(self, db, objname, master=None, slave=None):
        """Compute the decision for one object in dry-run mode."""
        if master is None:
            master = self.MASTER
        if slave is None:
            slave = self.SLAVE
        ident = utils.time_stamp()
        return self._do_decide_file_sync(db, objname, master, slave, ident, True)

    def decide_file_syncs(self, objnames, master=None, slave=None):
        """Decide syncs for ``objnames`` and queue the resulting tuples.

        On a database error the heartbeats registered by this run (same
        ident) are removed and nothing is queued.
        """
        if master is None:
            master = self.MASTER
        if slave is None:
            slave = self.SLAVE
        ident = utils.time_stamp()
        syncs = []
        try:
            with TransactedConnection(self.syncer_dbtuple) as db:
                for objname in objnames:
                    states = self._decide_file_sync(
                        db, objname, master, slave, ident)
                    if states is not None:
                        syncs.append(states)
        except common.DatabaseError:
            self.clean_heartbeat(objnames, ident)
            return
        self.enqueue_syncs(syncs)

    def decide_file_sync(self, objname, master=None, slave=None):
        """Decide and queue the sync of a single object."""
        if master is None:
            master = self.MASTER
        if slave is None:
            slave = self.SLAVE
        self.decide_file_syncs([objname], master, slave)

    def clean_heartbeat(self, objnames, ident=None):
        """Remove the heartbeats of ``objnames``.

        When ``ident`` is given, only heartbeats registered with that ident
        are removed; others are put back.  Objects without a heartbeat are
        skipped (previously this aborted the whole loop, leaving the
        remaining objects' heartbeats in place).
        """
        with self.heartbeat.lock() as hb:
            for objname in objnames:
                beat = hb.pop(self.reg_name(objname), None)
                if beat is None:
                    continue
                if ident and ident != beat["ident"]:
                    hb[self.reg_name(objname)] = beat
                else:
                    logger.debug("cleaning heartbeat %s, object '%s'"
                                 % (beat, objname))

    def _decide_file_sync(self, db, objname, master, slave, ident):
        """Decide one object and, on a decision, register its heartbeat."""
        if not self.settings._sync_is_enabled(db):
            logger.warning("Cannot decide '%s'; sync disabled." % objname)
            return
        states = self._do_decide_file_sync(db, objname, master, slave, ident)
        if states is not None:
            with self.heartbeat.lock() as hb:
                # thread stays None until sync_file() launches the sync
                beat = {"ident": ident, "thread": None}
                hb[self.reg_name(objname)] = beat
        return states

    def _do_decide_file_sync(self, db, objname, master, slave, ident,
                             dry_run=False):
        """Return (source, target, sync) states to sync, or None.

        Unless ``dry_run`` is set, a new decision is recorded in the
        DECISION pseudo-archive and informational messages are emitted.
        Raises AssertionError on serial combinations that should not occur.
        """
        logger.debug("Deciding object: '%s'" % objname)
        master_state = db.get_state(master, objname)
        slave_state = db.get_state(slave, objname)
        sync_state = db.get_state(self.SYNC, objname)
        decision_state = db.get_state(self.DECISION, objname)
        master_serial = master_state.serial
        slave_serial = slave_state.serial
        sync_serial = sync_state.serial
        decision_serial = decision_state.serial

        with self.heartbeat.lock() as hb:
            beat = hb.get(self.reg_name(objname))
            logger.debug("object: %s heartbeat: %s" % (objname, beat))
            if beat is not None:
                if beat["ident"] == ident:
                    logger.warning(
                        "Found used heartbeat ident %s for object %s" %
                        (ident, objname))
                    return None
                beat_thread = beat["thread"]
                if beat_thread is None or beat_thread.is_alive():
                    if not dry_run:
                        msg = messaging.HeartbeatNoDecideMessage(
                            objname=objname, heartbeat=beat, logger=logger)
                        self.messager.put(msg)
                    return None
                # heartbeat left behind by a finished sync thread
                logger.warning("Ignoring previous run: %s %s" %
                               (objname, beat))

        if decision_serial != sync_serial:
            # A decision exists that was never acknowledged as synced.
            with self.failed_serials.lock() as d:
                failed_sync = d.get((decision_serial, objname))
            if failed_sync is None:
                # Not a known failure: redo the pending decision.
                logger.warning(
                    "Already decided: '%s', decision: %s, sync: %s" %
                    (objname, decision_serial, sync_serial))
                if decision_serial == master_serial:
                    return master_state, slave_state, sync_state
                elif decision_serial == slave_serial:
                    return slave_state, master_state, sync_state
                else:
                    raise AssertionError(
                        "Decision serial %s for objname '%s' "
                        "does not match any archive." %
                        (decision_serial, objname))
            else:
                if not dry_run:
                    msg = messaging.FailedSyncIgnoreDecisionMessage(
                        objname=objname, serial=decision_serial, logger=logger)
                    self.messager.put(msg)

        if master_serial > sync_serial:
            if master_serial == decision_serial:  # this is a failed serial
                return None
            if not dry_run:
                self._make_decision_state(db, decision_state, master_state)
            return master_state, slave_state, sync_state

        elif master_serial == sync_serial:
            if slave_serial > sync_serial:
                if slave_serial == decision_serial:  # this is a failed serial
                    return None
                if not dry_run:
                    self._make_decision_state(db, decision_state, slave_state)
                return slave_state, master_state, sync_state
            elif slave_serial == sync_serial:
                return None
            else:
                raise AssertionError("Slave serial %s, sync serial %s"
                                     % (slave_serial, sync_serial))

        else:
            raise AssertionError("Master serial %s, sync serial %s"
                                 % (master_serial, sync_serial))

    def _make_decision_state(self, db, decision_state, source_state):
        """Record ``source_state``'s serial and info as the new decision."""
        new_decision_state = decision_state.set(
            serial=source_state.serial, info=source_state.info)
        db.put_state(new_decision_state)

    def enqueue_syncs(self, syncs):
        """Append decided (source, target, sync) tuples to the sync queue."""
        for sync in syncs:
            self.sync_queue.put(sync)

    def launch_syncs(self):
        """Start queued syncs, up to ``settings.max_alive_sync_threads``
        concurrently running sync threads."""
        with self.heartbeat.lock() as hb:
            alive_threads = len([v for v in hb.values()
                                 if v["thread"] is not None
                                 and v["thread"].is_alive()])
        max_alive_threads = self.settings.max_alive_sync_threads
        new_threads = max_alive_threads - alive_threads
        if new_threads > 0:
            logger.debug("Can start max %s syncs" % new_threads)
            for _ in range(new_threads):
                try:
                    tpl = self.sync_queue.get(block=False)
                    self.sync_file(*tpl)
                except Queue.Empty:
                    break

    def sync_file(self, source_state, target_state, sync_state):
        """Start a thread syncing ``source_state`` into the target archive.

        The thread is attached to the object's heartbeat, which must exist
        (registered by a decision) and must not already carry a thread.
        """
        msg = messaging.SyncMessage(
            objname=source_state.objname,
            archive=source_state.archive,
            serial=source_state.serial,
            info=source_state.info,
            logger=logger)
        self.messager.put(msg)
        thread = threading.Thread(
            target=self._sync_file,
            args=(source_state, target_state, sync_state))
        with self.heartbeat.lock() as hb:
            beat = hb.get(self.reg_name(source_state.objname))
            if beat is None:
                raise AssertionError("heartbeat for %s is None" %
                                     source_state.objname)
            assert beat["thread"] is None
            beat["thread"] = thread
        thread.start()
        # Forget finished sync threads so this list does not grow without
        # bound in a long-running syncer; joining a finished thread is a
        # no-op, so wait_sync_threads() is unaffected.
        self.sync_threads = [t for t in self.sync_threads if t.is_alive()]
        self.sync_threads.append(thread)

    def _sync_file(self, source_state, target_state, sync_state):
        """Thread body: stage the source file and let the target pull it.

        Completion is reported to ack_file_sync, failure to mark_as_failed.
        """
        clients = self.clients
        source_client = clients[source_state.archive]
        try:
            source_handle = source_client.stage_file(source_state)
        except common.SyncError as e:
            logger.warning(e)
            return
        target_client = clients[target_state.archive]
        target_client.start_pulling_file(
            source_handle, target_state, sync_state,
            callback=self.ack_file_sync,
            failure_callback=self.mark_as_failed)

    def mark_as_failed(self, state, hard=False):
        """Failure callback: drop the object's heartbeat.

        With ``hard=True`` the (serial, objname) pair is also remembered in
        ``failed_serials`` so that the same decision is not retried.
        """
        serial = state.serial
        objname = state.objname
        if hard:
            logger.warning(
                "Marking failed serial %s for archive: %s, object: '%s'" %
                (serial, state.archive, objname))
            with self.failed_serials.lock() as d:
                d[(serial, objname)] = state
        self.clean_heartbeat([objname])

    def update_state(self, db, old_state, new_state):
        """Store ``new_state``; ``old_state`` is currently unused."""
        db.put_state(new_state)
        # here we could do any checks needed on the old state,
        # perhaps triggering a probe

    def ack_file_sync(self, synced_source_state, synced_target_state):
        """Success callback: record the sync, drop the heartbeat, notify."""
        with TransactedConnection(self.syncer_dbtuple) as db:
            self._ack_file_sync(db, synced_source_state, synced_target_state)
        serial = synced_source_state.serial
        objname = synced_source_state.objname
        target = synced_target_state.archive
        self.clean_heartbeat([objname])
        msg = messaging.AckSyncMessage(
            archive=target, objname=objname, serial=serial,
            logger=logger)
        self.messager.put(msg)

    def _ack_file_sync(self, db, synced_source_state, synced_target_state):
        """Persist a completed sync in the database.

        Source and target states are stored (the target under the source's
        serial), and SYNC and DECISION are advanced to that serial with the
        merged info of both sides.

        Raises AssertionError if the serial is not the current decision,
        and common.SyncError if it is not newer than the SYNC serial.
        """
        serial = synced_source_state.serial
        objname = synced_source_state.objname
        source = synced_source_state.archive
        target = synced_target_state.archive
        tinfo = synced_target_state.info
        logger.debug("Acking archive: %s, object: '%s', serial: %s "
                     "info: %s" %
                     (target, objname, serial, tinfo))
        decision_state = db.get_state(self.DECISION, objname)
        sync_state = db.get_state(self.SYNC, objname)

        if serial != decision_state.serial:
            raise AssertionError(
                "Serial mismatch: assumed sync %s, decision %s"
                % (serial, decision_state.serial))
        if serial <= sync_state.serial:
            raise common.SyncError(
                "cannot ack: serial %s <= sync serial %s" %
                (serial, sync_state.serial))

        db_source_state = db.get_state(source, objname)
        self.update_state(db, db_source_state, synced_source_state)

        final_target_state = synced_target_state.set(
            serial=serial)
        db_target_state = db.get_state(target, objname)
        self.update_state(db, db_target_state, final_target_state)

        sync_info = dict(synced_source_state.info)
        sync_info.update(synced_target_state.info)
        # The 'info' namespace is global. Some attributes may be globally
        # recognizable by all clients with the same semantics, such as
        # a content-hash (e.g. SHA256), while other may be specific to
        # each client. Clients are responsible to protect their private
        # attributes creating their own namespace, for example
        # 'localfs_mtime', 'object_store_etag'
        new_sync_state = sync_state.set(serial=serial, info=sync_info)
        db.put_state(new_sync_state)
        new_decision_state = new_sync_state.set(archive=self.DECISION)
        db.put_state(new_decision_state)

    def list_deciding(self, archives=None):
        """Return the set of objects awaiting a decision (empty on DB
        error)."""
        try:
            with TransactedConnection(self.syncer_dbtuple) as db:
                return self._list_deciding(db, archives=archives)
        except common.DatabaseError:
            return set()

    def _list_deciding(self, db, archives=None):
        """Query objects awaiting a decision; defaults to both archives."""
        if archives is None:
            archives = (self.MASTER, self.SLAVE)
        return set(db.list_deciding(archives=archives,
                                    sync=self.SYNC))

    def probe_archive(self, archive, forced=False):
        """Probe all candidate files of ``archive`` (best effort)."""
        ident = utils.time_stamp()
        client = self.clients[archive]
        try:
            candidates = client.list_candidate_files(forced=forced)
            self._probe_files(archive, candidates, ident)
            client.remove_candidates(candidates, ident)
        except common.DatabaseError:
            pass

    def decide_archive(self, archive=None):
        """Decide pending objects (of ``archive`` or both) and launch syncs."""
        try:
            archives = [archive] if archive is not None else None
            objnames = self.list_deciding(archives)
            self.decide_file_syncs(objnames)
            self.launch_syncs()
        except common.DatabaseError:
            pass

    def decide_all_archives(self):
        """One round of the decide loop: probe both archives, then decide."""
        logger.debug("Checking candidates to sync")
        self.probe_all()
        self.decide_archive()

    def probe_all(self, forced=False):
        """Probe the master archive, then the slave archive."""
        self.probe_archive(self.MASTER, forced=forced)
        self.probe_archive(self.SLAVE, forced=forced)

    def _poll_decide(self, interval=3):
        """Start and return a daemon thread running decide_all_archives().

        Each run_body() call sleeps ``interval`` seconds afterwards;
        presumably utils.StoppableThread calls it in a loop until stopped.
        """
        class DecideThread(utils.StoppableThread):
            def run_body(this):
                # `this` is the thread; `self` is the enclosing FileSyncer.
                self.decide_all_archives()
                time.sleep(interval)

        return utils.start_daemon(DecideThread)

    def check_decisions(self):
        """Group the objects that would be synced by their source archive.

        Returns a defaultdict mapping source archive -> list of objnames.
        Objects for which no sync would start (dry-run decision None) are
        skipped instead of raising TypeError.
        """
        deciding = self.list_deciding()
        decisions = self.dry_run_decisions(deciding)
        by_source = defaultdict(list)
        for decision in decisions:
            if decision is None:
                continue
            source_state = decision[0]
            source = source_state.archive
            objname = source_state.objname
            by_source[source].append(objname)
        return by_source

    # TODO cleanup db of objects deleted in all clients
    # def cleanup(self):
    #     db = self.get_db()
    #     master_deleted = set(db.list_files_with_info(MASTER, {}))
    #     client_deleted = set(db.list_files_with_info(SLAVE, {}))
    #     deleted = master_deleted.intersection(client_deleted)


Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
516
517
def conf(auth_url, auth_token, container, local_root_path, **kwargs):
    """Create a FileSyncer between a Pithos container and a local folder.

    The Pithos client acts as master and the local filesystem client as
    slave.  Any extra keyword arguments are forwarded to SyncerSettings.
    """
    settings = SyncerSettings(
        auth_url=auth_url,
        auth_token=auth_token,
        container=container,
        local_root_path=local_root_path,
        **kwargs)
    # Arguments evaluate left to right: the master client is built first.
    return FileSyncer(settings,
                      PithosFileClient(settings),
                      LocalfsFileClient(settings))