# syncer.py - last committed by Giorgos Korfiatis
# Copyright (C) 2015 GRNET S.A.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

16
17
18
import time
import threading
import logging
19
from collections import defaultdict
20
21
22
23
24
25

from agkyra.syncer import common
from agkyra.syncer.setup import SyncerSettings
from agkyra.syncer.database import transaction
from agkyra.syncer.localfs_client import LocalfsFileClient
from agkyra.syncer.pithos_client import PithosFileClient
26
from agkyra.syncer import messaging, utils
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

# Module-level logger, named after this module.
logger = logging.getLogger(name=__name__)


class FileSyncer(object):
    """Synchronize objects between a master and a slave archive.

    The syncer probes both archives for changed objects and records their
    states in the database. It then decides which side is the source of
    each change and starts one thread per sync to pull the file to the
    other side.

    Besides the two client archives, two pseudo-archives are stored in the
    database under the names held by ``self.DECISION`` and ``self.SYNC``:
    the serial chosen for an object's pending sync, and the serial of its
    last acknowledged sync.
    """

    dbname = None
    clients = None

    def __init__(self, settings, master, slave):
        """Create a syncer over the ``master`` and ``slave`` file clients.

        :param settings: syncer settings; ``get_db``, ``messager`` and
            ``heartbeat`` are taken from it.
        :param master: client whose ``SIGNATURE`` names the master archive.
        :param slave: client whose ``SIGNATURE`` names the slave archive.
        """
        self.settings = settings
        self.master = master
        self.slave = slave
        self.DECISION = 'DECISION'
        self.SYNC = 'SYNC'
        self.MASTER = master.SIGNATURE
        self.SLAVE = slave.SIGNATURE
        self.get_db = settings.get_db
        self.clients = {self.MASTER: master, self.SLAVE: slave}
        self.notifiers = {}
        self.decide_thread = None
        self.sync_threads = []
        # (serial, objname) -> state, for syncs marked as hard failures.
        self.failed_serials = utils.ThreadSafeDict()
        self.messager = settings.messager
        self.heartbeat = self.settings.heartbeat

    def thread_is_active(self, t):
        """Return True if ``t`` is a thread-like object that is alive."""
        return t is not None and t.is_alive()

    @property
    def decide_active(self):
        """True while the periodic decide thread is running."""
        return self.thread_is_active(self.decide_thread)

    @property
    def paused(self):
        """True when no decide thread is running."""
        return not self.decide_active

    def initiate_probe(self):
        """Start the notifiers and force a probe of both archives."""
        self.start_notifiers()
        self.probe_all(forced=True)

    def start_notifiers(self):
        """Start a notifier for every client that lacks a live one."""
        for signature, client in self.clients.items():
            notifier = self.notifiers.get(signature)
            if self.thread_is_active(notifier):
                logger.info("Notifier %s already up", signature)
            else:
                self.notifiers[signature] = client.notifier()

    def stop_notifiers(self):
        """Call ``stop()`` on every notifier ever started."""
        for notifier in self.notifiers.values():
            notifier.stop()

    def start_decide(self):
        """Start the periodic decide thread unless one is already alive."""
        if not self.decide_active:
            self.decide_thread = self._poll_decide()

    def stop_decide(self):
        """Ask the decide thread to stop, if it is running."""
        if self.decide_active:
            self.decide_thread.stop()

    def stop_all_daemons(self):
        """Stop the decide thread and all notifiers."""
        self.stop_decide()
        self.stop_notifiers()

    def wait_sync_threads(self):
        """Block until every tracked sync thread has finished."""
        for thread in self.sync_threads:
            thread.join()

    def get_next_message(self, block=False):
        """Fetch the next message from the settings' messager."""
        return self.messager.get(block=block)

    def probe_file(self, archive, objname):
        """Probe a single object in ``archive`` and record its live state.

        A DatabaseError is ignored. In that case ``remove_candidates`` is
        not reached, so the object is not removed from the client's
        candidates.
        """
        ident = utils.time_stamp()
        try:
            self._probe_files(archive, [objname], ident)
            client = self.clients[archive]
            client.remove_candidates([objname], ident)
        except common.DatabaseError:
            pass

    def reg_name(self, objname):
        """Return the registry (heartbeat) key for ``objname``."""
        return utils.reg_name(self.settings, objname)

    @transaction()
    def _probe_files(self, archive, objnames, ident):
        """Probe all ``objnames`` of ``archive`` in one transaction."""
        for objname in objnames:
            self._do_probe_file(archive, objname, ident)

    def _do_probe_file(self, archive, objname, ident):
        """Probe one object, unless it is busy or already probed.

        The probe is skipped, with a message posted, in two cases:
        - a heartbeat younger than ``action_max_wait`` exists for the object;
        - the archive's stored serial already differs from the SYNC serial.
        Otherwise the client is asked for the live state, and any non-None
        result is stored via ``update_file_state``.
        """
        logger.info("Probing archive: %s, object: '%s'", archive, objname)
        db = self.get_db()
        client = self.clients[archive]
        db_state = db.get_state(archive, objname)
        ref_state = db.get_state(self.SYNC, objname)
        with self.heartbeat.lock() as hb:
            beat = hb.get(self.reg_name(objname))
            if beat is not None:
                if utils.younger_than(
                        beat["tstamp"], self.settings.action_max_wait):
                    msg = messaging.HeartbeatNoProbeMessage(
                        archive=archive, objname=objname, heartbeat=beat,
                        logger=logger)
                    self.messager.put(msg)
                    return
        if db_state.serial != ref_state.serial:
            msg = messaging.AlreadyProbedMessage(
                archive=archive, objname=objname, serial=db_state.serial,
                logger=logger)
            self.messager.put(msg)
            return
        live_state = client.probe_file(objname, db_state, ref_state, ident)
        if live_state is not None:
            self.update_file_state(live_state)

    def update_file_state(self, live_state):
        """Store ``live_state`` under a freshly allocated serial.

        If the stored state's serial differs from ``live_state.serial``, the
        update is refused with a warning. After storing, an UpdateMessage is
        posted. When the new serial is 0 (presumably the object's first
        serial), a SYNC state with serial -1 and empty info is stored too.
        """
        db = self.get_db()
        archive = live_state.archive
        objname = live_state.objname
        serial = live_state.serial
        db_state = db.get_state(archive, objname)
        if db_state and db_state.serial != serial:
            logger.warning(
                "Cannot update archive: %s, object: '%s', "
                "serial: %s, db_serial: %s",
                archive, objname, serial, db_state.serial)
            return

        new_serial = db.new_serial(objname)
        new_state = live_state.set(serial=new_serial)
        db.put_state(new_state)
        msg = messaging.UpdateMessage(
            archive=archive, objname=objname,
            serial=new_serial, old_serial=serial, logger=logger)
        self.messager.put(msg)
        if new_serial == 0:
            sync_state = common.FileState(
                archive=self.SYNC, objname=objname, serial=-1,
                info={})
            db.put_state(sync_state)

    @transaction()
    def dry_run_decisions(self, objnames, master=None, slave=None):
        """Return the dry-run decision for each of ``objnames``, in order.

        Each entry is either ``(source, target, sync)`` states or ``None``.
        """
        if master is None:
            master = self.MASTER
        if slave is None:
            slave = self.SLAVE
        decisions = []
        for objname in objnames:
            decision = self._dry_run_decision(objname, master, slave)
            decisions.append(decision)
        return decisions

    def _dry_run_decision(self, objname, master=None, slave=None):
        """Decide ``objname`` without storing a decision or posting messages."""
        if master is None:
            master = self.MASTER
        if slave is None:
            slave = self.SLAVE
        ident = utils.time_stamp()
        return self._do_decide_file_sync(objname, master, slave, ident, True)

    def decide_file_sync(self, objname, master=None, slave=None):
        """Decide ``objname`` and, if a sync is needed, start it.

        A DatabaseError during the decision abandons this object silently.
        """
        if master is None:
            master = self.MASTER
        if slave is None:
            slave = self.SLAVE
        ident = utils.time_stamp()
        try:
            states = self._decide_file_sync(objname, master, slave, ident)
        except common.DatabaseError:
            return
        if states is None:
            return
        self.sync_file(*states)

    @transaction()
    def _decide_file_sync(self, objname, master, slave, ident):
        """Transactional decision step.

        If a sync should start, a heartbeat ``{"ident", "tstamp"}`` is
        registered for the object.

        :returns: ``(source, target, sync)`` states, or None if sync is
            disabled or nothing should be done.
        """
        db = self.get_db()
        if not self.settings._sync_is_enabled(db):
            logger.warning("Cannot decide '%s'; sync disabled.", objname)
            return
        states = self._do_decide_file_sync(objname, master, slave, ident)
        if states is not None:
            with self.heartbeat.lock() as hb:
                beat = {"ident": ident, "tstamp": utils.time_stamp()}
                hb[self.reg_name(objname)] = beat
        return states

    def _do_decide_file_sync(self, objname, master, slave, ident,
                             dry_run=False):
        """Compute the sync direction for ``objname``.

        The serials of the master, slave, SYNC and DECISION states are
        compared.

        :returns: ``(source_state, target_state, sync_state)`` when a sync
            should run, or None when it should not. It returns None when
            there is no change, when a recent heartbeat carries another
            ident, or when the new serial equals a pending decision serial.
        :raises AssertionError: if the stored serials are inconsistent.

        Unless ``dry_run`` is set, a new DECISION state may be stored and
        informational messages are posted.
        """
        db = self.get_db()
        logger.info("Deciding object: '%s'", objname)
        master_state = db.get_state(master, objname)
        slave_state = db.get_state(slave, objname)
        sync_state = db.get_state(self.SYNC, objname)
        decision_state = db.get_state(self.DECISION, objname)
        master_serial = master_state.serial
        slave_serial = slave_state.serial
        sync_serial = sync_state.serial
        decision_serial = decision_state.serial

        with self.heartbeat.lock() as hb:
            beat = hb.get(self.reg_name(objname))
            logger.debug("object: %s heartbeat: %s", objname, beat)
            if beat is not None:
                if beat["ident"] == ident:
                    # Heartbeat carries our own ident: report a replay and go on.
                    if not dry_run:
                        msg = messaging.HeartbeatReplayDecideMessage(
                            objname=objname, heartbeat=beat, logger=logger)
                        self.messager.put(msg)
                else:
                    if utils.younger_than(
                            beat["tstamp"], self.settings.action_max_wait):
                        # A recent heartbeat from another ident: leave it be.
                        if not dry_run:
                            msg = messaging.HeartbeatNoDecideMessage(
                                objname=objname, heartbeat=beat, logger=logger)
                            self.messager.put(msg)
                        return None
                    logger.warning("Ignoring previous run: %s %s",
                                   objname, beat)

        if decision_serial != sync_serial:
            # A decision is pending that was never acknowledged.
            with self.failed_serials.lock() as d:
                failed_sync = d.get((decision_serial, objname))
            if failed_sync is None:
                logger.warning(
                    "Already decided: '%s', decision: %s, sync: %s",
                    objname, decision_serial, sync_serial)
                # Re-issue the pending decision in its original direction.
                if decision_serial == master_serial:
                    return master_state, slave_state, sync_state
                elif decision_serial == slave_serial:
                    return slave_state, master_state, sync_state
                else:
                    raise AssertionError(
                        "Decision serial %s for objname '%s' "
                        "does not match any archive." %
                        (decision_serial, objname))
            else:
                # The pending decision failed hard; fall through to decide anew.
                if not dry_run:
                    msg = messaging.FailedSyncIgnoreDecisionMessage(
                        objname=objname, serial=decision_serial, logger=logger)
                    self.messager.put(msg)

        if master_serial > sync_serial:
            if master_serial == decision_serial:  # this is a failed serial
                return None
            if not dry_run:
                self._make_decision_state(decision_state, master_state)
            return master_state, slave_state, sync_state

        elif master_serial == sync_serial:
            if slave_serial > sync_serial:
                if slave_serial == decision_serial:  # this is a failed serial
                    return None
                if not dry_run:
                    self._make_decision_state(decision_state, slave_state)
                return slave_state, master_state, sync_state
            elif slave_serial == sync_serial:
                return None
            else:
                raise AssertionError("Slave serial %s, sync serial %s"
                                     % (slave_serial, sync_serial))

        else:
            raise AssertionError("Master serial %s, sync serial %s"
                                 % (master_serial, sync_serial))

    def _make_decision_state(self, decision_state, source_state):
        """Store a DECISION state with the source's serial and info."""
        db = self.get_db()
        new_decision_state = decision_state.set(
            serial=source_state.serial, info=source_state.info)
        db.put_state(new_decision_state)

    def sync_file(self, source_state, target_state, sync_state):
        """Post a SyncMessage and run the sync in a new thread."""
        msg = messaging.SyncMessage(
            objname=source_state.objname,
            archive=source_state.archive,
            serial=source_state.serial,
            info=source_state.info,
            logger=logger)
        self.messager.put(msg)
        thread = threading.Thread(
            target=self._sync_file,
            args=(source_state, target_state, sync_state))
        thread.start()
        # Forget finished threads so the list does not grow without bound
        # over the lifetime of a long-running syncer.
        self.sync_threads = [t for t in self.sync_threads if t.is_alive()]
        self.sync_threads.append(thread)

    def _sync_file(self, source_state, target_state, sync_state):
        """Stage the source file and have the target client pull it.

        On success ``ack_file_sync`` is called, and on failure
        ``mark_as_failed`` is called (both are passed as callbacks).
        """
        clients = self.clients
        source_client = clients[source_state.archive]
        try:
            source_handle = source_client.stage_file(source_state)
        except common.SyncError as e:
            # NOTE(review): the heartbeat registered by _decide_file_sync is
            # not cleared here. New decisions for this object are therefore
            # blocked until it ages past action_max_wait. Confirm intended.
            logger.warning(e)
            return
        target_client = clients[target_state.archive]
        target_client.start_pulling_file(
            source_handle, target_state, sync_state,
            callback=self.ack_file_sync,
            failure_callback=self.mark_as_failed)

    def mark_as_failed(self, state, hard=False):
        """Failure callback for a sync.

        Always clears the object's heartbeat. When ``hard`` is true, it also
        records ``(serial, objname)`` in ``failed_serials``, so that
        ``_do_decide_file_sync`` ignores that pending decision.
        """
        serial = state.serial
        objname = state.objname
        with self.heartbeat.lock() as hb:
            # Tolerate a heartbeat that is already gone.
            hb.pop(self.reg_name(objname), None)
        if hard:
            logger.warning(
                "Marking failed serial %s for archive: %s, object: '%s'",
                serial, state.archive, objname)
            with self.failed_serials.lock() as d:
                d[(serial, objname)] = state

    def update_state(self, old_state, new_state):
        """Store ``new_state``; ``old_state`` is currently unused."""
        db = self.get_db()
        db.put_state(new_state)
        # here we could do any checks needed on the old state,
        # perhaps triggering a probe

    def ack_file_sync(self, synced_source_state, synced_target_state):
        """Success callback for a sync.

        Records the sync, clears the object's heartbeat and posts an
        AckSyncMessage. A DatabaseError while recording aborts silently, and
        in that case the heartbeat is left in place.
        """
        try:
            self._ack_file_sync(synced_source_state, synced_target_state)
        except common.DatabaseError:
            return
        serial = synced_source_state.serial
        objname = synced_source_state.objname
        target = synced_target_state.archive
        with self.heartbeat.lock() as hb:
            # Tolerate a heartbeat that is already gone.
            hb.pop(self.reg_name(objname), None)
        msg = messaging.AckSyncMessage(
            archive=target, objname=objname, serial=serial,
            logger=logger)
        self.messager.put(msg)

    @transaction()
    def _ack_file_sync(self, synced_source_state, synced_target_state):
        """Persist a completed sync.

        Stores the source and target states under the synced serial. It
        then advances SYNC and DECISION to that serial, with the merged
        source+target info.

        :raises AssertionError: if the serial is not the pending decision.
        :raises common.SyncError: if the serial is not newer than SYNC.
        """
        db = self.get_db()
        serial = synced_source_state.serial
        objname = synced_source_state.objname
        source = synced_source_state.archive
        target = synced_target_state.archive
        tinfo = synced_target_state.info
        logger.debug("Acking archive: %s, object: '%s', serial: %s "
                     "info: %s", target, objname, serial, tinfo)
        decision_state = db.get_state(self.DECISION, objname)
        sync_state = db.get_state(self.SYNC, objname)

        if serial != decision_state.serial:
            raise AssertionError(
                "Serial mismatch: assumed sync %s, decision %s"
                % (serial, decision_state.serial))
        if serial <= sync_state.serial:
            raise common.SyncError(
                "cannot ack: serial %s <= sync serial %s" %
                (serial, sync_state.serial))

        db_source_state = db.get_state(source, objname)
        self.update_state(db_source_state, synced_source_state)

        final_target_state = synced_target_state.set(
            serial=serial)
        db_target_state = db.get_state(target, objname)
        self.update_state(db_target_state, final_target_state)

        sync_info = dict(synced_source_state.info)
        sync_info.update(synced_target_state.info)
        # The 'info' namespace is global. Some attributes may be globally
        # recognizable by all clients with the same semantics, such as
        # a content-hash (e.g. SHA256), while other may be specific to
        # each client. Clients are responsible to protect their private
        # attributes creating their own namespace, for example
        # 'localfs_mtime', 'object_store_etag'
        new_sync_state = sync_state.set(serial=serial, info=sync_info)
        db.put_state(new_sync_state)
        new_decision_state = new_sync_state.set(archive=self.DECISION)
        db.put_state(new_decision_state)

    def list_deciding(self, archives=None):
        """Return the set of object names that need a decision.

        A DatabaseError is treated as transient and the query is retried
        until it succeeds.
        """
        while True:
            try:
                return self._list_deciding(archives=archives)
            except common.DatabaseError:
                # Retry in a loop rather than by recursion, so that a long
                # run of failures cannot exhaust the interpreter stack.
                continue

    @transaction()
    def _list_deciding(self, archives=None):
        """Transactional query behind ``list_deciding``.

        Uses both archives when ``archives`` is None.
        """
        db = self.get_db()
        if archives is None:
            archives = (self.MASTER, self.SLAVE)
        return set(db.list_deciding(archives=archives,
                                    sync=self.SYNC))

    def probe_archive(self, archive, forced=False):
        """Probe the candidate files of ``archive``.

        The ``forced`` flag is passed through to
        ``list_candidate_files``. A DatabaseError is ignored, and in that
        case the candidates are not removed.
        """
        ident = utils.time_stamp()
        client = self.clients[archive]
        try:
            candidates = client.list_candidate_files(forced=forced)
            self._probe_files(archive, candidates, ident)
            client.remove_candidates(candidates, ident)
        except common.DatabaseError:
            pass

    def decide_archive(self, archive=None):
        """Decide every pending object, optionally limited to ``archive``.

        A DatabaseError ends the round silently.
        """
        try:
            archives = [archive] if archive is not None else None
            for objname in self.list_deciding(archives):
                self.decide_file_sync(objname)
        except common.DatabaseError:
            pass

    def decide_all_archives(self):
        """One round of work: probe both archives, then decide."""
        logger.debug("Checking candidates to sync")
        self.probe_all()
        self.decide_archive()

    def probe_all(self, forced=False):
        """Probe the master archive, then the slave archive."""
        self.probe_archive(self.MASTER, forced=forced)
        self.probe_archive(self.SLAVE, forced=forced)

    def _poll_decide(self, interval=3):
        """Start a daemon that runs ``decide_all_archives`` repeatedly.

        The daemon sleeps ``interval`` seconds between rounds. Returns the
        result of ``utils.start_daemon``; presumably this is the started
        thread, since callers use ``is_alive()`` and ``stop()`` on it.
        """
        class DecideThread(utils.StoppableThread):
            def run_body(this):
                self.decide_all_archives()
                time.sleep(interval)
        return utils.start_daemon(DecideThread)

    def check_decisions(self):
        """Group pending objects by the archive a sync would start from.

        Runs a dry-run decision for every object from ``list_deciding``.
        Objects for which no sync would run (decision is None) are skipped.

        :returns: defaultdict mapping source archive -> list of objnames.
        """
        deciding = self.list_deciding()
        decisions = self.dry_run_decisions(deciding)
        by_source = defaultdict(list)
        for decision in decisions:
            if decision is None:
                continue
            source_state = decision[0]
            by_source[source_state.archive].append(source_state.objname)
        return by_source

    # TODO cleanup db of objects deleted in all clients
    # def cleanup(self):
    #     db = self.get_db()
    #     master_deleted = set(db.list_files_with_info(MASTER, {}))
    #     client_deleted = set(db.list_files_with_info(SLAVE, {}))
    #     deleted = master_deleted.intersection(client_deleted)


def conf(auth_url, auth_token, container, local_root_path, **kwargs):
    """Build a FileSyncer between a Pithos container and a local path.

    A SyncerSettings object is created from the arguments, and any extra
    keyword arguments are passed through to it. A PithosFileClient is used
    as the master archive and a LocalfsFileClient as the slave.

    :returns: the configured FileSyncer.
    """
    settings = SyncerSettings(auth_url=auth_url,
                              auth_token=auth_token,
                              container=container,
                              local_root_path=local_root_path,
                              **kwargs)
    master = PithosFileClient(settings)
    slave = LocalfsFileClient(settings)
    return FileSyncer(settings, master, slave)