# Copyright (C) 2015 GRNET S.A.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import time
import threading
import logging
from collections import defaultdict

from agkyra.syncer import common
from agkyra.syncer.setup import SyncerSettings
from agkyra.syncer.database import transaction
from agkyra.syncer.localfs_client import LocalfsFileClient
from agkyra.syncer.pithos_client import PithosFileClient
from agkyra.syncer import messaging, utils

logger = logging.getLogger(__name__)


class FileSyncer(object):

    dbname = None
    clients = None

    def __init__(self, settings, master, slave):
        """Bind the syncer to its settings and its two archive clients.

        :param settings: settings object; ``get_db``, ``messager`` and
            ``heartbeat`` are read from it here.
        :param master: client of the master archive; its ``SIGNATURE``
            becomes ``self.MASTER``.
        :param slave: client of the slave archive; its ``SIGNATURE``
            becomes ``self.SLAVE``.
        """
        self.settings = settings
        self.master = master
        self.slave = slave

        # Archive names used for the bookkeeping states that are stored
        # in the database next to the client states.
        self.DECISION = 'DECISION'
        self.SYNC = 'SYNC'

        # Client archives are named after the clients' signatures.
        self.MASTER = master.SIGNATURE
        self.SLAVE = slave.SIGNATURE
        self.clients = {self.MASTER: master, self.SLAVE: slave}

        # Services handed over by the settings object.
        self.get_db = settings.get_db
        self.messager = settings.messager
        self.heartbeat = settings.heartbeat

        # Runtime bookkeeping of threads and failed syncs.
        self.notifiers = {}
        self.decide_thread = None
        self.sync_threads = []
        self.failed_serials = utils.ThreadSafeDict()
52

53
54
55
56
57
58
59
    def thread_is_active(self, t):
        """Tell whether thread ``t`` exists and is still running.

        Returns ``t`` itself when it is falsy (e.g. ``None``), otherwise
        whatever ``t.is_alive()`` returns.
        """
        if not t:
            return t
        return t.is_alive()

    @property
    def decide_active(self):
        return self.thread_is_active(self.decide_thread)

60
61
    @property
    def paused(self):
62
        return not self.decide_active
63

64
    def initiate_probe(self):
65
        self.start_notifiers()
66
        self.probe_all(forced=True)
67
68

    def start_notifiers(self):
69
70
71
        for signature, client in self.clients.iteritems():
            notifier = self.notifiers.get(signature)
            if not self.thread_is_active(notifier):
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
72
                self.notifiers[signature] = client.notifier()
73
            else:
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
74
                logger.info("Notifier %s already up" % signature)
75
76
77

    def stop_notifiers(self):
        for notifier in self.notifiers.values():
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
78
79
80
81
82
83
84
            try:
                notifier.stop()
            except KeyError as e:
                # bypass watchdog inotify bug that causes a KeyError
                # when attempting to stop a notifier after the watched
                # directory has been deleted
                logger.warning("Ignored KeyError: %s" % e)
85
86

    def start_decide(self):
87
88
89
90
91
92
        if not self.decide_active:
            self.decide_thread = self._poll_decide()

    def stop_decide(self):
        if self.decide_active:
            self.decide_thread.stop()
93

94
95
96
97
98
99
100
    def stop_all_daemons(self):
        self.stop_decide()
        self.stop_notifiers()

    def wait_sync_threads(self):
        for thread in self.sync_threads:
            thread.join()
101

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
102
103
    def get_next_message(self, block=False):
        return self.messager.get(block=block)
104

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
105
    def probe_file(self, archive, objname):
        """Probe a single object in ``archive``.

        The object is probed within a transaction and then removed from
        the client's candidates. A ``common.DatabaseError`` raised on the
        way is silently dropped.
        """
        ident = utils.time_stamp()
        try:
            self._probe_files(archive, [objname], ident)
            self.clients[archive].remove_candidates([objname], ident)
        except common.DatabaseError:
            pass
113

114
115
116
    def reg_name(self, objname):
        """Return ``utils.reg_name`` of ``objname`` under these settings.

        The result is used as the key of the object's heartbeat entry.
        """
        settings = self.settings
        return utils.reg_name(settings, objname)

117
118
119
120
121
122
    @transaction()
    def _probe_files(self, archive, objnames, ident):
        """Probe each of ``objnames`` in ``archive`` in one transaction."""
        probe = self._do_probe_file
        for name in objnames:
            probe(archive, name, ident)

    def _do_probe_file(self, archive, objname, ident):
        """Probe one object and record its live state, if any.

        The probe is skipped (and a message is emitted) when the object
        has a heartbeat whose thread is unset or still alive, or when the
        archive's stored serial differs from the SYNC serial. Otherwise
        the client is asked to probe the file and a non-``None`` result
        is passed to ``update_file_state``.
        """
        logger.info("Probing archive: %s, object: '%s'" % (archive, objname))
        db = self.get_db()
        client = self.clients[archive]
        db_state = db.get_state(archive, objname)
        ref_state = db.get_state(self.SYNC, objname)

        with self.heartbeat.lock() as hb:
            beat = hb.get(self.reg_name(objname))
            if beat is not None:
                worker = beat["thread"]
                if worker is None or worker.is_alive():
                    # A sync has been decided or is running: do not probe.
                    self.messager.put(messaging.HeartbeatNoProbeMessage(
                        archive=archive, objname=objname, heartbeat=beat,
                        logger=logger))
                    return

        if db_state.serial != ref_state.serial:
            # The archive state is already ahead of the synced one.
            self.messager.put(messaging.AlreadyProbedMessage(
                archive=archive, objname=objname, serial=db_state.serial,
                logger=logger))
            return

        live_state = client.probe_file(objname, db_state, ref_state, ident)
        if live_state is None:
            return
        self.update_file_state(live_state)
147

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
148
    def update_file_state(self, live_state):
        """Store ``live_state`` in the database under a new serial.

        Nothing is stored (a warning is logged) when the database already
        holds a state for the object whose serial differs from the one
        carried by ``live_state``. When the newly allocated serial is 0,
        a SYNC state with serial -1 and empty info is stored as well.
        """
        db = self.get_db()
        archive = live_state.archive
        objname = live_state.objname
        serial = live_state.serial

        stored = db.get_state(archive, objname)
        if stored and stored.serial != serial:
            logger.warning(
                "Cannot update archive: %s, object: '%s', "
                "serial: %s, db_serial: %s" %
                (archive, objname, serial, stored.serial))
            return

        new_serial = db.new_serial(objname)
        db.put_state(live_state.set(serial=new_serial))
        self.messager.put(messaging.UpdateMessage(
            archive=archive, objname=objname,
            serial=new_serial, old_serial=serial, logger=logger))

        if new_serial != 0:
            return
        db.put_state(common.FileState(
            archive=self.SYNC, objname=objname, serial=-1,
            info={}))

174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
    @transaction()
    def dry_run_decisions(self, objnames, master=None, slave=None):
        """Compute, without side-effect messages, a decision per object.

        :param objnames: iterable of object names to decide on.
        :param master: archive acting as master; defaults to ``self.MASTER``.
        :param slave: archive acting as slave; defaults to ``self.SLAVE``.
        :returns: list with one dry-run decision per object, in order.
        """
        master = self.MASTER if master is None else master
        slave = self.SLAVE if slave is None else slave
        return [self._dry_run_decision(objname, master, slave)
                for objname in objnames]

    def _dry_run_decision(self, objname, master=None, slave=None):
        """Run ``_do_decide_file_sync`` for ``objname`` with ``dry_run`` set.

        ``master`` and ``slave`` default to ``self.MASTER`` and
        ``self.SLAVE`` respectively.
        """
        master = self.MASTER if master is None else master
        slave = self.SLAVE if slave is None else slave
        ident = utils.time_stamp()
        return self._do_decide_file_sync(objname, master, slave, ident, True)

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
194
    def decide_file_sync(self, objname, master=None, slave=None):
        """Decide on ``objname`` and launch the sync if one is needed.

        ``master`` and ``slave`` default to ``self.MASTER`` and
        ``self.SLAVE``. On ``common.DatabaseError`` the heartbeat entry
        registered under this call's ident (if any) is removed and
        nothing is synced.
        """
        master = self.MASTER if master is None else master
        slave = self.SLAVE if slave is None else slave
        ident = utils.time_stamp()

        try:
            states = self._decide_file_sync(objname, master, slave, ident)
        except common.DatabaseError:
            logger.debug("DatabaseError in _decide_file_sync "
                         "for '%s'; cleaning up heartbeat" % objname)
            with self.heartbeat.lock() as hb:
                beat = hb.get(self.reg_name(objname))
                # Only drop the heartbeat this very call has registered.
                if beat:
                    if beat["ident"] == ident:
                        hb.pop(self.reg_name(objname))
            return

        if states is not None:
            self.sync_file(*states)
213
214

    @transaction()
    def _decide_file_sync(self, objname, master, slave, ident):
        """Decide on ``objname`` in a transaction and register a heartbeat.

        Returns ``None`` when syncing is disabled or no sync is needed;
        otherwise returns the decided states after recording a heartbeat
        entry ``{"ident": ident, "thread": None}`` for the object.
        """
        db = self.get_db()
        if not self.settings._sync_is_enabled(db):
            logger.warning("Cannot decide '%s'; sync disabled." % objname)
            return None

        states = self._do_decide_file_sync(objname, master, slave, ident)
        if states is None:
            return None

        with self.heartbeat.lock() as hb:
            hb[self.reg_name(objname)] = {"ident": ident, "thread": None}
        return states

227
228
    def _do_decide_file_sync(self, objname, master, slave, ident,
                             dry_run=False):
        """Work out which archive state, if any, must be synced.

        :param objname: name of the object to decide on.
        :param master: name of the archive acting as master.
        :param slave: name of the archive acting as slave.
        :param ident: identifier of this run, compared against the
            object's heartbeat entry.
        :param dry_run: when true, no messages are emitted and no
            decision state is stored.
        :returns: a ``(source_state, target_state, sync_state)`` tuple,
            or ``None`` when there is nothing to sync or the object is
            held by another run's heartbeat.
        :raises AssertionError: when the serials are inconsistent.
        """
        db = self.get_db()
        logger.info("Deciding object: '%s'" % objname)
        master_state = db.get_state(master, objname)
        slave_state = db.get_state(slave, objname)
        sync_state = db.get_state(self.SYNC, objname)
        decision_state = db.get_state(self.DECISION, objname)

        master_serial = master_state.serial
        slave_serial = slave_state.serial
        sync_serial = sync_state.serial
        decision_serial = decision_state.serial

        # The two possible outcomes, source state first.
        master_to_slave = (master_state, slave_state, sync_state)
        slave_to_master = (slave_state, master_state, sync_state)
        report = not dry_run

        with self.heartbeat.lock() as hb:
            beat = hb.get(self.reg_name(objname))
            logger.debug("object: %s heartbeat: %s" % (objname, beat))
            if beat is not None and beat["ident"] == ident:
                # Heartbeat left by this same run: carry on.
                if report:
                    self.messager.put(messaging.HeartbeatReplayDecideMessage(
                        objname=objname, heartbeat=beat, logger=logger))
            elif beat is not None:
                worker = beat["thread"]
                if worker is None or worker.is_alive():
                    # Another run holds the object: back off.
                    if report:
                        self.messager.put(messaging.HeartbeatNoDecideMessage(
                            objname=objname, heartbeat=beat, logger=logger))
                    return None
                logger.warning("Ignoring previous run: %s %s" %
                               (objname, beat))

        if decision_serial != sync_serial:
            with self.failed_serials.lock() as failed:
                failed_sync = failed.get((decision_serial, objname))
            if failed_sync is not None:
                # The pending decision has failed; decide afresh below.
                if report:
                    self.messager.put(
                        messaging.FailedSyncIgnoreDecisionMessage(
                            objname=objname, serial=decision_serial,
                            logger=logger))
            else:
                logger.warning(
                    "Already decided: '%s', decision: %s, sync: %s" %
                    (objname, decision_serial, sync_serial))
                if decision_serial == master_serial:
                    return master_to_slave
                if decision_serial == slave_serial:
                    return slave_to_master
                raise AssertionError(
                    "Decision serial %s for objname '%s' "
                    "does not match any archive." %
                    (decision_serial, objname))

        if master_serial > sync_serial:
            if master_serial == decision_serial:  # this is a failed serial
                return None
            if report:
                self._make_decision_state(decision_state, master_state)
            return master_to_slave

        if master_serial != sync_serial:
            raise AssertionError("Master serial %s, sync serial %s"
                                 % (master_serial, sync_serial))

        # From here on the master is in sync; look at the slave.
        if slave_serial > sync_serial:
            if slave_serial == decision_serial:  # this is a failed serial
                return None
            if report:
                self._make_decision_state(decision_state, slave_state)
            return slave_to_master

        if slave_serial != sync_serial:
            raise AssertionError("Slave serial %s, sync serial %s"
                                 % (slave_serial, sync_serial))
        return None

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
305
    def _make_decision_state(self, decision_state, source_state):
        """Store a decision state carrying the source's serial and info."""
        db = self.get_db()
        decided = decision_state.set(
            serial=source_state.serial, info=source_state.info)
        db.put_state(decided)

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
311
    def sync_file(self, source_state, target_state, sync_state):
        """Announce a sync and run it in a new thread.

        A ``SyncMessage`` is emitted, then a thread targeting
        ``_sync_file`` is created, recorded in the object's heartbeat
        entry, started and appended to ``sync_threads``.

        :raises AssertionError: when the object has no heartbeat entry,
            or when that entry already carries a thread.
        """
        objname = source_state.objname
        msg = messaging.SyncMessage(
            objname=objname,
            archive=source_state.archive,
            serial=source_state.serial,
            info=source_state.info,
            logger=logger)
        self.messager.put(msg)
        thread = threading.Thread(
            target=self._sync_file,
            args=(source_state, target_state, sync_state))
        with self.heartbeat.lock() as hb:
            beat = hb.get(self.reg_name(objname))
            if beat is None:
                raise AssertionError("heartbeat for %s is None" % objname)
            # Explicit raise instead of a bare assert, so that the check
            # still runs when Python is started with -O.
            if beat["thread"] is not None:
                raise AssertionError(
                    "heartbeat for %s already has a thread" % objname)
            beat["thread"] = thread
        thread.start()
        self.sync_threads.append(thread)
331

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
332
    def _sync_file(self, source_state, target_state, sync_state):
        """Stage the source file and let the target client pull it.

        A ``common.SyncError`` raised while staging is logged as a
        warning and ends the sync. Otherwise the target client is given
        ``ack_file_sync`` as success callback and ``mark_as_failed`` as
        failure callback.
        """
        source_client = self.clients[source_state.archive]
        try:
            source_handle = source_client.stage_file(source_state)
        except common.SyncError as err:
            logger.warning(err)
            return
        self.clients[target_state.archive].start_pulling_file(
            source_handle, target_state, sync_state,
            callback=self.ack_file_sync,
            failure_callback=self.mark_as_failed)

346
    def mark_as_failed(self, state, hard=False):
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
347
        serial = state.serial
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
348
        objname = state.objname
349
350
351
352
353
354
        if hard:
            logger.warning(
                "Marking failed serial %s for archive: %s, object: '%s'" %
                (serial, state.archive, objname))
            with self.failed_serials.lock() as d:
                d[(serial, objname)] = state
355
356
        with self.heartbeat.lock() as hb:
            hb.pop(self.reg_name(objname))
357
358
359
360
361
362
363

    def update_state(self, old_state, new_state):
        db = self.get_db()
        db.put_state(new_state)
        # here we could do any checks needed on the old state,
        # perhaps triggering a probe

Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
364
    def ack_file_sync(self, synced_source_state, synced_target_state):
        """Acknowledge a completed sync.

        The synced states are committed through ``_ack_file_sync``; then
        the object's heartbeat entry is removed and an ``AckSyncMessage``
        is emitted. On ``common.DatabaseError`` nothing further is done.
        """
        try:
            self._ack_file_sync(synced_source_state, synced_target_state)
        except common.DatabaseError:
            # maybe clear heartbeat here too
            return

        serial = synced_source_state.serial
        objname = synced_source_state.objname
        target = synced_target_state.archive
        with self.heartbeat.lock() as hb:
            hb.pop(self.reg_name(objname))
        self.messager.put(messaging.AckSyncMessage(
            archive=target, objname=objname, serial=serial,
            logger=logger))
379
380
381
382
383
384
385
386

    @transaction()
    def _ack_file_sync(self, synced_source_state, synced_target_state):
        """Commit the outcome of a sync to the database.

        The source and target states are stored, the target with the
        source's serial, and the SYNC and DECISION states are both set
        to that serial with the merged info of source and target.

        :raises AssertionError: when the synced serial is not the one
            currently decided.
        :raises common.SyncError: when the synced serial is not greater
            than the serial already synced.
        """
        db = self.get_db()
        serial = synced_source_state.serial
        objname = synced_source_state.objname
        source = synced_source_state.archive
        target = synced_target_state.archive
        tinfo = synced_target_state.info
        logger.debug("Acking archive: %s, object: '%s', serial: %s "
                     "info: %s" %
                     (target, objname, serial, tinfo))
        decision_state = db.get_state(self.DECISION, objname)
        sync_state = db.get_state(self.SYNC, objname)

        if serial != decision_state.serial:
            raise AssertionError(
                "Serial mismatch: assumed sync %s, decision %s"
                % (serial, decision_state.serial))
        if serial <= sync_state.serial:
            # The message states "<=" to agree with the actual check;
            # it used to claim "<" even for equal serials.
            raise common.SyncError(
                "cannot ack: serial %s <= sync serial %s" %
                (serial, sync_state.serial))

        db_source_state = db.get_state(source, objname)
        self.update_state(db_source_state, synced_source_state)

        final_target_state = synced_target_state.set(
            serial=serial)
        db_target_state = db.get_state(target, objname)
        self.update_state(db_target_state, final_target_state)

        # Target info overrides source info on common keys.
        sync_info = dict(synced_source_state.info)
        sync_info.update(synced_target_state.info)
        # The 'info' namespace is global. Some attributes may be globally
        # recognizable by all clients with the same semantics, such as
        # a content-hash (e.g. SHA256), while other may be specific to
        # each client. Clients are responsible to protect their private
        # attributes creating their own namespace, for example
        # 'localfs_mtime', 'object_store_etag'
        new_sync_state = sync_state.set(serial=serial, info=sync_info)
        db.put_state(new_sync_state)
        new_decision_state = new_sync_state.set(archive=self.DECISION)
        db.put_state(new_decision_state)

    def list_deciding(self, archives=None):
425
426
427
428
429
430
431
        try:
            return self._list_deciding(archives=archives)
        except DatabaseError:
            return self.list_deciding(archives=archives)

    @transaction()
    def _list_deciding(self, archives=None):
        """Return the set of names the database lists as deciding.

        :param archives: archives to consider; defaults to the master
            and slave archives.
        """
        db = self.get_db()
        if archives is None:
            archives = (self.MASTER, self.SLAVE)
        deciding = db.list_deciding(archives=archives,
                                    sync=self.SYNC)
        return set(deciding)
437

438
    def probe_archive(self, archive, forced=False):
        """Probe all candidate files of ``archive``.

        :param forced: passed through to the client's
            ``list_candidate_files``.

        The candidates are probed in one transaction and then removed
        from the client's candidates. A ``common.DatabaseError`` raised
        on the way is silently dropped.
        """
        stamp = utils.time_stamp()
        archive_client = self.clients[archive]
        try:
            pending = archive_client.list_candidate_files(forced=forced)
            self._probe_files(archive, pending, stamp)
            archive_client.remove_candidates(pending, stamp)
        except common.DatabaseError:
            pass
447

448
449
450
451
452
453
454
    def decide_archive(self, archive=None):
        try:
            archives = [archive] if archive is not None else None
            for objname in self.list_deciding(archives):
                self.decide_file_sync(objname)
        except common.DatabaseError:
            pass
455
456

    def decide_all_archives(self):
        """Probe both archives, then decide on all deciding objects."""
        logger.debug("Checking candidates to sync")
        self.probe_all()
        self.decide_archive()
460

461
462
463
464
    def probe_all(self, forced=False):
        self.probe_archive(self.MASTER, forced=forced)
        self.probe_archive(self.SLAVE, forced=forced)

465
    def _poll_decide(self, interval=3):
        """Start and return a daemon thread that runs decision rounds.

        :param interval: seconds slept after each round.
        :returns: the result of ``utils.start_daemon``.

        NOTE(review): presumably ``utils.StoppableThread`` calls
        ``run_body`` repeatedly until stopped -- confirm in ``utils``.
        """
        syncer = self

        class DecideThread(utils.StoppableThread):
            def run_body(this):
                # One round: probe and decide, then wait for the next.
                syncer.decide_all_archives()
                time.sleep(interval)

        return utils.start_daemon(DecideThread)

472
473
474
475
476
477
478
479
480
481
482
    def check_decisions(self):
        deciding = self.list_deciding()
        decisions = self.dry_run_decisions(deciding)
        by_source = defaultdict(list)
        for decision in decisions:
            source_state = decision[0]
            source = source_state.archive
            objname = source_state.objname
            by_source[source].append(objname)
        return by_source

483
484
485
486
487
488
489
490
    # TODO cleanup db of objects deleted in all clients
    # def cleanup(self):
    #     db = self.get_db()
    #     master_deleted = set(db.list_files_with_info(MASTER, {}))
    #     client_deleted = set(db.list_files_with_info(SLAVE, {}))
    #     deleted = master_deleted.intersection(client_deleted)


Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
491
492
def conf(auth_url, auth_token, container, local_root_path, **kwargs):
    """Build a ``FileSyncer`` with a Pithos master and a localfs slave.

    The four named arguments and any extra keyword arguments are passed
    to ``SyncerSettings``; both clients are created from the resulting
    settings.
    """
    settings = SyncerSettings(
        auth_url=auth_url, auth_token=auth_token, container=container,
        local_root_path=local_root_path, **kwargs)
    pithos = PithosFileClient(settings)
    localfs = LocalfsFileClient(settings)
    return FileSyncer(settings, pithos, localfs)