syncer.py 18.5 KB
Newer Older
Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# Copyright (C) 2015 GRNET S.A.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

16
17
18
import time
import threading
import logging
19
from collections import defaultdict
20
21
22
23
24
25

from agkyra.syncer import common
from agkyra.syncer.setup import SyncerSettings
from agkyra.syncer.database import transaction
from agkyra.syncer.localfs_client import LocalfsFileClient
from agkyra.syncer.pithos_client import PithosFileClient
26
from agkyra.syncer import messaging, utils
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

logger = logging.getLogger(__name__)


class FileSyncer(object):
    """Synchronize objects between a master and a slave archive client.

    Per-object states are read from and written to the database returned by
    ``settings.get_db``. There is one state per real archive, keyed by the
    client's ``SIGNATURE``, plus two pseudo-archives:

    - ``SYNC``: the state recorded once a sync has been acknowledged.
    - ``DECISION``: the state chosen as the source of the sync currently
      being performed.
    """

    dbname = None
    clients = None

    # How many times list_deciding runs its transaction while it keeps
    # failing with common.DatabaseError; the last failure is re-raised.
    LIST_DECIDING_ATTEMPTS = 3

    def __init__(self, settings, master, slave):
        """Wire the syncer to its settings and the two archive clients.

        :param settings: settings object; this class reads its ``get_db``,
            ``messager``, ``heartbeat``, ``action_max_wait`` and
            ``_sync_is_enabled`` members.
        :param master: master archive client (exposes ``SIGNATURE``).
        :param slave: slave archive client (exposes ``SIGNATURE``).
        """
        self.settings = settings
        self.master = master
        self.slave = slave
        self.DECISION = 'DECISION'
        self.SYNC = 'SYNC'
        self.MASTER = master.SIGNATURE
        self.SLAVE = slave.SIGNATURE
        self.get_db = settings.get_db
        self.clients = {self.MASTER: master, self.SLAVE: slave}
        self.notifiers = {}
        self.decide_thread = None
        self.sync_threads = []
        # (serial, objname) -> state, for syncs marked as hard failures
        self.failed_serials = utils.ThreadSafeDict()
        self.messager = settings.messager
        self.heartbeat = self.settings.heartbeat

    def thread_is_active(self, t):
        """Return a truthy value iff ``t`` is set and still alive."""
        return t and t.is_alive()

    @property
    def decide_active(self):
        """Whether the periodic decide thread is running."""
        return self.thread_is_active(self.decide_thread)

    @property
    def paused(self):
        """True when the periodic decide thread is not running."""
        return not self.decide_active

    def initiate_probe(self):
        """Start the notifiers and force a probe of both archives."""
        self.start_notifiers()
        self.probe_all(forced=True)

    def start_notifiers(self):
        """Start a notifier for every client that has no live one."""
        for signature, client in self.clients.iteritems():
            notifier = self.notifiers.get(signature)
            if not self.thread_is_active(notifier):
                self.notifiers[signature] = client.notifier()
            else:
                logger.info("Notifier %s already up" % signature)

    def stop_notifiers(self):
        """Stop every known notifier."""
        for notifier in self.notifiers.values():
            try:
                notifier.stop()
            except KeyError as e:
                # bypass watchdog inotify bug that causes a KeyError
                # when attempting to stop a notifier after the watched
                # directory has been deleted
                logger.warning("Ignored KeyError: %s" % e)

    def start_decide(self):
        """Start the periodic decide thread unless it is already running."""
        if not self.decide_active:
            self.decide_thread = self._poll_decide()

    def stop_decide(self):
        """Ask the periodic decide thread to stop, if it is running."""
        if self.decide_active:
            self.decide_thread.stop()

    def stop_all_daemons(self):
        """Stop the decide thread and all notifiers."""
        self.stop_decide()
        self.stop_notifiers()

    def wait_sync_threads(self):
        """Join every tracked sync thread."""
        for thread in self.sync_threads:
            thread.join()

    def get_next_message(self, block=False):
        """Fetch the next message from the messager queue."""
        return self.messager.get(block=block)

    def probe_file(self, archive, objname):
        """Probe a single object in ``archive`` and drop it as a candidate.

        Database errors are deliberately ignored (best effort).
        """
        ident = utils.time_stamp()
        try:
            self._probe_files(archive, [objname], ident)
            client = self.clients[archive]
            client.remove_candidates([objname], ident)
        except common.DatabaseError:
            pass

    def reg_name(self, objname):
        """Return the key under which ``objname`` is kept in the heartbeat."""
        return utils.reg_name(self.settings, objname)

    @transaction()
    def _probe_files(self, archive, objnames, ident):
        """Probe each of ``objnames`` in ``archive`` in one transaction."""
        for objname in objnames:
            self._do_probe_file(archive, objname, ident)

    def _do_probe_file(self, archive, objname, ident):
        """Probe one object and record its live state if it changed.

        The probe is skipped if a recent heartbeat exists for the object
        (an action is in progress) or if the archive state has already
        diverged from the SYNC state (already probed).
        """
        logger.info("Probing archive: %s, object: '%s'" % (archive, objname))
        db = self.get_db()
        client = self.clients[archive]
        db_state = db.get_state(archive, objname)
        ref_state = db.get_state(self.SYNC, objname)
        with self.heartbeat.lock() as hb:
            beat = hb.get(self.reg_name(objname))
            if beat is not None:
                if utils.younger_than(
                        beat["tstamp"], self.settings.action_max_wait):
                    msg = messaging.HeartbeatNoProbeMessage(
                        archive=archive, objname=objname, heartbeat=beat,
                        logger=logger)
                    self.messager.put(msg)
                    return
        if db_state.serial != ref_state.serial:
            msg = messaging.AlreadyProbedMessage(
                archive=archive, objname=objname, serial=db_state.serial,
                logger=logger)
            self.messager.put(msg)
            return
        live_state = client.probe_file(objname, db_state, ref_state, ident)
        if live_state is not None:
            self.update_file_state(live_state)

    def update_file_state(self, live_state):
        """Store ``live_state`` under a fresh serial.

        The update is refused (with a warning) if the stored serial no
        longer matches the one ``live_state`` was based on. When the new
        serial is 0, an empty SYNC state with serial -1 is also created.
        """
        db = self.get_db()
        archive = live_state.archive
        objname = live_state.objname
        serial = live_state.serial
        db_state = db.get_state(archive, objname)
        if db_state and db_state.serial != serial:
            logger.warning(
                "Cannot update archive: %s, object: '%s', "
                "serial: %s, db_serial: %s" %
                (archive, objname, serial, db_state.serial))
            return

        new_serial = db.new_serial(objname)
        new_state = live_state.set(serial=new_serial)
        db.put_state(new_state)
        msg = messaging.UpdateMessage(
            archive=archive, objname=objname,
            serial=new_serial, old_serial=serial, logger=logger)
        self.messager.put(msg)
        if new_serial == 0:
            sync_state = common.FileState(
                archive=self.SYNC, objname=objname, serial=-1,
                info={})
            db.put_state(sync_state)

    @transaction()
    def dry_run_decisions(self, objnames, master=None, slave=None):
        """Return the decision each object would get, without acting on it.

        Each entry is either ``None`` (nothing to do) or a
        ``(source_state, target_state, sync_state)`` tuple.
        """
        if master is None:
            master = self.MASTER
        if slave is None:
            slave = self.SLAVE
        decisions = []
        for objname in objnames:
            decision = self._dry_run_decision(objname, master, slave)
            decisions.append(decision)
        return decisions

    def _dry_run_decision(self, objname, master=None, slave=None):
        """Compute a decision for ``objname`` in dry-run mode."""
        if master is None:
            master = self.MASTER
        if slave is None:
            slave = self.SLAVE
        ident = utils.time_stamp()
        return self._do_decide_file_sync(objname, master, slave, ident, True)

    def decide_file_sync(self, objname, master=None, slave=None):
        """Decide how to sync ``objname`` and start the sync if needed.

        Database errors abort the decision silently (best effort).
        """
        if master is None:
            master = self.MASTER
        if slave is None:
            slave = self.SLAVE
        ident = utils.time_stamp()
        try:
            states = self._decide_file_sync(objname, master, slave, ident)
        except common.DatabaseError:
            return
        if states is None:
            return
        self.sync_file(*states)

    @transaction()
    def _decide_file_sync(self, objname, master, slave, ident):
        """Transactionally decide ``objname`` and register a heartbeat.

        Returns ``None`` if sync is disabled or nothing needs to be done;
        otherwise the ``(source, target, sync)`` states tuple.
        """
        db = self.get_db()
        if not self.settings._sync_is_enabled(db):
            logger.warning("Cannot decide '%s'; sync disabled." % objname)
            return
        states = self._do_decide_file_sync(objname, master, slave, ident)
        if states is not None:
            with self.heartbeat.lock() as hb:
                beat = {"ident": ident, "tstamp": utils.time_stamp()}
                hb[self.reg_name(objname)] = beat
        return states

    def _do_decide_file_sync(self, objname, master, slave, ident,
                             dry_run=False):
        """Core decision logic for ``objname``.

        Returns ``None`` when nothing should be synced now, otherwise a
        ``(source_state, target_state, sync_state)`` tuple. With
        ``dry_run`` no messages are emitted and no DECISION state is
        written.

        :raises AssertionError: on serial combinations that should never
            occur.
        """
        db = self.get_db()
        logger.info("Deciding object: '%s'" % objname)
        master_state = db.get_state(master, objname)
        slave_state = db.get_state(slave, objname)
        sync_state = db.get_state(self.SYNC, objname)
        decision_state = db.get_state(self.DECISION, objname)
        master_serial = master_state.serial
        slave_serial = slave_state.serial
        sync_serial = sync_state.serial
        decision_serial = decision_state.serial

        with self.heartbeat.lock() as hb:
            beat = hb.get(self.reg_name(objname))
            logger.debug("object: %s heartbeat: %s" % (objname, beat))
            if beat is not None:
                if beat["ident"] == ident:
                    if not dry_run:
                        msg = messaging.HeartbeatReplayDecideMessage(
                            objname=objname, heartbeat=beat, logger=logger)
                        self.messager.put(msg)
                else:
                    if utils.younger_than(
                            beat["tstamp"], self.settings.action_max_wait):
                        if not dry_run:
                            msg = messaging.HeartbeatNoDecideMessage(
                                objname=objname, heartbeat=beat, logger=logger)
                            self.messager.put(msg)
                        return None
                    logger.warning("Ignoring previous run: %s %s" %
                                   (objname, beat))

        if decision_serial != sync_serial:
            # A decision was taken but never acknowledged: resume it,
            # unless it was explicitly marked as a failed sync.
            with self.failed_serials.lock() as d:
                failed_sync = d.get((decision_serial, objname))
            if failed_sync is None:
                logger.warning(
                    "Already decided: '%s', decision: %s, sync: %s" %
                    (objname, decision_serial, sync_serial))
                if decision_serial == master_serial:
                    return master_state, slave_state, sync_state
                elif decision_serial == slave_serial:
                    return slave_state, master_state, sync_state
                else:
                    raise AssertionError(
                        "Decision serial %s for objname '%s' "
                        "does not match any archive." %
                        (decision_serial, objname))
            else:
                if not dry_run:
                    msg = messaging.FailedSyncIgnoreDecisionMessage(
                        objname=objname, serial=decision_serial, logger=logger)
                    self.messager.put(msg)

        if master_serial > sync_serial:
            if master_serial == decision_serial:  # this is a failed serial
                return None
            if not dry_run:
                self._make_decision_state(decision_state, master_state)
            return master_state, slave_state, sync_state
        elif master_serial == sync_serial:
            if slave_serial > sync_serial:
                if slave_serial == decision_serial:  # this is a failed serial
                    return None
                if not dry_run:
                    self._make_decision_state(decision_state, slave_state)
                return slave_state, master_state, sync_state
            elif slave_serial == sync_serial:
                return None
            else:
                raise AssertionError("Slave serial %s, sync serial %s"
                                     % (slave_serial, sync_serial))
        else:
            raise AssertionError("Master serial %s, sync serial %s"
                                 % (master_serial, sync_serial))

    def _make_decision_state(self, decision_state, source_state):
        """Record ``source_state``'s serial and info as the DECISION state."""
        db = self.get_db()
        new_decision_state = decision_state.set(
            serial=source_state.serial, info=source_state.info)
        db.put_state(new_decision_state)

    def sync_file(self, source_state, target_state, sync_state):
        """Announce the sync and run it in a new background thread."""
        msg = messaging.SyncMessage(
            objname=source_state.objname,
            archive=source_state.archive,
            serial=source_state.serial,
            info=source_state.info,
            logger=logger)
        self.messager.put(msg)
        thread = threading.Thread(
            target=self._sync_file,
            args=(source_state, target_state, sync_state))
        thread.start()
        # Forget threads that have already finished so that the list does
        # not grow without bound while the syncer keeps running.
        self.sync_threads = [t for t in self.sync_threads if t.is_alive()]
        self.sync_threads.append(thread)

    def _sync_file(self, source_state, target_state, sync_state):
        """Stage the source object and let the target client pull it.

        Completion is reported through ``ack_file_sync`` and failure through
        ``mark_as_failed``. A staging ``SyncError`` is logged and aborts.
        """
        clients = self.clients
        source_client = clients[source_state.archive]
        try:
            source_handle = source_client.stage_file(source_state)
        except common.SyncError as e:
            logger.warning(e)
            return
        target_client = clients[target_state.archive]
        target_client.start_pulling_file(
            source_handle, target_state, sync_state,
            callback=self.ack_file_sync,
            failure_callback=self.mark_as_failed)

    def mark_as_failed(self, state, hard=False):
        """Failure callback for a sync of ``state``.

        Always clears the object's heartbeat; with ``hard`` also remembers
        ``(serial, objname)`` so the decision is not simply resumed.
        """
        serial = state.serial
        objname = state.objname
        with self.heartbeat.lock() as hb:
            # Tolerate a missing entry instead of raising KeyError.
            hb.pop(self.reg_name(objname), None)
        if hard:
            logger.warning(
                "Marking failed serial %s for archive: %s, object: '%s'" %
                (serial, state.archive, objname))
            with self.failed_serials.lock() as d:
                d[(serial, objname)] = state

    def update_state(self, old_state, new_state):
        """Store ``new_state``; ``old_state`` is currently unused."""
        db = self.get_db()
        db.put_state(new_state)
        # here we could do any checks needed on the old state,
        # perhaps triggering a probe

    def ack_file_sync(self, synced_source_state, synced_target_state):
        """Success callback: record the sync, clear heartbeat, notify.

        A database error while recording aborts silently (best effort).
        """
        try:
            self._ack_file_sync(synced_source_state, synced_target_state)
        except common.DatabaseError:
            return
        serial = synced_source_state.serial
        objname = synced_source_state.objname
        target = synced_target_state.archive
        with self.heartbeat.lock() as hb:
            # Tolerate a missing entry so the ack message is still sent.
            hb.pop(self.reg_name(objname), None)
        msg = messaging.AckSyncMessage(
            archive=target, objname=objname, serial=serial,
            logger=logger)
        self.messager.put(msg)

    @transaction()
    def _ack_file_sync(self, synced_source_state, synced_target_state):
        """Transactionally persist the outcome of a completed sync.

        Writes the synced source and target states, then a SYNC state whose
        info merges both, and a matching DECISION state.

        :raises AssertionError: if the serial is not the decided one.
        :raises common.SyncError: if the serial is not newer than SYNC.
        """
        db = self.get_db()
        serial = synced_source_state.serial
        objname = synced_source_state.objname
        source = synced_source_state.archive
        target = synced_target_state.archive
        tinfo = synced_target_state.info
        logger.debug("Acking archive: %s, object: '%s', serial: %s "
                     "info: %s" %
                     (target, objname, serial, tinfo))
        decision_state = db.get_state(self.DECISION, objname)
        sync_state = db.get_state(self.SYNC, objname)

        if serial != decision_state.serial:
            raise AssertionError(
                "Serial mismatch: assumed sync %s, decision %s"
                % (serial, decision_state.serial))
        if serial <= sync_state.serial:
            raise common.SyncError(
                "cannot ack: serial %s <= sync serial %s" %
                (serial, sync_state.serial))

        db_source_state = db.get_state(source, objname)
        self.update_state(db_source_state, synced_source_state)

        final_target_state = synced_target_state.set(
            serial=serial)
        db_target_state = db.get_state(target, objname)
        self.update_state(db_target_state, final_target_state)

        sync_info = dict(synced_source_state.info)
        sync_info.update(synced_target_state.info)
        # The 'info' namespace is global. Some attributes may be globally
        # recognizable by all clients with the same semantics, such as
        # a content-hash (e.g. SHA256), while other may be specific to
        # each client. Clients are responsible to protect their private
        # attributes creating their own namespace, for example
        # 'localfs_mtime', 'object_store_etag'
        new_sync_state = sync_state.set(serial=serial, info=sync_info)
        db.put_state(new_sync_state)
        new_decision_state = new_sync_state.set(archive=self.DECISION)
        db.put_state(new_decision_state)

    def list_deciding(self, archives=None):
        """Return the set of object names that may need a decision.

        The transaction is retried up to ``LIST_DECIDING_ATTEMPTS`` times
        on ``common.DatabaseError``; the final failure is re-raised so
        callers that catch it (e.g. ``decide_archive``) can handle it.
        """
        for _ in range(self.LIST_DECIDING_ATTEMPTS - 1):
            try:
                return self._list_deciding(archives=archives)
            except common.DatabaseError:
                logger.debug("Database error listing deciding objects; "
                             "retrying")
        return self._list_deciding(archives=archives)

    @transaction()
    def _list_deciding(self, archives=None):
        """Transactionally list deciding objects for ``archives``.

        ``archives`` defaults to both the master and the slave.
        """
        db = self.get_db()
        if archives is None:
            archives = (self.MASTER, self.SLAVE)
        return set(db.list_deciding(archives=archives,
                                    sync=self.SYNC))

    def probe_archive(self, archive, forced=False):
        """Probe all candidate files of ``archive`` (best effort)."""
        ident = utils.time_stamp()
        client = self.clients[archive]
        try:
            candidates = client.list_candidate_files(forced=forced)
            self._probe_files(archive, candidates, ident)
            client.remove_candidates(candidates, ident)
        except common.DatabaseError:
            pass

    def decide_archive(self, archive=None):
        """Decide every deciding object of ``archive`` (or of both)."""
        try:
            archives = [archive] if archive is not None else None
            for objname in self.list_deciding(archives):
                self.decide_file_sync(objname)
        except common.DatabaseError:
            pass

    def decide_all_archives(self):
        """Probe both archives, then decide all pending objects."""
        logger.debug("Checking candidates to sync")
        self.probe_all()
        self.decide_archive()

    def probe_all(self, forced=False):
        """Probe the master archive, then the slave archive."""
        self.probe_archive(self.MASTER, forced=forced)
        self.probe_archive(self.SLAVE, forced=forced)

    def _poll_decide(self, interval=3):
        """Start and return a thread that runs ``decide_all_archives``.

        The thread is a ``utils.StoppableThread`` started through
        ``utils.start_daemon``; its body decides and then sleeps
        ``interval`` seconds (presumably repeated until stopped).
        """
        class DecideThread(utils.StoppableThread):
            def run_body(this):
                self.decide_all_archives()
                time.sleep(interval)

        return utils.start_daemon(DecideThread)

    def check_decisions(self):
        """Map each source archive to the objects it would sync now.

        Objects whose dry-run decision is ``None`` (nothing to do right
        now, e.g. in sync, under a live heartbeat, or a failed serial) are
        skipped.
        """
        deciding = self.list_deciding()
        decisions = self.dry_run_decisions(deciding)
        by_source = defaultdict(list)
        for decision in decisions:
            if decision is None:
                continue
            source_state = decision[0]
            source = source_state.archive
            objname = source_state.objname
            by_source[source].append(objname)
        return by_source

    # TODO cleanup db of objects deleted in all clients
    # def cleanup(self):
    #     db = self.get_db()
    #     master_deleted = set(db.list_files_with_info(MASTER, {}))
    #     client_deleted = set(db.list_files_with_info(SLAVE, {}))
    #     deleted = master_deleted.intersection(client_deleted)


Giorgos Korfiatis's avatar
Giorgos Korfiatis committed
477
478
def conf(auth_url, auth_token, container, local_root_path, **kwargs):
    """Build a FileSyncer pairing a Pithos container with a local folder.

    The four connection parameters and any extra keyword arguments are
    forwarded to ``SyncerSettings``. The Pithos client is used as the master
    archive and the local filesystem client as the slave.
    """
    settings = SyncerSettings(
        auth_url=auth_url, auth_token=auth_token, container=container,
        local_root_path=local_root_path, **kwargs)
    # Arguments are evaluated left to right: master first, then slave.
    return FileSyncer(settings,
                      PithosFileClient(settings),
                      LocalfsFileClient(settings))